sjvanrossum commented on PR #31608:
URL: https://github.com/apache/beam/pull/31608#issuecomment-2214469956

   Highlighting this here as well: while trying to retrofit ordering keys onto 
the existing sinks I considered rewriting the sink using `GroupIntoBatches` for 
messages with ordering keys and `GroupIntoBatches.WithShardedKey` for normal 
messages. This would unify the bounded and unbounded writers, let runners 
determine the sharding for normal messages instead of a fixed 100 shards, and 
group messages exactly by topic and ordering key instead of explicitly 
bucketing with `murmur3_32`.
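   
   A minimal sketch of that shape, assuming a hypothetical `MAX_BATCH_BYTES` 
limit and a keying that isn't in the PR (ordered messages keyed by 
`KV<topic, orderingKey>`, normal messages keyed by topic only):
   
   ```java
   import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
   import org.apache.beam.sdk.transforms.GroupIntoBatches;
   import org.apache.beam.sdk.util.ShardedKey;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.PCollection;
   import org.joda.time.Duration;
   
   class OrderingKeyBatchingSketch {
     private static final long MAX_BATCH_BYTES = 7_000_000L; // hypothetical limit
   
     // Ordered path: grouped exactly by (topic, ordering key), so a batch never
     // mixes ordering keys and no murmur3_32 bucketing is needed.
     static PCollection<KV<KV<String, String>, Iterable<PubsubMessage>>> ordered(
         PCollection<KV<KV<String, String>, PubsubMessage>> keyed) {
       return keyed.apply(
           "BatchByTopicAndOrderingKey",
           GroupIntoBatches.<KV<String, String>, PubsubMessage>ofByteSize(MAX_BATCH_BYTES)
               .withMaxBufferingDuration(Duration.millis(100)));
     }
   
     // Unordered path: keyed by topic only; the sharded key lets the runner
     // pick the parallelism instead of a hard-coded 100 shards.
     static PCollection<KV<ShardedKey<String>, Iterable<PubsubMessage>>> unordered(
         PCollection<KV<String, PubsubMessage>> byTopic) {
       return byTopic.apply(
           "BatchWithShardedKey",
           GroupIntoBatches.<String, PubsubMessage>ofByteSize(MAX_BATCH_BYTES)
               .withMaxBufferingDuration(Duration.millis(100))
               .withShardedKey());
     }
   }
   ```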
   
   While writing that sink I stumbled on some issues regarding message size 
validation as documented in #31800.
   I've got a few fixes in progress which will:
   1. Fix the validation issue in `PreparePubsubWriteDoFn` for both REST and 
gRPC clients.
   2. Use a more ergonomic batching mechanism than I had initially proposed in 
ahmedabu98/beam#427.
   
   My thinking on fixing the validation issue is to introduce a 
`PubsubMessage.SizeValidator` interface: a visitor over all fields of the 
message which returns size parts that are summed by 
`PubsubMessage#validateSize()`. Overriding the handling of messages and 
attributes then makes it possible to account for the size of implicit 
attribute entries such as `timestampAttribute` (values are up to 20 B, millis 
since epoch via `Long#toString()`) and `idAttribute` (values are 24 B in 
Dataflow's native sink, 36 B in `PubsubUnboundedSink`).
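   
   Purely as an illustration of the idea rather than the actual patch (the 
method names and the size limit below are my assumptions), the shape could be 
something like:
   
   ```java
   import java.util.Map;
   
   /** Illustrative only: a visitor that reports per-field size parts. */
   interface SizeValidator {
     long sizeOfPayload(byte[] payload);             // data field
     long sizeOfAttribute(String key, String value); // one attributes entry
     long sizeOfOrderingKey(String orderingKey);     // ordering key field
   }
   
   final class ValidateSizeSketch {
     private static final long MAX_PUBLISH_BYTES = 10_000_000L; // assumed limit
   
     // Mirrors the proposed PubsubMessage#validateSize(): sum the parts the
     // visitor reports. A sink-specific validator can override the attribute
     // part to add the implicit timestampAttribute/idAttribute entries.
     static void validateSize(SizeValidator v, byte[] payload,
         Map<String, String> attributes, String orderingKey) {
       long total = v.sizeOfPayload(payload);
       for (Map.Entry<String, String> e : attributes.entrySet()) {
         total += v.sizeOfAttribute(e.getKey(), e.getValue());
       }
       if (orderingKey != null && !orderingKey.isEmpty()) {
         total += v.sizeOfOrderingKey(orderingKey);
       }
       if (total > MAX_PUBLISH_BYTES) {
         throw new IllegalArgumentException(
             "message of " + total + " B exceeds " + MAX_PUBLISH_BYTES + " B");
       }
     }
   }
   ```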
   
   Coincidentally, the revised batching mechanism I had imagined turns out to 
be very close to the implementation found in the Google Cloud Pub/Sub Client 
for Java 
(https://github.com/googleapis/java-pubsub/blob/main/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java) 
and would live in `PubsubClient` instead of the existing implementations in 
`BoundedWriter` and `PubsubUnboundedSink`. Side note: the batching mechanism in 
the client library does not account for the encoding overhead of the topic and 
messages fields of `PublishRequest` (1-3 bytes for the topic, 1-5 bytes per 
message). This could cause batched publish requests to fail today and may 
still cause them to fail once Pub/Sub switches to explicit size validation if 
it isn't fixed, but I'll happily raise an issue or propose a fix for that 
separately.
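   
   For what it's worth, protobuf's own size helpers already account for that 
overhead; a sketch of exact request sizing under those field numbers (from 
google/pubsub/v1/pubsub.proto; the class and method names here are mine):
   
   ```java
   import com.google.protobuf.CodedOutputStream;
   import com.google.pubsub.v1.PubsubMessage;
   import java.util.List;
   
   final class PublishRequestSizer {
     // PublishRequest field numbers in google/pubsub/v1/pubsub.proto.
     private static final int TOPIC_FIELD = 1;
     private static final int MESSAGES_FIELD = 2;
   
     // Full on-the-wire size of the topic field: the tag-plus-length-varint
     // overhead noted above, plus the topic string itself.
     static int topicFieldBytes(String topic) {
       return CodedOutputStream.computeStringSize(TOPIC_FIELD, topic);
     }
   
     // Full on-the-wire size of one messages entry: the per-message overhead
     // noted above, plus the serialized message.
     static int messageFieldBytes(PubsubMessage message) {
       return CodedOutputStream.computeMessageSize(MESSAGES_FIELD, message);
     }
   
     // Size a whole batch the way it will actually be encoded.
     static long requestBytes(String topic, List<PubsubMessage> batch) {
       long total = topicFieldBytes(topic);
       for (PubsubMessage m : batch) {
         total += messageFieldBytes(m);
       }
       return total;
     }
   }
   ```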
   
   @ahmedabu98 the fixes to the batching mechanism should address the comments 
you raised on ahmedabu98/beam#427 about my use of variable assignments in the 
conditions of if statements, so I'll get those commits added to that PR.
   A separate PR makes more sense for the other bugfix.
   In a separate comment we discussed including a GIB/GIB-WSK sink as the 
default sink for writes with ordering keys, but the design and implementation 
of that sink may add unnecessary bloat to this PR and its review, since such a 
sink could be exposed as a toggle for all types of writes, for example through 
`PubsubIO.Write#withAutoSharding()`.
   
   @robertwb, @Abacn any thoughts as reviewers?

