[
https://issues.apache.org/jira/browse/BEAM-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17548520#comment-17548520
]
Danny McCormick commented on BEAM-9759:
---------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/20152
> Pipeline creation with large number of shards/streams takes long time
> ---------------------------------------------------------------------
>
> Key: BEAM-9759
> URL: https://issues.apache.org/jira/browse/BEAM-9759
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kinesis, runner-dataflow
> Affects Versions: 2.19.0
> Reporter: Sebastian Graca
> Priority: P3
>
> We are processing multiple Kinesis streams using pipelines running on
> {{DataflowRunner}}. Starting such a pipeline from its definition (that is,
> executing the {{org.apache.beam.sdk.Pipeline.run()}} method) takes a
> considerable amount of time. In our case:
> * a pipeline that consumes data from 196 streams (237 shards in total)
> starts in 7 minutes
> * a pipeline that consumes data from 111 streams (261 shards in total)
> starts in 4 minutes
> I've been investigating this and found that when {{Pipeline.run}} is
> invoked, the whole pipeline graph is traversed and serialized so it can be
> passed to the Dataflow backend. Here's the part of the stack trace that shows
> this traversal:
> {code:java}
> at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1252)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getRecords$2(SimplifiedKinesisClient.java:137)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:134)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:119)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:195)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
> at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
> at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:88)
> at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:87)
> at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1630)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1627)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:494)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:433)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:192)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
> {code}
> As you can see, the
> {{org.apache.beam.sdk.io.kinesis.KinesisSource.split}} method is called during
> serialization. This method finds all shards for the stream and also validates
> each shard by reading from it. Because this process is sequential, it takes
> considerable time that depends both on the number of streams (which has the
> greatest impact) and on the number of shards. Even a single stream with a
> large number of shards makes pipeline startup noticeably slower.
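The two measurements reported above (196 streams / 237 shards in 7 minutes, 111 streams / 261 shards in 4 minutes) allow a rough check that the stream count dominates. The sketch below is only an illustration. It assumes a model that the issue does not state: startup time is linear in streams and shards, there is no fixed overhead, and both pipelines pay the same per-stream and per-shard cost. `StartupCostFit` and `fit` are hypothetical names. The code solves the resulting 2x2 system with Cramer's rule.

```java
/**
 * Hypothetical two-point fit of pipeline startup time, assuming
 *   seconds = perStream * streams + perShard * shards
 * with no fixed overhead. Two measurements give two equations in two unknowns.
 */
public class StartupCostFit {

    /** Solves perStream*streams_i + perShard*shards_i = secs_i (i = 1, 2) via Cramer's rule. */
    static double[] fit(double streams1, double shards1, double secs1,
                        double streams2, double shards2, double secs2) {
        double det = streams1 * shards2 - shards1 * streams2;
        if (det == 0) {
            throw new IllegalArgumentException("measurements are linearly dependent");
        }
        double perStream = (secs1 * shards2 - shards1 * secs2) / det;
        double perShard = (streams1 * secs2 - secs1 * streams2) / det;
        return new double[] {perStream, perShard};
    }

    public static void main(String[] args) {
        // Measurements from the issue: 196 streams / 237 shards in 7 min,
        // 111 streams / 261 shards in 4 min.
        double[] c = fit(196, 237, 7 * 60, 111, 261, 4 * 60);
        System.out.printf("per stream: %.2f s, per shard: %.3f s%n", c[0], c[1]);
    }
}
```

With these numbers, the fit gives about 2.1 s per stream and under 0.02 s per shard. That agrees with the observation that the number of streams has the greatest impact. Two data points cannot validate the linear model itself, so treat the coefficients as indicative only.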
> I wonder whether it's possible to optimise this somehow.
> One option would be to parallelise the whole process at both the stream and
> the shard level. Because this logic is split between Beam core and KinesisIO,
> that could be complex.
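Stream-level parallelisation could look roughly like the minimal sketch below. It assumes that each stream's shard discovery (listing shards and validating each one) is independent of the others and can be expressed as a function of the stream name. `ParallelShardDiscovery`, `discoverAll`, and the `discover` function are hypothetical. They are not part of KinesisIO or the Dataflow runner, where, as the stack trace shows, `split` is called from inside translation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ParallelShardDiscovery {

    /**
     * Runs a hypothetical per-stream discovery function for every stream on a
     * bounded thread pool and returns the results in input order.
     * 'discover' stands in for "list shards and validate each one".
     */
    static <R> Map<String, R> discoverAll(List<String> streams,
                                          Function<String, R> discover,
                                          int parallelism)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            Map<String, Future<R>> pending = new LinkedHashMap<>();
            for (String stream : streams) {
                // Streams are independent, so their discovery can run concurrently.
                pending.put(stream, pool.submit(() -> discover.apply(stream)));
            }
            Map<String, R> results = new LinkedHashMap<>();
            for (Map.Entry<String, Future<R>> e : pending.entrySet()) {
                results.put(e.getKey(), e.getValue().get()); // rethrows task failures
            }
            return results;
        } finally {
            pool.shutdownNow(); // also interrupts remaining tasks if one failed
        }
    }

    public static void main(String[] args) throws Exception {
        // Toy discovery function: a simulated 200 ms round trip, then a fake
        // shard count equal to the stream name's length.
        Function<String, Integer> fakeDiscover = stream -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            return stream.length();
        };
        long start = System.nanoTime();
        Map<String, Integer> shards =
            discoverAll(List.of("orders", "clicks", "payments", "audit"), fakeDiscover, 4);
        long millis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(shards + " in ~" + millis + " ms");
    }
}
```

With four threads, the four simulated round trips overlap instead of running one after another. The pool is deliberately bounded: Kinesis applies per-API rate limits, so an unbounded fan-out could trade startup time for throttling errors.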
> Another solution I could think of is to have the information about valid
> stream shards ready before calling {{Pipeline.run}}. If there were a way to
> create a {{KinesisIO.Read}} operation that cached shard information and let
> client code control the parallelisation of this step, the startup time could
> be greatly reduced.
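The caching idea can be sketched independently of KinesisIO: client code warms a cache of per-stream shard information in parallel, using an executor it controls, before `Pipeline.run`. Later lookups then become map reads instead of round trips. `ShardInfoCache`, `load`, `prefetch`, and the loader function are hypothetical. Hooking such a cache into `KinesisIO.Read` would need an extension point that the issue only proposes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical cache of per-stream shard information, loaded at most once per stream. */
public class ShardInfoCache<V> {
    private final ConcurrentMap<String, CompletableFuture<V>> cache = new ConcurrentHashMap<>();
    private final Function<String, V> loader; // e.g. "list and validate shards"
    private final Executor executor;          // controls the parallelism of loading

    public ShardInfoCache(Function<String, V> loader, Executor executor) {
        this.loader = loader;
        this.executor = executor;
    }

    /** Starts loading a stream (only once) and returns its pending result. */
    public CompletableFuture<V> load(String stream) {
        // Only the cheap future creation runs inside computeIfAbsent; the slow
        // loader call runs on the executor, so no map lock is held during I/O.
        return cache.computeIfAbsent(
            stream, s -> CompletableFuture.supplyAsync(() -> loader.apply(s), executor));
    }

    /** Warms the cache for all streams in parallel and waits for every load. */
    public void prefetch(List<String> streams) {
        CompletableFuture.allOf(streams.stream().map(this::load).toArray(CompletableFuture[]::new))
            .join(); // throws CompletionException if any load failed
    }

    /** Blocking lookup, e.g. from code that runs later during translation. */
    public V get(String stream) {
        return load(stream).join();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            ShardInfoCache<Integer> cache = new ShardInfoCache<>(stream -> {
                calls.incrementAndGet();  // stands in for a Kinesis round trip
                return stream.length();   // fake "number of shards"
            }, pool);
            cache.prefetch(List.of("orders", "clicks", "payments"));
            // Served from the cache: prints "8 shards, loader calls: 3"
            System.out.println(cache.get("payments") + " shards, loader calls: " + calls.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

The cache stores futures rather than values so that concurrent requests for the same stream share one load. The trade-off is that a failed load also stays cached, so a real implementation would need some eviction or retry policy.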
> I built a PoC to verify how much parallelising this process can improve
> startup time. Parallelising at the stream level alone reduced the startup
> time from 7 minutes to 2.5 minutes.
> Unfortunately, this was a really hacky solution and I don't consider it one
> that should be implemented: I hacked the AWS client used by KinesisIO to
> cache all responses from the server and called the {{split}} method in
> parallel on all sources before executing {{Pipeline.run}}. However, it shows
> that there's huge room for improvement for pipelines that deal with multiple
> streams and/or shards.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)