Sebastian Graca created BEAM-9759:
-------------------------------------

             Summary: Pipeline creation with large number of shards/streams 
takes long time
                 Key: BEAM-9759
                 URL: https://issues.apache.org/jira/browse/BEAM-9759
             Project: Beam
          Issue Type: Bug
          Components: io-java-kinesis, runner-dataflow
    Affects Versions: 2.19.0
            Reporter: Sebastian Graca


We are processing multiple Kinesis streams using pipelines running on 
{{DataflowRunner}}. Starting such a pipeline from a pipeline definition 
(that is, executing the {{org.apache.beam.sdk.Pipeline.run()}} method) takes 
a considerable amount of time. In our case:
 * a pipeline that consumes data from 196 streams (237 shards in total) starts 
in 7 minutes
 * a pipeline that consumes data from 111 streams (261 shards in total) starts 
in 4 minutes

I've been investigating this and found out that when {{Pipeline.run}} is 
invoked, the whole pipeline graph is traversed and serialized so it can be 
passed to the Dataflow backend. Here's part of the stacktrace that shows this 
traversal:
{code:java}
at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1252)
at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getRecords$2(SimplifiedKinesisClient.java:137)
at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:134)
at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:119)
at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:195)
at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:88)
at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:87)
at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1630)
at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1627)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:494)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:433)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:192)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
{code}
As you can see, the {{org.apache.beam.sdk.io.kinesis.KinesisSource.split}} 
method is called during serialization. This method finds all shards for the 
stream and also validates each shard by reading from it. Because this process 
is sequential, it takes considerable time, which depends both on the number of 
streams (which has the greatest impact) and on the number of shards. Even with 
a single stream that has a large number of shards, the pipeline startup time 
will be noticeable.

I wonder if it's possible to optimise this somehow?
 One way could be to parallelise the whole process, at both the stream and 
shard level. As this is split between Beam core and KinesisIO, this could be 
complex.
 Another solution I could think of is having the information about valid 
stream shards ready before calling {{Pipeline.run}}. If there were a way to 
create a {{KinesisIO.Read}} operation such that it cached shard information 
and let client code control the parallelisation of this operation, this would 
allow for a great reduction of the startup time.

I was able to make a PoC to verify how much parallelising this process can 
improve startup time. Just by implementing this at the stream level I was able 
to reduce the startup time from 7 minutes to 2.5 minutes. Unfortunately this 
was a really hacky solution and I don't consider it one that should be 
implemented: I hacked the AWS client used by KinesisIO to cache all responses 
from the server and called the {{split}} method in parallel on all sources 
before executing {{Pipeline.run}}. However, this proves that there's huge room 
for improvement for pipelines that deal with multiple streams and/or shards.
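
For illustration only, here is a minimal sketch of the stream-level 
parallelisation idea. It is not the PoC code and does not use any Beam or AWS 
API: {{ParallelShardDiscovery}}, {{discover}} and the {{lookup}} function are 
hypothetical names, and {{lookup}} stands in for whatever slow, blocking 
per-stream call is being made (in the PoC, the {{split}} call on each source). 
It assumes stream names are distinct and that {{lookup}} never returns null.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical helper for illustration; not part of Beam or KinesisIO. */
final class ParallelShardDiscovery {

  private ParallelShardDiscovery() {}

  /**
   * Runs a blocking per-stream lookup for every stream concurrently and
   * returns the results keyed by stream name.
   *
   * @param streamNames distinct stream names (assumption)
   * @param lookup slow per-stream call; must not return null (assumption)
   * @param parallelism maximum number of lookups running at the same time
   */
  static <T> Map<String, T> discover(
      List<String> streamNames, Function<String, T> lookup, int parallelism) {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      Map<String, T> results = new ConcurrentHashMap<>();
      // Submit one task per stream; each task stores its own result.
      CompletableFuture<?>[] tasks =
          streamNames.stream()
              .map(name -> CompletableFuture.runAsync(
                  () -> results.put(name, lookup.apply(name)), pool))
              .toArray(CompletableFuture[]::new);
      // Wait for all tasks; a failed lookup surfaces as a CompletionException.
      CompletableFuture.allOf(tasks).join();
      return results;
    } finally {
      pool.shutdown();
    }
  }
}
```

With a helper like this, the per-stream results could be computed and cached 
up front, so that the later sequential traversal inside {{Pipeline.run}} only 
hits the cache. How to feed such cached results into {{KinesisIO.Read}} 
cleanly is exactly the open question of this issue.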



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
