[ https://issues.apache.org/jira/browse/BEAM-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-4829:
--------------------------------
    Labels: stale-P2  (was: )

> Reduce Pub/Sub publishing latency
> ---------------------------------
>
>                 Key: BEAM-4829
>                 URL: https://issues.apache.org/jira/browse/BEAM-4829
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>    Affects Versions: 2.5.0
>            Reporter: Jan Hicken
>            Priority: P2
>              Labels: stale-P2
>
> The current implementation of the {{PubsubUnboundedSink}} uses a global 
> window with a trigger that fires on a batch size of 1000 elements or after a 
> processing-time span of 2 seconds. A random sharding into 100 shards is then 
> applied via a {{GroupByKey}} transform, and the result is passed to a {{DoFn}} 
> that performs the actual publishing step.
> For low-latency use cases (latency targets of tens or hundreds of 
> milliseconds), this design is poorly suited: the transform steps described 
> above add around 1.2 seconds of latency.
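The random-sharding step can be pictured with the following stdlib-only sketch. It is not the {{PubsubUnboundedSink}} code, and the names {{RandomSharding}} and {{shard}} are hypothetical. Each element of a fired pane gets a random key in {{[0, shardCount)}}, and elements with the same key are grouped, which is roughly what assigning a random key and applying {{GroupByKey}} achieves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

/** Hypothetical model of "assign a random shard key, then group by key". */
final class RandomSharding {
    private RandomSharding() {}

    /** Groups the elements of one fired pane into at most shardCount random shards. */
    static <T> Map<Integer, List<T>> shard(List<T> pane, int shardCount, Random random) {
        if (shardCount <= 0) {
            throw new IllegalArgumentException("shardCount must be positive");
        }
        Map<Integer, List<T>> shards = new TreeMap<>();
        for (T element : pane) {
            // One random key per element, like the random sharding before GroupByKey.
            int key = random.nextInt(shardCount);
            shards.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        // In the sink, each group would then be published by the DoFn.
        return shards;
    }
}
```

With a full pane of 1000 elements and 100 shards, each group receives about 10 messages on average. At low throughput the pane is fired by the 2-second time trigger rather than the count, so the groups are even smaller.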
> There are several ways to improve the Pub/Sub sink. For example, the 
> parameters above could be made configurable via {{PipelineOptions}}:
>  * {{pubsubBatchSize}}: Approx. maximum number of elements in a Pub/Sub 
> publishing batch
>  * {{pubsubDelayThreshold}}: Max. processing-time delay before the trigger 
> fires the sharding window
>  * {{pubsubShardCount}}: The number of shards to create before publishing
> This would allow the Pub/Sub sink to be tuned for different throughput and 
> message-size scenarios in the pipeline.
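A sketch of how the three proposed settings could parameterize the sink's trigger. {{PubsubSinkSettings}} is a hypothetical name, not an existing Beam class; in a real change the values would come from a {{PipelineOptions}} sub-interface. The defaults mirror the hard-coded values described in this issue (1000 elements, 2 seconds, 100 shards), and measuring the delay from the first element in the pane is an assumption of this sketch.

```java
import java.time.Duration;

/**
 * Hypothetical holder for the three proposed settings. In Beam they would be
 * getters/setters on a PipelineOptions sub-interface rather than a plain class.
 */
final class PubsubSinkSettings {
    final long pubsubBatchSize;          // approx. max elements per publishing batch
    final Duration pubsubDelayThreshold; // max processing time before the pane fires
    final int pubsubShardCount;          // random shards created before publishing

    PubsubSinkSettings(long pubsubBatchSize, Duration pubsubDelayThreshold, int pubsubShardCount) {
        if (pubsubBatchSize <= 0 || pubsubShardCount <= 0 || pubsubDelayThreshold.isNegative()) {
            throw new IllegalArgumentException("invalid Pub/Sub sink settings");
        }
        this.pubsubBatchSize = pubsubBatchSize;
        this.pubsubDelayThreshold = pubsubDelayThreshold;
        this.pubsubShardCount = pubsubShardCount;
    }

    /** The values currently hard-coded in the sink, as described in this issue. */
    static PubsubSinkSettings currentDefaults() {
        return new PubsubSinkSettings(1000, Duration.ofSeconds(2), 100);
    }

    /** Fires when either the element count or the processing-time delay is reached. */
    boolean shouldFire(long elementsInPane, Duration sinceFirstElement) {
        return elementsInPane >= pubsubBatchSize
                || sinceFirstElement.compareTo(pubsubDelayThreshold) >= 0;
    }
}
```

A low-latency configuration such as {{new PubsubSinkSettings(100, Duration.ofMillis(50), 10)}} would fire a pane at most 50 ms after its first element, while fewer shards keep more messages in each publish request.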
> However, if the throughput is low (< 100 elements/s), this mechanism is 
> still quite slow. The {{Publisher}} class of the Java client library 
> {{com.google.cloud:google-cloud-pubsub}} supports a wide range of options to 
> optimize its batching behaviour. Using it would remove the need for a window 
> with {{GroupByKey}} and let the publisher itself handle the batching.
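To illustrate the kind of batching the {{Publisher}} performs, here is a stdlib-only sketch of a batcher that sends a batch as soon as an element-count, byte-size or delay threshold is reached, the same three knobs that {{BatchingSettings}} exposes. It is not the client library's implementation; {{ThresholdBatcher}} and its constructor parameters are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

/**
 * Hypothetical batcher: hands a batch to the sink as soon as its element count,
 * its byte size, or the time since its first element reaches a threshold.
 */
final class ThresholdBatcher<T> implements AutoCloseable {
    private final long elementCountThreshold;
    private final long byteThreshold;
    private final Duration delayThreshold;
    private final ToLongFunction<T> sizer;
    private final Consumer<List<T>> sink;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    private List<T> batch = new ArrayList<>();
    private long batchBytes;
    private long generation; // incremented on every flush, so stale timers do nothing
    private ScheduledFuture<?> pendingFlush;

    ThresholdBatcher(long elementCountThreshold, long byteThreshold, Duration delayThreshold,
                     ToLongFunction<T> sizer, Consumer<List<T>> sink) {
        this.elementCountThreshold = elementCountThreshold;
        this.byteThreshold = byteThreshold;
        this.delayThreshold = delayThreshold;
        this.sizer = sizer;
        this.sink = sink;
    }

    synchronized void add(T element) {
        if (batch.isEmpty()) {
            // The delay clock starts when the first element enters an empty batch.
            final long scheduledGeneration = generation;
            pendingFlush = timer.schedule(() -> flushIfStillPending(scheduledGeneration),
                    delayThreshold.toMillis(), TimeUnit.MILLISECONDS);
        }
        batch.add(element);
        batchBytes += sizer.applyAsLong(element);
        if (batch.size() >= elementCountThreshold || batchBytes >= byteThreshold) {
            flush();
        }
    }

    private synchronized void flushIfStillPending(long scheduledGeneration) {
        if (scheduledGeneration == generation) {
            flush();
        }
    }

    synchronized void flush() {
        if (pendingFlush != null) {
            pendingFlush.cancel(false);
            pendingFlush = null;
        }
        if (batch.isEmpty()) {
            return;
        }
        List<T> toSend = batch;
        batch = new ArrayList<>();
        batchBytes = 0;
        generation++;
        // Called under the lock for simplicity; a real client sends asynchronously.
        sink.accept(toSend);
    }

    @Override
    public void close() {
        flush();
        timer.shutdown();
    }
}
```

With a 50 ms delay threshold, a single message arriving at low throughput waits at most about 50 ms before it is handed off, instead of waiting for a pane to fire; the count and byte thresholds only come into play at higher throughput.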
> Consider the following {{DoFn}} for publishing messages to Pub/Sub using that 
> client:
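> {code:java}
> import com.google.api.gax.batching.BatchingSettings;
> import com.google.cloud.pubsub.v1.Publisher;
> import com.google.protobuf.ByteString;
> import com.google.pubsub.v1.TopicName;
> import java.io.IOException;
> import java.util.Map;
> import java.util.concurrent.TimeUnit;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
> import org.apache.beam.sdk.options.ValueProvider;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.display.DisplayData;
> import org.threeten.bp.Duration; // gax BatchingSettings takes threetenbp durations
>
> class PublishFn extends DoFn<PubsubMessage, Void> {
>     // Created per DoFn instance in @Setup; not serialized with the DoFn.
>     private transient Publisher publisher;
>     private final ValueProvider<String> topicPath;
>     public PublishFn(final ValueProvider<String> topicPath) {
>         this.topicPath = topicPath;
>     }
>     @Setup
>     public void setup() throws IOException {
>         // Let the client batch: send when 1000 messages or 40 kB are buffered,
>         // or 50 ms after the first buffered message, whichever comes first.
>         // (newBuilder in current client versions; older ones used defaultBuilder.)
>         publisher = Publisher.newBuilder(TopicName.parse(topicPath.get()))
>                 .setBatchingSettings(BatchingSettings.newBuilder()
>                         .setRequestByteThreshold(40000L)
>                         .setElementCountThreshold(1000L)
>                         .setDelayThreshold(Duration.ofMillis(50))
>                         .build())
>                 .build();
>     }
>     @ProcessElement
>     public void processElement(final ProcessContext context) {
>         // Convert Beam's PubsubMessage into the client's protobuf message.
>         final PubsubMessage message = context.element();
>         final com.google.pubsub.v1.PubsubMessage.Builder builder =
>                 com.google.pubsub.v1.PubsubMessage.newBuilder()
>                         .setData(ByteString.copyFrom(message.getPayload()));
>         final Map<String, String> attributes = message.getAttributeMap();
>         if (attributes != null) {
>             builder.putAllAttributes(attributes);
>         }
>         // Asynchronous: the returned ApiFuture is ignored here.
>         publisher.publish(builder.build());
>     }
>     @Teardown
>     public void teardown() throws Exception {
>         if (publisher != null) {
>             publisher.shutdown(); // publishes any messages still buffered
>             publisher.awaitTermination(1, TimeUnit.MINUTES);
>         }
>     }
>     @Override
>     public void populateDisplayData(final DisplayData.Builder builder) {
>         builder.add(DisplayData.item("topic", topicPath));
>     }
> }
> {code}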
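The {{PublishFn}} above hands each message to the publisher asynchronously and ignores the future returned by {{publish}}, so a bundle can complete while messages are still buffered, and publish failures go unnoticed. A common remedy is to track the futures during the bundle and wait for all of them before the bundle finishes. The stdlib-only sketch below shows that pattern with {{CompletableFuture}} standing in for the client's {{ApiFuture}}; {{OutstandingPublishes}}, {{track}} and {{awaitAll}} are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical helper: remembers the futures of asynchronous publish calls made
 * during a bundle and blocks until all of them have completed.
 */
final class OutstandingPublishes {
    private final List<CompletableFuture<String>> futures = new ArrayList<>();

    /** Called from processElement with the future returned by an async publish. */
    synchronized void track(CompletableFuture<String> publishResult) {
        futures.add(publishResult);
    }

    /**
     * Called from finishBundle: waits for every tracked publish and throws if any
     * of them failed, so the bundle fails instead of silently dropping messages.
     * Returns the message IDs of the publishes, in the order they were tracked.
     */
    List<String> awaitAll() {
        List<CompletableFuture<String>> pending;
        synchronized (this) {
            pending = new ArrayList<>(futures);
            futures.clear();
        }
        List<String> messageIds = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            // join() throws a CompletionException if this publish failed.
            messageIds.add(future.join());
        }
        return messageIds;
    }
}
```

In the actual {{DoFn}}, {{processElement}} would keep the {{ApiFuture}} returned by {{publisher.publish(...)}}, and a {{@FinishBundle}} method would wait on all of them, for example via {{ApiFutures.allAsList(...).get()}}. Waiting can add up to one delay threshold (50 ms here) to each bundle, which is the cost of making the publish part of the bundle's success.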
> In a small test, this resulted in a publish latency of around 50–70 ms 
> instead of 1000–1200 ms with the original {{PubsubUnboundedSink}}.
> I understand that the windowing mechanism may give better performance and 
> throughput when there is a high number of elements per second. However, it 
> would be useful to offer a "low-latency mode", using the code above as a 
> starting point.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
