[ https://issues.apache.org/jira/browse/FLINK-39150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18062010#comment-18062010 ]
Lei Yang commented on FLINK-39150:
----------------------------------
[~piotrprz] Thanks for pointing this out. I’ve verified it locally, and the
issue still exists in the scenario you mentioned. The root cause is that the
current thread’s class loader does not include the user JAR. To deserialize the
AdaptiveJoin instance, we should use the user class loader together with the
table planner class loader. I will follow up with a complete fix.
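To illustrate the idea of resolving classes against more than one class loader,
here is a minimal sketch in plain Java. It is not the actual fix and uses no
Flink classes: MultiLoaderDeserialization, MultiLoaderObjectInputStream and
Payload are hypothetical names made up for this example. The "restricted"
loader stands in for a loader that cannot see user classes, and the
"application" loader stands in for the user class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration only; not part of Flink. */
public class MultiLoaderDeserialization {

    /** Stand-in for a user-defined type that only the user class loader can see. */
    public static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String value;

        public Payload(String value) {
            this.value = value;
        }
    }

    /** ObjectInputStream that tries each given class loader in order. */
    static final class MultiLoaderObjectInputStream extends ObjectInputStream {
        private final List<ClassLoader> loaders;

        MultiLoaderObjectInputStream(InputStream in, List<ClassLoader> loaders)
                throws IOException {
            super(in);
            this.loaders = loaders;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            for (ClassLoader loader : loaders) {
                try {
                    return Class.forName(desc.getName(), false, loader);
                } catch (ClassNotFoundException ignored) {
                    // This loader cannot see the class; try the next one.
                }
            }
            // Fall back to the default lookup (this also covers primitive types).
            return super.resolveClass(desc);
        }
    }

    public static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static Object deserialize(byte[] bytes, ClassLoader... loaders)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new MultiLoaderObjectInputStream(
                new ByteArrayInputStream(bytes), Arrays.asList(loaders))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Parent of the system loader: cannot see application classes such as Payload.
        ClassLoader restricted = ClassLoader.getSystemClassLoader().getParent();
        // Loader that defined this class: plays the role of the user class loader.
        ClassLoader application = MultiLoaderDeserialization.class.getClassLoader();

        byte[] bytes = serialize(new Payload("hello"));
        Payload copy = (Payload) deserialize(bytes, restricted, application);
        System.out.println(copy.value);
    }
}
```

The lookup order matters in a sketch like this: the first loader that can
resolve a class name wins, so which loader should take precedence in the real
fix is a design decision for the PR.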
> Join operator crashes jobs when using custom types or custom type serializers
> -----------------------------------------------------------------------------
>
> Key: FLINK-39150
> URL: https://issues.apache.org/jira/browse/FLINK-39150
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.0.0, 2.2.0
> Reporter: Piotr Przybylski
> Assignee: Lei Yang
> Priority: Major
> Labels: pull-request-available
>
> Flink fails to start a job when Table API's join is used on a Table that
> references a custom user type, like a POJO or a custom {{TypeSerializer}}
> attached to some native Java type's TypeInformation.
> Example:
> {code:java}
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> Table table1 = tEnv.fromDataStream(
>         env.fromData(Row.of(1))
>                 .returns(ROW_NAMED(new String[]{"id"}, INT))
> );
> // note that for this to fail, TestClass must *not* be in the org.apache.flink namespace
> Table table2 = tEnv.fromDataStream(
>         env.fromData(Row.of(1, new TestClass()))
>                 .returns(ROW_NAMED(
>                         new String[]{"id2", "value"},
>                         INT,
>                         new PojoTypeInfo<>(TestClass.class, new ArrayList<>())))
> );
> tEnv.toDataStream(table1.leftOuterJoin(table2, $("id").isEqual($("id2"))))
>         .sinkTo(new DiscardingSink<>());{code}
> Error from logs:
> {code:java}
> ERROR org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler [] - Failed to handle job event ExecutionJobVertexFinishedEvent{vertexId=cbc357ccb763df2852fee8c4fc7d55f2, resultInfos={52fee8c522470986cbc357ccdd2d92bc=org.apache.flink.runtime.scheduler.adaptivebatch.AllToAllBlockingResultInfo@30d3445e}}.
> java.lang.RuntimeException: Failed to deserialize AdaptiveJoin instance. Please check whether the flink-table-planner-loader.jar is in the classpath.
>     at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.lazyInitialize(AdaptiveJoinOperatorFactory.java:123) ~[classes/:?]
>     at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.checkAndLazyInitialize(AdaptiveJoinOperatorFactory.java:90) ~[classes/:?]
>     at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.getJoinType(AdaptiveJoinOperatorFactory.java:72) ~[classes/:?]
>     at org.apache.flink.table.runtime.strategy.AdaptiveBroadcastJoinOptimizationStrategy.tryOptimizeAdaptiveJoin(AdaptiveBroadcastJoinOptimizationStrategy.java:105) ~[classes/:?]
>     at org.apache.flink.table.runtime.strategy.BaseAdaptiveJoinOperatorOptimizationStrategy.visitDownstreamAdaptiveJoinNode(BaseAdaptiveJoinOperatorOptimizationStrategy.java:90) ~[classes/:?]
>     at org.apache.flink.table.runtime.strategy.AdaptiveBroadcastJoinOptimizationStrategy.onOperatorsFinished(AdaptiveBroadcastJoinOptimizationStrategy.java:74) ~[classes/:?]
>     at org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizer.onOperatorsFinished(StreamGraphOptimizer.java:72) ~[classes/:?]
>     at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.tryOptimizeStreamGraph(DefaultAdaptiveExecutionHandler.java:118) ~[classes/:?]{code}
> This worked in Flink 1.20, but started failing in 2.0 when type information
> started being serialized as part of {{AdaptiveJoinOperatorFactory}},
> which can be deserialized only with classes provided by
> {{SubmoduleClassLoader}} (FLINK-36634).
> Full test case: [https://github.com/apache/flink/pull/27663]