[
https://issues.apache.org/jira/browse/BEAM-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17091295#comment-17091295
]
Sebastian Graca commented on BEAM-9759:
---------------------------------------
As I mentioned in the description, I don't think the way the PoC was
implemented is the way to go for the real fix. Tinkering with the AWS client
is not a robust solution. The PoC also relied on the internal implementation
of {{KinesisIO.Read}}, which in this case was exposed to the client of the
API, and that is not something that should be done.
I suppose another approach needs to be taken (I can't say which at the
moment) so that the ability to parallelise is exposed in some way by the
{{KinesisIO.Read}} API and clients can take advantage of it if they need to.
I'm willing to look into how this could be implemented on the KinesisIO side,
but unfortunately I don't have enough capacity right now to focus on it, so
don't expect anything working soon.
However, the problem is also related to the fact that {{DataflowRunner}}
invokes the {{split}} method, and this, on the other hand, is an
implementation detail of that runner. I'm not sure the issue can be solved
solely by adding parallelism to KinesisIO, because such a fix could break if
{{DataflowRunner}}'s implementation changes.
> Pipeline creation with large number of shards/streams takes long time
> ---------------------------------------------------------------------
>
> Key: BEAM-9759
> URL: https://issues.apache.org/jira/browse/BEAM-9759
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kinesis, runner-dataflow
> Affects Versions: 2.19.0
> Reporter: Sebastian Graca
> Priority: Major
>
> We are processing multiple Kinesis streams using pipelines running on
> {{DataflowRunner}}. Starting such a pipeline from a pipeline definition
> (execution of the {{org.apache.beam.sdk.Pipeline.run()}} method) takes a
> considerable amount of time. In our case:
> * a pipeline that consumes data from 196 streams (237 shards in total)
> starts in 7 minutes
> * a pipeline that consumes data from 111 streams (261 shards in total)
> starts in 4 minutes
> I've been investigating this and found out that when {{Pipeline.run}} is
> invoked, the whole pipeline graph is traversed and serialized so it can be
> passed to the Dataflow backend. Here's part of the stacktrace that shows this
> traversal:
> {code:java}
> at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1252)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getRecords$2(SimplifiedKinesisClient.java:137)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:134)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:119)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:195)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
> at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
> at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:88)
> at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:87)
> at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1630)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1627)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:494)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:433)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:192)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
> {code}
> As you can see, during serialization the
> {{org.apache.beam.sdk.io.kinesis.KinesisSource.split}} method is called. This
> method finds all shards for the stream and also validates each shard by
> reading from it. As this process is sequential, it takes considerable time
> that depends both on the number of streams (which has the greatest impact)
> and on the number of shards. Even with a single stream that has a large
> number of shards, the pipeline startup time will be noticeable.
> I wonder if it's possible to optimise this somehow?
> One way could be to parallelise the whole process, at both the stream and
> the shard level. As this is split between Beam core and KinesisIO, this can
> be complex.
> Another solution that I could think of is having the information about
> valid stream shards ready before calling {{Pipeline.run}}. If there were a
> way to create a {{KinesisIO.Read}} operation such that it cached shard
> information and let client code control the parallelisation of this
> operation, this would allow for a great reduction of the startup time.
> I was able to make a PoC to verify how much parallelisation of this process
> can improve startup time, and just by implementing this on the stream level
> I was able to reduce the startup time from 7 minutes to 2.5 minutes.
> Unfortunately this was a really hacky solution and I don't consider it one
> that should be implemented: I hacked the AWS client used by KinesisIO to
> cache all responses from the server and called the {{split}} method in
> parallel on all sources before executing {{Pipeline.run}}. However, this
> proves that there's huge room for improvement for pipelines that deal with
> multiple streams and/or shards.
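
For illustration only, here is a minimal sketch of the stream-level
parallelisation idea from the description. It is not the PoC code and it does
not use any Beam or KinesisIO API: the class {{ParallelShardDiscovery}}, its
{{discoverAll}} method and the {{findShards}} function are hypothetical names,
and the per-stream lookup is a stand-in that just sleeps to simulate the
blocking shard listing and validation calls. It only shows how independent
per-stream lookups could be run on a thread pool and their results cached in
a map before anything else needs them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper; not part of Beam or KinesisIO.
class ParallelShardDiscovery {

  /**
   * Runs a slow, blocking per-stream lookup for every stream on a fixed-size
   * thread pool and returns the results keyed by stream name.
   */
  static Map<String, List<String>> discoverAll(
      List<String> streamNames,
      Function<String, List<String>> findShards,
      int parallelism) {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      Map<String, List<String>> cache = new ConcurrentHashMap<>();
      List<Callable<Void>> tasks = new ArrayList<>();
      for (String stream : streamNames) {
        tasks.add(() -> {
          cache.put(stream, findShards.apply(stream));
          return null;
        });
      }
      // invokeAll blocks until every task has finished (or failed).
      for (Future<Void> result : pool.invokeAll(tasks)) {
        result.get(); // surfaces a failed lookup as ExecutionException
      }
      return cache;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted during shard discovery", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Shard discovery failed", e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    // Stand-in for listing and validating shards: sleeps to mimic latency.
    Function<String, List<String>> slowLookup = stream -> {
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return Arrays.asList(stream + "-shard-0", stream + "-shard-1");
    };

    List<String> streams =
        Arrays.asList("stream-a", "stream-b", "stream-c", "stream-d");
    long start = System.nanoTime();
    Map<String, List<String>> shards = discoverAll(streams, slowLookup, 4);
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    System.out.println(
        "Discovered " + shards.size() + " streams in " + elapsedMs + " ms");
  }
}
```

With the simulated lookup, the four streams are processed concurrently rather
than one after another, which is the effect the PoC measured at the stream
level. How such a cache would be handed to {{KinesisIO.Read}} is exactly the
open API question, so the sketch deliberately leaves it out.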