[ https://issues.apache.org/jira/browse/BEAM-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095092#comment-17095092 ]

Reuven Lax commented on BEAM-9759:
----------------------------------

The best solution would be to create a new SplittableDoFn version of the 
Kinesis source. This would have several advantages:
 # It could support dynamic changes (at run time) to the list of Kinesis 
streams. I believe this is a major reason you currently need to update the 
pipeline so often, and it would remove that need.
 # The splitting could then happen at run time instead of 
graph-construction time, and could also be parallelized.
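
For illustration, the shape of such a splittable-DoFn source might look roughly like this (a schematic sketch only, not runnable code: {{ShardRange}} and {{ShardCheckpoint}} are hypothetical restriction types, and the method bodies are elided):

```java
// Schematic sketch of a SplittableDoFn-based Kinesis read. Stream names arrive
// as ordinary elements, so the set of streams can change at run time, and
// splitting into per-shard restrictions happens on the workers, in parallel.
@DoFn.UnboundedPerElement
class ReadKinesisStream extends DoFn<String, KinesisRecord> {

  @GetInitialRestriction
  public ShardRange initialRestriction(@Element String streamName) {
    // Discover the stream's shards lazily, at run time.
    ...
  }

  @SplitRestriction
  public void splitRestriction(@Restriction ShardRange range,
                               OutputReceiver<ShardRange> out) {
    // Emit one restriction per shard; the runner distributes them.
    ...
  }

  @ProcessElement
  public ProcessContinuation processElement(
      RestrictionTracker<ShardRange, ShardCheckpoint> tracker,
      OutputReceiver<KinesisRecord> out) {
    // Read records while the tracker allows claims, then resume later.
    ...
  }
}
```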

 

> Pipeline creation with large number of shards/streams takes long time
> ---------------------------------------------------------------------
>
>                 Key: BEAM-9759
>                 URL: https://issues.apache.org/jira/browse/BEAM-9759
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis, runner-dataflow
>    Affects Versions: 2.19.0
>            Reporter: Sebastian Graca
>            Priority: Major
>
> We are processing multiple Kinesis streams using pipelines running on 
> {{DataflowRunner}}. Starting such a pipeline from a pipeline definition 
> (executing the {{org.apache.beam.sdk.Pipeline.run()}} method) takes a 
> considerable amount of time. In our case:
>  * a pipeline that consumes data from 196 streams (237 shards in total) 
> starts in 7 minutes
>  * a pipeline that consumes data from 111 streams (261 shards in total) 
> starts in 4 minutes
> I've been investigating this and found out that when {{Pipeline.run}} is 
> invoked, the whole pipeline graph is traversed and serialized so it can be 
> passed to the Dataflow backend. Here's part of the stacktrace that shows this 
> traversal:
> {code:java}
> at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1252)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getRecords$2(SimplifiedKinesisClient.java:137)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:134)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:119)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:195)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
> at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
> at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:88)
> at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:87)
> at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1630)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1627)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:494)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:433)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:192)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
> {code}
> As you can see, during serialization the 
> {{org.apache.beam.sdk.io.kinesis.KinesisSource.split}} method is called. This 
> method finds all shards for the stream and validates each shard by reading 
> from it. Because this process is sequential, it takes considerable time that 
> depends both on the number of streams (which has the greatest impact) and on 
> the number of shards. Even with a single stream that has a large number of 
> shards, the pipeline startup time will be noticeable.
> I wonder if it's possible to optimise this somehow?
>  One way could be to parallelise the whole process, at both the stream and 
> the shard level. As this is split between Beam core and KinesisIO, this can 
> be complex.
>  Another solution I can think of is having the information about valid 
> stream shards ready before calling {{Pipeline.run}}. If there were a way to 
> create a {{KinesisIO.Read}} operation such that it cached shard information 
> and let client code control the parallelisation of this operation, it would 
> allow a great reduction of the startup time.
> I was able to make a PoC to verify how much parallelisation of this process 
> can improve startup time: just by implementing it at the stream level, I 
> reduced the startup time from 7 minutes to 2.5 minutes. Unfortunately this 
> was a really hacky solution and I don't consider it one that should be 
> implemented - I hacked the AWS client used by KinesisIO to cache all 
> responses from the server and called the {{split}} method in parallel on all 
> sources before executing {{Pipeline.run}}. However, this proves that there's 
> huge room for improvement for pipelines that deal with multiple streams 
> and/or shards.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
