Piotr Przybylski created FLINK-39150:
----------------------------------------
Summary: Join operator crashes jobs when using custom types or custom type serializers
Key: FLINK-39150
URL: https://issues.apache.org/jira/browse/FLINK-39150
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 2.2.0, 2.0.0
Reporter: Piotr Przybylski
Flink fails to start a job when a Table API join is used on a {{Table}} that
references a custom user type, such as a POJO or a custom {{TypeSerializer}}
attached to a native Java type's {{TypeInformation}}.
Example:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table table1 = tEnv.fromDataStream(
        env.fromData(Row.of(1))
                .returns(ROW_NAMED(new String[]{"id"}, INT)));

Table table2 = tEnv.fromDataStream(
        env.fromData(Row.of(1, new TestClass()))
                .returns(ROW_NAMED(
                        new String[]{"id2", "value"},
                        INT,
                        new PojoTypeInfo<>(TestClass.class, new ArrayList<>()))));

tEnv.toDataStream(table1.leftOuterJoin(table2, $("id").isEqual($("id2"))))
        .sinkTo(new DiscardingSink<>());{code}
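The reproduction above assumes a {{TestClass}} POJO on the classpath. A minimal sketch (the field name is illustrative; any class satisfying Flink's POJO rules should behave the same) looks like:

{code:java}
// Hypothetical stand-in for the TestClass used in the reproduction.
// It follows Flink's POJO requirements: public class, public no-arg
// constructor, and public (or getter/setter-accessible) fields.
public class TestClass {
    public String value;

    public TestClass() {}
}{code}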
Error from logs:
{code:java}
ERROR org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler [] - Failed to handle job event ExecutionJobVertexFinishedEvent{vertexId=cbc357ccb763df2852fee8c4fc7d55f2, resultInfos={52fee8c522470986cbc357ccdd2d92bc=org.apache.flink.runtime.scheduler.adaptivebatch.AllToAllBlockingResultInfo@30d3445e}}.
java.lang.RuntimeException: Failed to deserialize AdaptiveJoin instance. Please check whether the flink-table-planner-loader.jar is in the classpath.
    at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.lazyInitialize(AdaptiveJoinOperatorFactory.java:123) ~[classes/:?]
    at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.checkAndLazyInitialize(AdaptiveJoinOperatorFactory.java:90) ~[classes/:?]
    at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.getJoinType(AdaptiveJoinOperatorFactory.java:72) ~[classes/:?]
    at org.apache.flink.table.runtime.strategy.AdaptiveBroadcastJoinOptimizationStrategy.tryOptimizeAdaptiveJoin(AdaptiveBroadcastJoinOptimizationStrategy.java:105) ~[classes/:?]
    at org.apache.flink.table.runtime.strategy.BaseAdaptiveJoinOperatorOptimizationStrategy.visitDownstreamAdaptiveJoinNode(BaseAdaptiveJoinOperatorOptimizationStrategy.java:90) ~[classes/:?]
    at org.apache.flink.table.runtime.strategy.AdaptiveBroadcastJoinOptimizationStrategy.onOperatorsFinished(AdaptiveBroadcastJoinOptimizationStrategy.java:74) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizer.onOperatorsFinished(StreamGraphOptimizer.java:72) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.tryOptimizeStreamGraph(DefaultAdaptiveExecutionHandler.java:118) ~[classes/:?]{code}
This worked in Flink 1.20 but started failing in 2.0, when the join's type
information started being serialized as part of {{AdaptiveJoinOperatorFactory}},
which can only be deserialized with classes provided by the
{{SubmoduleClassLoader}} (FLINK-36634).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)