[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834798#comment-17834798
 ] 

Zhu Zhu edited comment on FLINK-32513 at 4/8/24 6:27 AM:
---------------------------------------------------------

{{Transformation}} is not a public interface, it is an @Internal class. 
Ideally, kafka connectors should not directly manipulate {{Transformation}}.
Yet we may try to find a workaround to avoid break existing kafka connectors.


was (Author: zhuzh):
{{Transformation}} is not a public interface, it is an @Internal class. 
Ideally, kafka connectors should not directly manipulate {{Transformation}}.
Production-wise, we may take a look whether there is a workaround to avoid 
break existing kafka connectors.

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32513
>                 URL: https://issues.apache.org/jira/browse/FLINK-32513
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.3, 1.16.1, 1.17.1
>         Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>            Reporter: Vladislav Keda
>            Assignee: Jeyhun Karimov
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.18.2, 1.20.0, 1.19.1
>
>         Attachments: image-2023-07-10-17-26-46-544.png
>
>
> Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.lambda$existsUnboundedSource$1(StreamGraphGenerator.java:509)
>       at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator$$Lambda$1988.1989814391.test(Unknown
>  Source:-1)
>       at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
>       at 
> java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1632)
>       at 
> java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
>       at 
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
>       at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
>       at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>       at 
> java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>       at 
> java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>       at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at 
> java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
>       at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.existsUnboundedSource(StreamGraphGenerator.java:506)
>       at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.shouldExecuteInBatchMode(StreamGraphGenerator.java:487)
>       at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:313)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2248)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2239)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2225)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2052){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to