[https://issues.apache.org/jira/browse/BEAM-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096197#comment-17096197]
Sebastian Graca commented on BEAM-9759:
---------------------------------------
[~reuvenlax] Thank you for your comment. I wasn't aware of Splittable DoFn.
I took a quick look at the available documentation, and it looks like a major
undertaking that would involve reimplementing the current KinesisIO. While it's
tempting to support dynamic changes to the Kinesis setup without a pipeline
restart, I'm afraid that from our perspective it's too risky and would take too
much work.
Our current setup has proven to be very stable, and we have built tooling and
monitoring around it that would need to be significantly re-architected. We
would also need to thoroughly test the new implementation, whereas the current
KinesisIO has already been battle-tested and most of its issues have been found
and resolved.
While one reason for our pipeline updates is Kinesis streams being added or
removed, we also need to update a pipeline when we change code or
configuration. Switching to Splittable DoFns won't solve that, am I correct?
> Pipeline creation with large number of shards/streams takes long time
> ---------------------------------------------------------------------
>
> Key: BEAM-9759
> URL: https://issues.apache.org/jira/browse/BEAM-9759
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kinesis, runner-dataflow
> Affects Versions: 2.19.0
> Reporter: Sebastian Graca
> Priority: Major
>
> We are processing multiple Kinesis streams using pipelines running on
> {{DataflowRunner}}. Starting such a pipeline from its definition (i.e.
> executing the {{org.apache.beam.sdk.Pipeline.run()}} method) takes a
> considerable amount of time. In our case:
> * a pipeline that consumes data from 196 streams (237 shards in total)
> starts in 7 minutes
> * a pipeline that consumes data from 111 streams (261 shards in total)
> starts in 4 minutes
> I've been investigating this and found out that when {{Pipeline.run}} is
> invoked, the whole pipeline graph is traversed and serialized so it can be
> passed to the Dataflow backend. Here's part of the stacktrace that shows this
> traversal:
> {code:java}
> at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1252)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getRecords$2(SimplifiedKinesisClient.java:137)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:134)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:119)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:195)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
> at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
> at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:88)
> at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:87)
> at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1630)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1627)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:494)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:433)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:192)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
> {code}
> As you can see, the {{org.apache.beam.sdk.io.kinesis.KinesisSource.split}}
> method is called during serialization. This method finds all shards of the
> stream and also validates each shard by reading from it. Because this process
> is sequential, it takes considerable time that depends on both the number of
> streams (which has the greatest impact) and the number of shards. Even with a
> single stream that has a large number of shards, the pipeline startup time will
> be noticeable.
> I wonder whether it's possible to optimise this somehow.
> One way could be to parallelise the whole process at both the stream and the
> shard level. As this logic is split between Beam core and KinesisIO, this could
> be complex.
> Another solution I could think of is having the information about valid
> stream shards ready before calling {{Pipeline.run}}. If there were a way to
> create a {{KinesisIO.Read}} operation that cached shard information and let
> client code control the parallelisation of this operation, this would allow
> for a great reduction of the startup time.
> I built a PoC to verify how much parallelising this process can improve
> startup time, and just by implementing it at the stream level I was able to
> reduce the startup time from 7 minutes to 2.5 minutes.
> Unfortunately, this was a really hacky solution and I don't consider it one
> that should be adopted: I hacked the AWS client used by KinesisIO to cache all
> responses from the server and called the {{split}} method in parallel on all
> sources before executing {{Pipeline.run}}. However, this shows that there's a
> lot of room for improvement for pipelines that deal with multiple streams
> and/or shards.
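The two directions proposed above, parallelising discovery at both the stream and the shard level, and computing shard information once before {{Pipeline.run}} on an executor the client controls, can be illustrated outside Beam. The following is a minimal, self-contained Java sketch under stated assumptions, not KinesisIO code: {{KinesisLikeClient}}, {{fakeClient}} and {{ParallelShardDiscovery}} are hypothetical names, the fake client simulates API latency with {{Thread.sleep}}, and, as a simplification, it assumes one listing call per stream and one validation call per shard. Hooking such a cache into {{KinesisIO.Read}} would need an API that, as the description above implies, KinesisIO does not currently offer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/** Hypothetical sketch (not KinesisIO code): parallel, cached shard discovery. */
class ParallelShardDiscovery {

    /** Hypothetical stand-in for the Kinesis calls made while splitting a source. */
    interface KinesisLikeClient {
        List<String> listShards(String stream);
        boolean isValid(String stream, String shardId);
    }

    private final KinesisLikeClient client;
    private final Executor executor; // the caller decides how much parallelism to use
    // One future per stream; note that a failed lookup also stays cached in this sketch.
    private final Map<String, CompletableFuture<List<String>>> cache = new ConcurrentHashMap<>();

    ParallelShardDiscovery(KinesisLikeClient client, Executor executor) {
        this.client = client;
        this.executor = executor;
    }

    /** Starts listing and validating a stream's shards at most once; later calls hit the cache. */
    CompletableFuture<List<String>> shardsFor(String stream) {
        return cache.computeIfAbsent(stream, s ->
                CompletableFuture.supplyAsync(() -> client.listShards(s), executor)
                        .thenCompose(shardIds -> validateAll(s, shardIds)));
    }

    /** Validates every shard of one stream concurrently and keeps the valid ones in order. */
    private CompletableFuture<List<String>> validateAll(String stream, List<String> shardIds) {
        List<CompletableFuture<Optional<String>>> checks = new ArrayList<>();
        for (String shardId : shardIds) {
            checks.add(CompletableFuture.supplyAsync(
                    () -> client.isValid(stream, shardId) ? Optional.of(shardId) : Optional.<String>empty(),
                    executor));
        }
        return CompletableFuture.allOf(checks.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> checks.stream()
                        .map(CompletableFuture::join)
                        .flatMap(Optional::stream)
                        .collect(Collectors.toList()));
    }

    /** Kicks off all streams concurrently (e.g. before Pipeline.run()) and waits for every result. */
    Map<String, List<String>> prefetch(List<String> streams) {
        streams.forEach(this::shardsFor); // start every stream before waiting on any of them
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String stream : streams) {
            result.put(stream, shardsFor(stream).join());
        }
        return result;
    }

    /** Fake client: every call sleeps latencyMs; each stream has 3 shards and "-2" counts as invalid. */
    static KinesisLikeClient fakeClient(long latencyMs) {
        return new KinesisLikeClient() {
            public List<String> listShards(String stream) {
                sleep(latencyMs);
                return List.of(stream + "-shard-0", stream + "-shard-1", stream + "-shard-2");
            }

            public boolean isValid(String stream, String shardId) {
                sleep(latencyMs);
                return !shardId.endsWith("-2");
            }
        };
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(16);
        try {
            ParallelShardDiscovery discovery = new ParallelShardDiscovery(fakeClient(200), pool);
            long start = System.nanoTime();
            Map<String, List<String>> shards = discovery.prefetch(List.of("s1", "s2", "s3", "s4"));
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println(shards + " in ~" + elapsedMs + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

With the fake 200 ms latency, a strictly sequential loop over the four streams in {{main}} would make 4 listing calls and 12 validation calls, about 3.2 s in total; in this sketch the critical path is one listing call followed by one validation call, so the run should take roughly 0.4 s when the pool has enough threads. Because each stream's result is stored as a {{CompletableFuture}} in a {{ConcurrentHashMap}}, a later lookup (for example from a split step) reuses the prefetched result instead of calling the client again.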
--
This message was sent by Atlassian Jira
(v8.3.4#803005)