sjvanrossum commented on PR #31608: URL: https://github.com/apache/beam/pull/31608#issuecomment-2214469956
Highlighting this here as well: while trying to retrofit ordering keys onto the existing sinks, I considered rewriting the sink to use `GroupIntoBatches` for messages with ordering keys and `GroupIntoBatches.WithShardedKey` for messages without. This would unify the bounded and unbounded writers, let runners determine the sharding for unordered messages instead of the fixed 100 shards, and group messages exactly by topic and ordering key instead of explicitly bucketing ordering keys with `murmur3_32` (a rough sketch follows at the end of this comment). While writing that sink I stumbled on some issues with message size validation, documented in #31800. I've got a few fixes in progress which will:

1. Fix the validation issue in `PreparePubsubWriteDoFn` for both the REST and gRPC clients.
2. Use a more ergonomic batching mechanism than I had initially proposed in ahmedabu98/beam#427.

My idea for fixing the validation issue is to introduce a `PubsubMessage.SizeValidator` interface: a visitor over all fields of the message that returns size parts, which `PubsubMessage#validateSize()` sums. This allows messages and attributes to be overridden to account for things like the size of the implicit attribute entries `timestampAttribute` (values are up to 20 B, millis since epoch as `Long#toString()`) and `idAttribute` (values are 24 B in Dataflow's native sink, 36 B in `PubsubUnboundedSink`). See the second sketch below.

Coincidentally, the revised batching mechanism I had imagined turns out to be very close to the implementation found in the Google Cloud Pub/Sub client for Java (https://github.com/googleapis/java-pubsub/blob/main/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java) and would live in `PubsubClient` instead of the existing implementations in `BoundedWriter` and `PubsubUnboundedSink`.

Side note: the batching mechanism in the client library does not account for the encoding overhead of the `topic` and `messages` fields of `PublishRequest` (1-3 bytes for the topic, 1-5 bytes per message; see the last sketch below). This could cause batched publish requests to fail today, and may still cause them to fail when Pub/Sub switches to explicit size validation if it isn't fixed, but I'll happily raise an issue or propose a fix for them separately.

@ahmedabu98 the fixes to the batching mechanism should address the comments you raised on ahmedabu98/beam#427 about my use of variable assignments in the condition of an if statement, so I'll get those commits added to that PR. A separate PR makes more sense for the other bugfix.

In a separate comment we discussed including a GIB/GIB-WSK sink as the default sink for writes with ordering keys, but the design and implementation of that sink may add unnecessary bloat to this PR and its reviews, since it could be toggled for all types of writes if exposed through, for example, `PubsubIO.Write#withAutoSharding()`.

@robertwb, @Abacn any thoughts as reviewers?
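For concreteness, here's a rough sketch of the unified GIB/GIB-WSK sink shape I have in mind. `PublishBatchFn` and `PublishShardedBatchFn` are hypothetical stand-ins for DoFns that issue one `PublishRequest` per batch, the batch size is illustrative (the real sink would also bound batches by bytes), and I'm assuming the `PubsubMessage#getOrderingKey()` accessor this PR adds:

```java
PCollection<PubsubMessage> messages = ...;

// Ordered messages: batch exactly by (topic, ordering key) so each batch
// preserves ordering, with no murmur3_32 bucketing required.
messages
    .apply("FilterOrdered",
        Filter.by(m -> m.getOrderingKey() != null && !m.getOrderingKey().isEmpty()))
    .apply("KeyByTopicAndOrderingKey",
        MapElements
            .into(TypeDescriptors.kvs(
                TypeDescriptors.strings(), TypeDescriptor.of(PubsubMessage.class)))
            .via(m -> KV.of(m.getTopic() + "\u0000" + m.getOrderingKey(), m)))
    .apply(GroupIntoBatches.<String, PubsubMessage>ofSize(1000))
    .apply(ParDo.of(new PublishBatchFn()));

// Unordered messages: key by topic only and let the runner pick the
// sharding via withShardedKey() instead of a fixed 100 shards.
messages
    .apply("FilterUnordered",
        Filter.by(m -> m.getOrderingKey() == null || m.getOrderingKey().isEmpty()))
    .apply("KeyByTopic",
        MapElements
            .into(TypeDescriptors.kvs(
                TypeDescriptors.strings(), TypeDescriptor.of(PubsubMessage.class)))
            .via(m -> KV.of(m.getTopic(), m)))
    .apply(GroupIntoBatches.<String, PubsubMessage>ofSize(1000).withShardedKey())
    .apply(ParDo.of(new PublishShardedBatchFn()));
```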
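And a minimal sketch of the `SizeValidator` idea. The interface and method names are placeholders for discussion, not anything that exists in Beam today:

```java
// Hypothetical visitor: each method returns the size contribution of one
// field; PubsubMessage#validateSize() would sum the parts and compare the
// total against the Pub/Sub size limit.
public interface SizeValidator {
  long visitData(byte[] data);
  long visitAttribute(String key, String value);
  long visitOrderingKey(String orderingKey);
}

// Inside a hypothetical PubsubMessage#validateSize(SizeValidator visitor):
long total = visitor.visitData(getPayload());
for (Map.Entry<String, String> attribute : getAttributeMap().entrySet()) {
  total += visitor.visitAttribute(attribute.getKey(), attribute.getValue());
}
if (getOrderingKey() != null) {
  total += visitor.visitOrderingKey(getOrderingKey());
}
```

A sink that later injects `timestampAttribute`/`idAttribute` entries would override the visitor (or add synthetic `visitAttribute` calls) to reserve up to 20 B for the timestamp value and 24 B or 36 B for the id value, so validation accounts for attributes that don't exist on the message yet.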
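To illustrate the encoding overhead from the side note, the protobuf runtime can compute it directly (field numbers 1 and 2 are `topic` and `messages` in `PublishRequest`; this snippet is just a demonstration, not proposed code):

```java
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.pubsub.v1.PubsubMessage;

public class PublishOverhead {
  /**
   * Wire-format overhead of one PublishRequest.messages entry (field 2):
   * a 1-byte tag plus a 1-4 byte varint length prefix.
   */
  static int perMessageOverhead(PubsubMessage message) {
    return CodedOutputStream.computeMessageSize(2, message)
        - message.getSerializedSize();
  }

  /**
   * Wire-format cost of PublishRequest.topic (field 1): a 1-byte tag, a
   * varint length prefix and the topic string itself.
   */
  static int topicFieldSize(String topic) {
    return CodedOutputStream.computeStringSize(1, topic);
  }

  public static void main(String[] args) {
    PubsubMessage message =
        PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("hi")).build();
    System.out.println(perMessageOverhead(message)); // 2 for small messages, up to 5 near 10 MB
    System.out.println(topicFieldSize("projects/p/topics/t")); // topic bytes + 2
  }
}
```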