[ https://issues.apache.org/jira/browse/BEAM-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-4829:
--------------------------------
Labels: (was: stale-P2)
> Reduce Pub/Sub publishing latency
> ---------------------------------
>
> Key: BEAM-4829
> URL: https://issues.apache.org/jira/browse/BEAM-4829
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Affects Versions: 2.5.0
> Reporter: Jan Hicken
> Priority: P3
>
> The current implementation of the {{PubsubUnboundedSink}} uses a global
> window with a trigger that fires after a fixed batch size of 1000 elements or
> after 2 seconds of processing time, whichever comes first. The elements are
> then randomly distributed across 100 shards and grouped by a {{GroupByKey}}
> transform. The result is passed to a {{DoFn}} that performs the actual
> publishing step.
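> The buffering described above can be approximated with a small plain-Java
> model. This is a sketch under stated assumptions, not Beam code: it assumes
> each of the 100 shards keeps its own pane, which fires once it holds 1000
> elements or 2 seconds after its first element arrived. It ignores the
> {{GroupByKey}} shuffle, publish RPC time and runner timer granularity.
> {{TriggerModel}} and {{meanBufferingDelayMillis}} are hypothetical names.
> {code:java}
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.List;
> import java.util.Map;
> import java.util.Random;
>
> // Hypothetical model of the sink's buffering step (not Beam code): each element
> // gets a random shard key, and each shard's pane fires when it holds batchSize
> // elements or delayMillis after its first element arrived, whichever is first.
> class TriggerModel {
>
>   // Mean time (ms) an element waits before its pane fires.
>   // arrivalsMillis must be sorted in ascending order.
>   static double meanBufferingDelayMillis(long[] arrivalsMillis, int batchSize,
>       long delayMillis, int shards, long seed) {
>     Random random = new Random(seed);
>     Map<Integer, List<Long>> openPanes = new HashMap<>();
>     long totalDelay = 0;
>     for (long now : arrivalsMillis) {
>       // Fire every pane whose processing-time delay expired by this arrival.
>       Iterator<Map.Entry<Integer, List<Long>>> it = openPanes.entrySet().iterator();
>       while (it.hasNext()) {
>         List<Long> pane = it.next().getValue();
>         long fireAt = pane.get(0) + delayMillis;
>         if (fireAt <= now) {
>           totalDelay += sumDelay(pane, fireAt);
>           it.remove();
>         }
>       }
>       // Assign the element to a random shard; fire at once on the count trigger.
>       int shard = random.nextInt(shards);
>       List<Long> pane = openPanes.computeIfAbsent(shard, k -> new ArrayList<>());
>       pane.add(now);
>       if (pane.size() >= batchSize) {
>         totalDelay += sumDelay(pane, now);
>         openPanes.remove(shard);
>       }
>     }
>     // Panes still open at the end fire when their delay expires.
>     for (List<Long> pane : openPanes.values()) {
>       totalDelay += sumDelay(pane, pane.get(0) + delayMillis);
>     }
>     return arrivalsMillis.length == 0 ? 0.0 : (double) totalDelay / arrivalsMillis.length;
>   }
>
>   // Sum of (fire time - arrival time) over all elements of one pane.
>   static long sumDelay(List<Long> pane, long fireAt) {
>     long sum = 0;
>     for (long arrival : pane) {
>       sum += fireAt - arrival;
>     }
>     return sum;
>   }
>
>   public static void main(String[] args) {
>     // 10 elements/s for 60 s, with the sink's 1000-element / 2 s trigger.
>     long[] arrivals = new long[600];
>     for (int i = 0; i < arrivals.length; i++) {
>       arrivals[i] = i * 100L;
>     }
>     System.out.println(meanBufferingDelayMillis(arrivals, 1000, 2000L, 1, 1L)); // prints 1050.0
>     System.out.println(meanBufferingDelayMillis(arrivals, 1000, 2000L, 100, 1L));
>   }
> }
> {code}
> In this model, a single shard at 10 elements/s yields panes of 20 elements
> and a mean wait of 1050 ms. With 100 shards, most panes hold a single element
> that waits the full 2 seconds.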
> For low-latency use cases (tens or hundreds of milliseconds), this logic
> performs poorly because the transform steps described above introduce a
> latency of around 1.2 seconds.
> There are several ways to improve the Pub/Sub sink. For example, the
> parameters above could be made configurable via {{PipelineOptions}}:
> * {{pubsubBatchSize}}: Approx. maximum number of elements in a Pub/Sub
> publishing batch
> * {{pubsubDelayThreshold}}: Max. processing time duration before firing the
> sharding window
> * {{pubsubShardCount}}: The number of shards to create before publishing
> This would allow tuning the Pub/Sub sink for different throughput and
> message-size scenarios in the pipeline.
> However, if the throughput is low (< 100 elements/s), this mechanism is
> still quite slow. The Java client in {{com.google.cloud:google-cloud-pubsub}}
> provides a {{Publisher}} class that supports a wide range of options for
> tuning its batching behaviour. Using it would remove the need for windowing
> and {{GroupByKey}} and would let the publisher handle the batching itself.
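> For contrast with the windowed sink, here is a plain-Java sketch of
> threshold-based batching in the spirit of {{BatchingSettings}}: a batch is
> sent on an element count, a request byte size, or a delay counted from its
> first message. It is a simplified model, not the client's implementation.
> Flow control, concurrent batches and RPC time are ignored, and
> {{ThresholdBatcherModel}} is a hypothetical name.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical model of client-side threshold batching (not the client's code):
> // a batch is sent once it holds elementCountThreshold messages or
> // requestByteThreshold bytes, or delayMillis after its first message was added.
> class ThresholdBatcherModel {
>
>   // Mean time (ms) a message waits before its batch is sent.
>   // arrivalsMillis must be ascending; sizesBytes[i] is the size of message i.
>   static double meanBatchingDelayMillis(long[] arrivalsMillis, long[] sizesBytes,
>       long elementCountThreshold, long requestByteThreshold, long delayMillis) {
>     List<Long> batch = new ArrayList<>();
>     long batchBytes = 0;
>     long totalDelay = 0;
>     for (int i = 0; i < arrivalsMillis.length; i++) {
>       long now = arrivalsMillis[i];
>       // The delay timer of the open batch expired before this message arrived.
>       if (!batch.isEmpty() && batch.get(0) + delayMillis <= now) {
>         totalDelay += sumDelay(batch, batch.get(0) + delayMillis);
>         batch.clear();
>         batchBytes = 0;
>       }
>       batch.add(now);
>       batchBytes += sizesBytes[i];
>       // Count and byte thresholds send the batch immediately.
>       if (batch.size() >= elementCountThreshold || batchBytes >= requestByteThreshold) {
>         totalDelay += sumDelay(batch, now);
>         batch.clear();
>         batchBytes = 0;
>       }
>     }
>     // A batch still open at the end is sent when its delay expires.
>     if (!batch.isEmpty()) {
>       totalDelay += sumDelay(batch, batch.get(0) + delayMillis);
>     }
>     return arrivalsMillis.length == 0 ? 0.0 : (double) totalDelay / arrivalsMillis.length;
>   }
>
>   // Sum of (send time - arrival time) over all messages of one batch.
>   static long sumDelay(List<Long> batch, long sentAt) {
>     long sum = 0;
>     for (long arrival : batch) {
>       sum += sentAt - arrival;
>     }
>     return sum;
>   }
>
>   public static void main(String[] args) {
>     // 10 messages/s of 200 bytes for 60 s, using the thresholds from PublishFn.
>     long[] arrivals = new long[600];
>     long[] sizes = new long[600];
>     for (int i = 0; i < arrivals.length; i++) {
>       arrivals[i] = i * 100L;
>       sizes[i] = 200L;
>     }
>     System.out.println(meanBatchingDelayMillis(arrivals, sizes, 1000L, 40000L, 50L)); // prints 50.0
>   }
> }
> {code}
> The delay timer starts at the first message of each batch, and nothing is
> grouped by key. As a result, no message in this model waits longer than the
> delay threshold (50 ms here).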
> Consider the following {{DoFn}} for publishing messages to Pub/Sub using that
> client:
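> {code:java}
> import com.google.api.core.ApiFuture;
> import com.google.api.core.ApiFutures;
> import com.google.api.gax.batching.BatchingSettings;
> import com.google.cloud.pubsub.v1.Publisher;
> import com.google.pubsub.v1.PubsubMessage; // protobuf message accepted by Publisher.publish()
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.TimeUnit;
> import org.apache.beam.sdk.options.ValueProvider;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.display.DisplayData;
> import org.threeten.bp.Duration;
>
> class PublishFn extends DoFn<PubsubMessage, Void> {
>   private transient Publisher publisher;
>   // Publish futures of the current bundle, awaited in finishBundle().
>   private transient List<ApiFuture<String>> pending;
>   private final ValueProvider<String> topicPath; // "projects/<project>/topics/<topic>"
>   public PublishFn(final ValueProvider<String> topicPath) {
>     this.topicPath = topicPath;
>   }
>   @Setup
>   public void setup() throws IOException {
>     // The client sends a batch as soon as any of these thresholds is reached.
>     publisher = Publisher.newBuilder(topicPath.get())
>         .setBatchingSettings(BatchingSettings.newBuilder()
>             .setRequestByteThreshold(40000L)
>             .setElementCountThreshold(1000L)
>             .setDelayThreshold(Duration.ofMillis(50))
>             .build())
>         .build();
>     pending = new ArrayList<>();
>   }
>   @ProcessElement
>   public void processElement(final ProcessContext context) {
>     pending.add(publisher.publish(context.element()));
>   }
>   @FinishBundle
>   public void finishBundle() throws Exception {
>     // Wait for all publishes of this bundle so that a failed publish fails
>     // the bundle (and is retried) instead of being silently dropped.
>     ApiFutures.allAsList(pending).get();
>     pending.clear();
>   }
>   @Teardown
>   public void teardown() throws Exception {
>     if (publisher != null) {
>       publisher.shutdown();
>       publisher.awaitTermination(1, TimeUnit.MINUTES);
>     }
>   }
>   @Override
>   public void populateDisplayData(final DisplayData.Builder builder) {
>     builder.add(DisplayData.item("topic", topicPath));
>   }
> }
> {code}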
> In a small test, this resulted in a publish latency of around 50–70 ms
> instead of 1000–1200 ms with the original {{PubsubUnboundedSink}}.
> I understand that the windowing mechanism may yield better performance and
> throughput when there are many elements per second. However, it would be
> nice to offer a "low-latency mode" based on the code above.