[
https://issues.apache.org/jira/browse/FLINK-35766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Weijie Guo reassigned FLINK-35766:
----------------------------------
Assignee: Junrui Li
> When the job contains many YieldingOperatorFactory instances, compiling the
> JobGraph hangs
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-35766
> URL: https://issues.apache.org/jira/browse/FLINK-35766
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Reporter: Junrui Li
> Assignee: Junrui Li
> Priority: Major
>
> When a job contains many YieldingOperatorFactory instances, compiling the
> JobGraph becomes extremely slow (the time complexity is O(N!)). As a result,
> job compilation hangs while creating chains once there are many
> YieldingOperatorFactory instances (e.g., more than 30).
> This bug is rarely triggered, but we have users whose production SQL
> contains many LookupJoins, which use YieldingOperatorFactory. A
> simple reproducible case is as follows:
> {code:java}
> @Test
> void test() {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment(1);
> env.fromSource(
> new NumberSequenceSource(0, 10),
> WatermarkStrategy.noWatermarks(), "input")
> .map((x) -> x)
> // add 31 YieldingOperatorFactory instances
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .transform(
> "test", BasicTypeInfo.LONG_TYPE_INFO, new
> YieldingTestOperatorFactory<>())
> .addSink(new DiscardingSink<>());
> env.getStreamGraph().getJobGraph();
> }
> {code}
> The reason is that there is no caching when determining whether an edge is
> chainable, so the backward traversal over the upstream operators is repeated
> each time a YieldingOperatorFactory is encountered (see code:
> [https://github.com/apache/flink/blob/90fc679df073754b93eb5c220373daad7dca0a32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L1602]).
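To illustrate the effect of the missing cache described above, here is a minimal sketch of a toy model. It is not the Flink implementation: the class ChainableCacheSketch, its methods, and the assumption that no node is a legacy source are all hypothetical and chosen only for illustration. The model is a linear pipeline in which deciding whether an edge is chainable requires finding the head of the upstream chain, and finding the head in turn re-checks the upstream edges.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a linear pipeline: node 0 is the source and edge i connects
// node i-1 to node i. This is NOT Flink code; all names are hypothetical.
class ChainableCacheSketch {
    // Number of times the chainability rule was actually evaluated
    // (cache hits are not counted).
    long evaluations = 0;
    private final Map<Integer, Boolean> cache = new HashMap<>();

    // Assumption for this sketch: no node is a legacy source,
    // so every edge turns out to be chainable.
    private boolean isLegacySource(int node) {
        return false;
    }

    // Uncached check: deciding edge i walks backward to the chain head.
    boolean chainableUncached(int edge) {
        evaluations++;
        return !isLegacySource(headUncached(edge - 1));
    }

    // Finding the head re-checks the input edge, then recurses upstream.
    private int headUncached(int node) {
        if (node > 0 && chainableUncached(node)) {
            return headUncached(node - 1);
        }
        return node;
    }

    // Cached check: the rule is evaluated at most once per edge.
    boolean chainableCached(int edge) {
        Boolean known = cache.get(edge);
        if (known != null) {
            return known;
        }
        evaluations++;
        boolean result = !isLegacySource(headCached(edge - 1));
        cache.put(edge, result);
        return result;
    }

    private int headCached(int node) {
        if (node > 0 && chainableCached(node)) {
            return headCached(node - 1);
        }
        return node;
    }

    public static void main(String[] args) {
        int operators = 20;
        ChainableCacheSketch uncached = new ChainableCacheSketch();
        uncached.chainableUncached(operators);
        ChainableCacheSketch cached = new ChainableCacheSketch();
        cached.chainableCached(operators);
        System.out.println("uncached evaluations: " + uncached.evaluations);
        System.out.println("cached evaluations:   " + cached.evaluations);
    }
}
```

In this toy model the uncached check doubles its work with every additional operator, while the cached variant evaluates each edge once. The exact growth rate in the real generator may differ; the sketch only shows why memoizing the per-edge result removes the repeated backward traversal.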
--
This message was sent by Atlassian Jira
(v8.20.10#820010)