kennknowles opened a new issue, #19013:
URL: https://github.com/apache/beam/issues/19013
The current implementation of the `PubsubUnboundedSink` uses a global window
with a trigger on a fixed batch size of 1000 elements or a processing timespan
of 2 seconds. After that, a random sharding of 100 is applied via a
`GroupByKey` transform. The result is then pushed into a `DoFn` which performs
the actual publishing step.
For low-latency use cases (tens or hundreds of milliseconds), this logic
performs poorly: the transform steps described above introduce a latency of
around 1.2 seconds.
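For reference, the triggering described above corresponds roughly to the following windowing configuration (a sketch using Beam's standard trigger API, not the actual `PubsubUnboundedSink` source; `input` is a hypothetical `PCollection`):

```java
// Sketch of the current sink's triggering as described above:
// fire on 1000 elements or after 2 s of processing time, whichever comes first.
PCollection<PubsubMessage> windowed = input.apply(
    Window.<PubsubMessage>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterFirst.of(
            AfterPane.elementCountAtLeast(1000),              // batch size
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(2)))))  // processing timespan
        .discardingFiredPanes());
```

The random sharding of 100 and the `GroupByKey` then follow this windowing step, which is where the extra latency accumulates.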
There are several possibilities to improve the Pub/Sub sink, for example:
Let the above parameters be configurable via `PipelineOptions`:
* `pubsubBatchSize`: Approx. maximum number of elements in a Pub/Sub
publishing batch
* `pubsubDelayThreshold`: Max. processing time duration before firing the
sharding window
* `pubsubShardCount`: The number of shards to create before publishing
This would allow tweaking the Pub/Sub sink for different scenarios of
throughput and message size in the pipeline.
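These options could be sketched as a `PipelineOptions` extension (a sketch, not an existing Beam interface; the names mirror the list above, and the defaults match the currently hard-coded values):

```java
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/** Hypothetical options for tuning the Pub/Sub sink. */
public interface PubsubSinkOptions extends PipelineOptions {

  @Description("Approx. maximum number of elements in a Pub/Sub publishing batch")
  @Default.Integer(1000)
  Integer getPubsubBatchSize();
  void setPubsubBatchSize(Integer value);

  @Description("Max. processing time in milliseconds before firing the sharding window")
  @Default.Long(2000L)
  Long getPubsubDelayThreshold();
  void setPubsubDelayThreshold(Long value);

  @Description("The number of shards to create before publishing")
  @Default.Integer(100)
  Integer getPubsubShardCount();
  void setPubsubShardCount(Integer value);
}
```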
However, if the throughput is small (< 100 elements/s), this mechanism is
still quite slow. If we take a look at the Java client in
`com.google.cloud:google-cloud-pubsub`, its `Publisher` class supports a wide
range of options to optimize its batching behaviour. Using it would remove the
need for a window plus `GroupByKey` and let the publisher itself handle the
batching.
Consider the following `DoFn` for publishing messages to Pub/Sub using that
client:
```java
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.threeten.bp.Duration;

class PublishFn extends DoFn<PubsubMessage, Void> {

  // The Publisher is not serializable, so it is created in @Setup per worker.
  private transient Publisher publisher;
  private final ValueProvider<String> topicPath;

  public PublishFn(final ValueProvider<String> topicPath) {
    this.topicPath = topicPath;
  }

  @Setup
  public void setup() throws IOException {
    // Let the client's Publisher handle batching instead of a Beam window.
    publisher = Publisher.defaultBuilder(TopicName.parse(topicPath.get()))
        .setBatchingSettings(BatchingSettings.newBuilder()
            .setRequestByteThreshold(40000L)
            .setElementCountThreshold(1000L)
            .setDelayThreshold(Duration.ofMillis(50))
            .build())
        .build();
  }

  @ProcessElement
  public void processElement(final ProcessContext context) {
    // Publishing is asynchronous; the Publisher flushes according to
    // the batching settings above.
    publisher.publish(context.element());
  }

  @Teardown
  public void teardown() throws Exception {
    // Flush pending messages and release resources.
    publisher.shutdown();
  }

  @Override
  public void populateDisplayData(final DisplayData.Builder builder) {
    builder.add(DisplayData.item("topic", topicPath));
  }
}
```
In a small test, this resulted in a publish latency of around 50–70 ms
instead of 1000–1200 ms with the original `PubsubUnboundedSink`.
I understand that the windowing mechanism can yield better performance and
throughput in scenarios with a high number of elements per second. However, it
would be nice to enable a "low-latency mode" using the provided code as an
example.
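Wiring the proposed `PublishFn` in as a sink would then be a single `ParDo`, with no window or `GroupByKey` on the path (a sketch; `messages` and the `getTopic()` option are hypothetical):

```java
// Hypothetical low-latency sink: batching happens inside the client's
// Publisher, so no windowing/GroupByKey is needed on the Beam side.
messages.apply("PublishToPubsub",
    ParDo.of(new PublishFn(options.getTopic())));
```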
Imported from Jira
[BEAM-4829](https://issues.apache.org/jira/browse/BEAM-4829). Original Jira may
contain additional context.
Reported by: JanHicken.