Junrui Li created FLINK-35766:
---------------------------------
Summary: When the job contains many YieldingOperatorFactory
instances, compiling the JobGraph hangs
Key: FLINK-35766
URL: https://issues.apache.org/jira/browse/FLINK-35766
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Reporter: Junrui Li
When a job contains YieldingOperatorFactory instances, the time complexity of
compiling the JobGraph is very high (O(N!)). As a result, job compilation hangs
while creating operator chains when there are many YieldingOperatorFactory
instances (e.g., more than 30).
This bug is rare, but some of our users run SQL jobs in production that contain
many LookupJoins using YieldingOperatorFactory. A simple reproducible case is as
follows:
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.junit.jupiter.api.Test;

// YieldingTestOperatorFactory is a test-only operator factory that implements
// YieldingOperatorFactory (not shown here).
@Test
void test() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
    DataStream<Long> stream =
            env.fromSource(
                            new NumberSequenceSource(0, 10),
                            WatermarkStrategy.noWatermarks(),
                            "input")
                    .map((x) -> x);
    // Add 32 operators created by YieldingTestOperatorFactory, chained one after another.
    for (int i = 0; i < 32; i++) {
        stream =
                stream.transform(
                        "test", BasicTypeInfo.LONG_TYPE_INFO, new YieldingTestOperatorFactory<>());
    }
    stream.addSink(new DiscardingSink<>());
    // Compiling the JobGraph hangs here while operator chains are being created.
    env.getStreamGraph().getJobGraph();
} {code}
The root cause is that the result of checking whether an edge is chainable is
not cached, so every YieldingOperatorFactory encountered from that point onward
triggers a fresh backward traversal over all upstream operators (see code:
[https://github.com/apache/flink/blob/90fc679df073754b93eb5c220373daad7dca0a32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L1602]).
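To illustrate why the missing cache matters, here is a hypothetical, simplified model (not Flink code, and the real StreamingJobGraphGenerator logic may differ): a linear pipeline whose source (node 0) is followed by n yielding operators. Checking whether the edge into a yielding operator is chainable requires walking back to the head of the upstream chain, and that walk itself checks each upstream edge. The names ChainableTraversalDemo, isChainable, headOf and isLegacySource are invented for this sketch. It counts how often the check is actually computed, with and without memoizing the per-edge result.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model (not Flink code): node 0 is the source and
// nodes 1..n are "yielding" operators connected in a single line.
class ChainableTraversalDemo {

    private final int n;
    private final boolean useCache;
    // Memoized result per edge, keyed by the index of the edge's downstream node.
    private final Map<Integer, Boolean> chainableCache = new HashMap<>();
    // Number of times the chainability check was actually computed (cache misses).
    private long evaluations = 0;

    ChainableTraversalDemo(int n, boolean useCache) {
        this.n = n;
        this.useCache = useCache;
    }

    // Is the edge (i - 1) -> i chainable? Because node i is yielding, the answer
    // depends on the head of the chain that node i - 1 belongs to.
    boolean isChainable(int i) {
        if (useCache) {
            Boolean cached = chainableCache.get(i);
            if (cached != null) {
                return cached;
            }
        }
        evaluations++;
        boolean result = !isLegacySource(headOf(i - 1));
        if (useCache) {
            chainableCache.put(i, result);
        }
        return result;
    }

    // Walks backward while the incoming edge is chainable and returns the chain head.
    // Without the cache, every step re-runs isChainable for an upstream edge.
    int headOf(int node) {
        while (node > 0 && isChainable(node)) {
            node--;
        }
        return node;
    }

    // Assumption of this model: no node is a legacy source, so every edge is chainable.
    private static boolean isLegacySource(int node) {
        return false;
    }

    // Checks every edge once, as a chain-building pass would.
    long checkAllEdges() {
        for (int i = 1; i <= n; i++) {
            isChainable(i);
        }
        return evaluations;
    }

    public static void main(String[] args) {
        for (int n : new int[] {5, 10, 20}) {
            long uncached = new ChainableTraversalDemo(n, false).checkAllEdges();
            long cached = new ChainableTraversalDemo(n, true).checkAllEdges();
            // Prints: n=5 uncached=31 cached=5, n=10 uncached=1023 cached=10,
            //         n=20 uncached=1048575 cached=20
            System.out.println("n=" + n + " uncached=" + uncached + " cached=" + cached);
        }
    }
}
{code}

In this model the uncached count doubles with every additional yielding operator (2^n - 1 evaluations), so 32 operators already means billions of evaluations, while caching keeps it at one evaluation per edge. Memoizing per-edge results is one possible mitigation, not necessarily the fix adopted for this issue.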