[
https://issues.apache.org/jira/browse/FLINK-39150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18061513#comment-18061513
]
Lei Yang commented on FLINK-39150:
----------------------------------
[~piotrprz] [~zhuzh] The root cause of the Adaptive Join deserialization
failure is that `PlannerComponentClassLoader` applies a strict whitelist-based
routing strategy and does **not** automatically fall back to the parent
`ClassLoader` for non-whitelisted packages.
- For **whitelisted packages** (e.g., `org.apache.flink.*`), it follows the
configured lookup order (parent-first/component-first) and can fall back
accordingly.
- For **non-whitelisted packages** (e.g., custom user POJOs), it defaults to a
**component-only** lookup. Since these classes exist only in the user JAR
(parent) and not in the isolated `table-planner-*.jar` (component),
deserialization fails with `ClassNotFoundException`.
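The failure mode can be reproduced outside Flink with a plain `ObjectInputStream` whose class resolution is pinned to a single `ClassLoader` (a minimal sketch; `ClassLoaderObjectInputStream` is a name introduced here for illustration, and Flink's actual deserialization path differs in detail):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

/**
 * Illustrative sketch: deserialization resolves every class through one
 * ClassLoader. If that loader cannot see the user JAR, user POJOs fail
 * with ClassNotFoundException, which surfaces in Flink as
 * "Failed to deserialize AdaptiveJoin instance."
 */
class ClassLoaderObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
        // Every class in the stream is resolved against 'loader' only;
        // there is no fallback to any other ClassLoader here.
        return Class.forName(desc.getName(), false, loader);
    }
}
```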
I will fix this by adjusting the ClassLoader strategy so that, for
non-whitelisted packages, lookup automatically falls back to the parent (or
component) `ClassLoader` when the first lookup fails, ensuring custom user
types can be resolved during AdaptiveJoin deserialization.
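The routing above can be sketched with a hypothetical `RoutingClassLoader` (a simplification; the real `PlannerComponentClassLoader` in flink-table-planner-loader differs in detail). The `fallbackForNonWhitelisted` flag models the proposed fix:

```java
/**
 * Hypothetical sketch of whitelist-based ClassLoader routing.
 * - Whitelisted prefixes: component lookup with parent fallback.
 * - Non-whitelisted (e.g. user POJOs): component-only before the fix;
 *   with the fix, falls back to the parent so user-JAR classes resolve.
 */
class RoutingClassLoader extends ClassLoader {
    private final ClassLoader component;
    private final String[] whitelist;
    private final boolean fallbackForNonWhitelisted; // models the proposed fix

    RoutingClassLoader(ClassLoader component, ClassLoader parent,
                       String[] whitelist, boolean fallbackForNonWhitelisted) {
        super(parent);
        this.component = component;
        this.whitelist = whitelist;
        this.fallbackForNonWhitelisted = fallbackForNonWhitelisted;
    }

    @Override
    public Class<?> loadClass(String name) throws ClassNotFoundException {
        for (String prefix : whitelist) {
            if (name.startsWith(prefix)) {
                // Whitelisted package: component lookup, parent as fallback.
                try {
                    return component.loadClass(name);
                } catch (ClassNotFoundException e) {
                    return getParent().loadClass(name);
                }
            }
        }
        // Non-whitelisted package: component-only lookup before the fix.
        try {
            return component.loadClass(name);
        } catch (ClassNotFoundException e) {
            if (fallbackForNonWhitelisted) {
                // Fixed behaviour: the user POJO resolves from the user JAR.
                return getParent().loadClass(name);
            }
            throw e; // old behaviour: ClassNotFoundException surfaces
        }
    }
}
```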
> Join operator crashes jobs when using custom types or custom type serializers
> -----------------------------------------------------------------------------
>
> Key: FLINK-39150
> URL: https://issues.apache.org/jira/browse/FLINK-39150
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.0.0, 2.2.0
> Reporter: Piotr Przybylski
> Assignee: Lei Yang
> Priority: Major
> Labels: pull-request-available
>
> Flink fails to start a job when Table API's join is used on a Table that
> references a custom user type, like a POJO or a custom {{TypeSerializer}}
> attached to some native Java type's TypeInformation.
> Example:
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
> Table table1 = tEnv.fromDataStream(
>         env.fromData(Row.of(1))
>                 .returns(ROW_NAMED(new String[]{"id"}, INT))
> );
>
> // note that for this to fail, TestClass must *not* be in the org.apache.flink namespace
> Table table2 = tEnv.fromDataStream(
>         env.fromData(Row.of(1, new TestClass()))
>                 .returns(ROW_NAMED(
>                         new String[]{"id2", "value"},
>                         INT,
>                         new PojoTypeInfo<>(TestClass.class, new ArrayList<>())))
> );
>
> tEnv.toDataStream(table1.leftOuterJoin(table2, $("id").isEqual($("id2"))))
>         .sinkTo(new DiscardingSink<>());{code}
> Error from logs:
> {code:java}
> ERROR
> org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler
> [] - Failed to handle job event
> ExecutionJobVertexFinishedEvent{vertexId=cbc357ccb763df2852fee8c4fc7d55f2,
> resultInfos={52fee8c522470986cbc357ccdd2d92bc=org.apache.flink.runtime.scheduler.adaptivebatch.AllToAllBlockingResultInfo@30d3445e}}.
> java.lang.RuntimeException: Failed to deserialize AdaptiveJoin instance.
> Please check whether the flink-table-planner-loader.jar is in the classpath.
> at
> org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.lazyInitialize(AdaptiveJoinOperatorFactory.java:123)
> ~[classes/:?]
> at
> org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.checkAndLazyInitialize(AdaptiveJoinOperatorFactory.java:90)
> ~[classes/:?]
> at
> org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.getJoinType(AdaptiveJoinOperatorFactory.java:72)
> ~[classes/:?]
> at
> org.apache.flink.table.runtime.strategy.AdaptiveBroadcastJoinOptimizationStrategy.tryOptimizeAdaptiveJoin(AdaptiveBroadcastJoinOptimizationStrategy.java:105)
> ~[classes/:?]
> at
> org.apache.flink.table.runtime.strategy.BaseAdaptiveJoinOperatorOptimizationStrategy.visitDownstreamAdaptiveJoinNode(BaseAdaptiveJoinOperatorOptimizationStrategy.java:90)
> ~[classes/:?]
> at
> org.apache.flink.table.runtime.strategy.AdaptiveBroadcastJoinOptimizationStrategy.onOperatorsFinished(AdaptiveBroadcastJoinOptimizationStrategy.java:74)
> ~[classes/:?]
> at
> org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizer.onOperatorsFinished(StreamGraphOptimizer.java:72)
> ~[classes/:?]
> at
> org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.tryOptimizeStreamGraph(DefaultAdaptiveExecutionHandler.java:118)
> ~[classes/:?]{code}
> This worked in Flink 1.20 but started failing in 2.0, when type information
> started being serialized as part of {{AdaptiveJoinOperatorFactory}},
> which can be deserialized only with classes provided by
> {{SubmoduleClassLoader}} (FLINK-36634).
> Full test case: [https://github.com/apache/flink/pull/27663]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)