[ https://issues.apache.org/jira/browse/BEAM-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-4829:
--------------------------------
    Labels: stale-P2  (was: )

> Reduce Pub/Sub publishing latency
> ---------------------------------
>
>                 Key: BEAM-4829
>                 URL: https://issues.apache.org/jira/browse/BEAM-4829
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>    Affects Versions: 2.5.0
>            Reporter: Jan Hicken
>            Priority: P2
>              Labels: stale-P2
>
> The current implementation of the {{PubsubUnboundedSink}} uses a global
> window with a trigger on a fixed batch size of 1000 elements or a processing
> timespan of 2 seconds. After that, a random sharding of 100 is applied via a
> {{GroupByKey}} transform. The result is then pushed into a {{DoFn}} which
> performs the actual publishing step.
> For low-latency use cases (tens or hundreds of milliseconds), this logic
> performs poorly: the transform steps described above introduce a latency of
> around 1.2 seconds.
> There are several possibilities to improve the Pub/Sub sink. For example,
> the parameters above could be made configurable via {{PipelineOptions}}:
> * {{pubsubBatchSize}}: Approximate maximum number of elements in a Pub/Sub
> publishing batch
> * {{pubsubDelayThreshold}}: Maximum processing-time duration before firing
> the sharding window
> * {{pubsubShardCount}}: The number of shards to create before publishing
> This would allow tuning the Pub/Sub sink for different throughput and
> message-size scenarios in the pipeline.
> However, if the throughput is small (< 100 elements/s), this mechanism is
> still quite slow. If we take a look at the Java client in
> {{com.google.cloud:google-cloud-pubsub}}, the {{Publisher}} class supports a
> wide range of options to optimize its batching behaviour. Using it would
> remove the need for a window with group-by-key functionality and let the
> publisher itself handle the batching.
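The batching behaviour that the client's {{BatchingSettings}} combines can be sketched as a minimal stand-alone class. This is an illustrative sketch of the triggering rule only, not the actual client implementation: a pending batch is flushed as soon as any one of the three thresholds (element count, request bytes, or delay) is crossed.

```java
// Illustrative sketch of the batching trigger combined by
// com.google.cloud:google-cloud-pubsub's BatchingSettings.
// The class name and method are hypothetical; the real client
// tracks these thresholds internally inside Publisher.
class BatchTrigger {
    private final long elementCountThreshold;
    private final long requestByteThreshold;
    private final long delayThresholdMillis;

    BatchTrigger(long elementCountThreshold, long requestByteThreshold,
                 long delayThresholdMillis) {
        this.elementCountThreshold = elementCountThreshold;
        this.requestByteThreshold = requestByteThreshold;
        this.delayThresholdMillis = delayThresholdMillis;
    }

    /** A batch is published as soon as ANY threshold is reached. */
    boolean shouldFlush(long pendingElements, long pendingBytes,
                        long millisSinceFirstElement) {
        return pendingElements >= elementCountThreshold
            || pendingBytes >= requestByteThreshold
            || millisSinceFirstElement >= delayThresholdMillis;
    }
}
```

With a small delay threshold (e.g. 50 ms), a low-throughput pipeline flushes on the time condition long before reaching 1000 elements, which is where the latency improvement over the fixed 2-second window comes from.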
> Consider the following {{DoFn}} for publishing messages to Pub/Sub using
> that client:
> {code:java}
> class PublishFn extends DoFn<PubsubMessage, Void> {
>     private transient Publisher publisher;
>     private final ValueProvider<String> topicPath;
>
>     public PublishFn(final ValueProvider<String> topicPath) {
>         this.topicPath = topicPath;
>     }
>
>     @Setup
>     public void setup() throws IOException {
>         publisher = Publisher.defaultBuilder(TopicName.parse(topicPath.get()))
>                 .setBatchingSettings(BatchingSettings.newBuilder()
>                         .setRequestByteThreshold(40000L)
>                         .setElementCountThreshold(1000L)
>                         .setDelayThreshold(Duration.ofMillis(50))
>                         .build())
>                 .build();
>     }
>
>     @ProcessElement
>     public void processElement(final ProcessContext context) {
>         publisher.publish(context.element());
>     }
>
>     @Teardown
>     public void teardown() throws Exception {
>         publisher.shutdown();
>     }
>
>     @Override
>     public void populateDisplayData(final DisplayData.Builder builder) {
>         builder.add(DisplayData.item("topic", topicPath));
>     }
> }
> {code}
> In a small test, this resulted in a publish latency of around 50–70 ms
> instead of 1000–1200 ms with the original {{PubsubUnboundedSink}}.
> I understand that the windowing mechanism can lead to better performance
> and throughput in a scenario with a high number of elements per second.
> However, it would be nice to enable a "low-latency mode" using the provided
> code as an example.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)