kennknowles opened a new issue, #19013:
URL: https://github.com/apache/beam/issues/19013

   The current implementation of the `PubsubUnboundedSink` uses a global window 
with a trigger that fires on a fixed batch size of 1000 elements or after a 
processing-time delay of 2 seconds. The elements are then randomly sharded 
across 100 keys and grouped via a `GroupByKey` transform, and the resulting 
batches are passed to a `DoFn` which performs the actual publishing step.
   
   For low-latency use cases (tens or hundreds of milliseconds), this logic 
performs poorly: the transform steps described above introduce a latency of 
around 1.2 seconds.
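   
   For reference, the batching structure described above corresponds roughly 
to the following sketch. This is a hypothetical reconstruction, not the actual 
`PubsubUnboundedSink` code; the `BatchingSketch` class and `batchForPublishing` 
method are made-up names.
   
   ```
    import java.util.concurrent.ThreadLocalRandom;

    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.transforms.windowing.AfterFirst;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.joda.time.Duration;

    class BatchingSketch {

        // Sketch of the described batching: a global window firing on 1000
        // elements or 2 seconds of processing time, random sharding across
        // 100 keys, then GroupByKey to form the batches.
        static PCollection<KV<Integer, Iterable<PubsubMessage>>> batchForPublishing(
                final PCollection<PubsubMessage> messages) {
            return messages
                    .apply(Window.<PubsubMessage>into(new GlobalWindows())
                            .triggering(Repeatedly.forever(AfterFirst.of(
                                    AfterPane.elementCountAtLeast(1000),
                                    AfterProcessingTime.pastFirstElementInPane()
                                            .plusDelayOf(Duration.standardSeconds(2)))))
                            .withAllowedLateness(Duration.ZERO)
                            .discardingFiredPanes())
                    // Assign a random shard key in [0, 100) to spread the grouping load.
                    .apply(WithKeys.<Integer, PubsubMessage>of(
                                    m -> ThreadLocalRandom.current().nextInt(100))
                            .withKeyType(TypeDescriptors.integers()))
                    // Each resulting Iterable<PubsubMessage> is one batch handed
                    // to the publishing DoFn.
                    .apply(GroupByKey.<Integer, PubsubMessage>create());
        }
    }
   ```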
   
   There are several ways to improve the Pub/Sub sink, for example:
   
   Let the parameters described above be configurable via `PipelineOptions`:
    * `pubsubBatchSize`: Approx. maximum number of elements in a Pub/Sub 
publishing batch
    * `pubsubDelayThreshold`: Max. processing time duration before firing the 
sharding window
    * `pubsubShardCount`: The number of shards to create before publishing
   
   This would allow tuning the Pub/Sub sink for different throughput and 
message-size scenarios in the pipeline.
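   
   A hypothetical `PipelineOptions` interface for these parameters might look 
like the following sketch; the option names follow the list above, and neither 
the interface nor the options exist in Beam today. The defaults mirror the 
currently hard-coded values.
   
   ```
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Hypothetical options interface; defaults mirror the current hard-coded values.
    public interface PubsubSinkOptions extends PipelineOptions {

        @Description("Approx. maximum number of elements in a Pub/Sub publishing batch")
        @Default.Integer(1000)
        Integer getPubsubBatchSize();
        void setPubsubBatchSize(Integer value);

        @Description("Max. processing time, in milliseconds, before firing the sharding window")
        @Default.Long(2000L)
        Long getPubsubDelayThreshold();
        void setPubsubDelayThreshold(Long value);

        @Description("The number of shards to create before publishing")
        @Default.Integer(100)
        Integer getPubsubShardCount();
        void setPubsubShardCount(Integer value);
    }
   ```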
   
   However, if the throughput is low (< 100 elements/s), this mechanism is 
still quite slow. Looking at the Java client in 
`com.google.cloud:google-cloud-pubsub`, its `Publisher` class supports a wide 
range of options to optimize its batching behaviour. Using it would remove the 
need for the window plus `GroupByKey` step and let the publisher itself handle 
the batching.
   
   Consider the following `DoFn` for publishing messages to Pub/Sub using that 
client:
   ```
    import java.io.IOException;

    import com.google.api.gax.batching.BatchingSettings;
    import com.google.cloud.pubsub.v1.Publisher;
    import com.google.pubsub.v1.PubsubMessage;
    import com.google.pubsub.v1.TopicName;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.display.DisplayData;
    import org.threeten.bp.Duration;

    class PublishFn extends DoFn<PubsubMessage, Void> {

        // The Publisher is not serializable, so it is created per instance in @Setup.
        private transient Publisher publisher;

        private final ValueProvider<String> topicPath;

        public PublishFn(final ValueProvider<String> topicPath) {
            this.topicPath = topicPath;
        }

        @Setup
        public void setup() throws IOException {
            // Let the client handle batching: fire on 40 kB, 1000 elements or
            // 50 ms, whichever threshold is reached first.
            publisher = Publisher.defaultBuilder(TopicName.parse(topicPath.get()))
                    .setBatchingSettings(BatchingSettings.newBuilder()
                            .setRequestByteThreshold(40000L)
                            .setElementCountThreshold(1000L)
                            .setDelayThreshold(Duration.ofMillis(50))
                            .build())
                    .build();
        }

        @ProcessElement
        public void processElement(final ProcessContext context) {
            // publish() is asynchronous; the returned future is not awaited here.
            publisher.publish(context.element());
        }

        @Teardown
        public void teardown() throws Exception {
            publisher.shutdown();
        }

        @Override
        public void populateDisplayData(final DisplayData.Builder builder) {
            builder.add(DisplayData.item("topic", topicPath));
        }
    }
   ```
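   
   For illustration, such a `DoFn` could be wired into a pipeline roughly like 
this; the transform name and the topic path are placeholders, and `messages` is 
assumed to be a `PCollection<PubsubMessage>`:
   
   ```
    // Hypothetical wiring of the DoFn above; the topic path is a placeholder.
    messages.apply("PublishToPubsub", ParDo.of(new PublishFn(
            ValueProvider.StaticValueProvider.of("projects/my-project/topics/my-topic"))));
   ```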
   
   In a small test, this resulted in a publish latency of around 50–70 ms 
instead of 1000–1200 ms with the original `PubsubUnboundedSink`.
   
   I understand that the windowing mechanism can lead to better performance 
and throughput in scenarios with a high number of elements per second. 
However, it would be nice to be able to enable a "low-latency mode", using the 
provided code as an example.
   
   Imported from Jira 
[BEAM-4829](https://issues.apache.org/jira/browse/BEAM-4829). Original Jira may 
contain additional context.
   Reported by: JanHicken.

