sjvanrossum commented on code in PR #31608:
URL: https://github.com/apache/beam/pull/31608#discussion_r2063667677


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1710,51 +1735,44 @@ public void startBundle(StartBundleContext c) throws IOException {
      public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp)
           throws IOException, SizeLimitExceededException {
         // Validate again here just as a sanity check.
+        // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800

Review Comment:
   ```suggestion
           // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
        // - Size validation makes no distinction between JSON and Protobuf encoding
           // - Accounting for HTTP to gRPC transcoding is non-trivial
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java:
##########
@@ -179,6 +201,16 @@ public void process(
               null);
       reportedLineage = topic;
     }
+    if (!usesOrderingKey && !Strings.isNullOrEmpty(message.getOrderingKey())) {
+      if (!logOrderingKeyUnconfigured) {
+        LOG.warn(
+            "Encountered Pubsub message with ordering key but this sink was not configured to "
+                + "retain ordering keys, so they will be dropped. Please set #withOrderingKeys().");
+
+        logOrderingKeyUnconfigured = true;

Review Comment:
   Looks like this is declared as an instance field, so it is only modified/observed locally on a DoFn copy, or appropriately synchronized before being retrieved from a stage executor cache.
   The gotcha is that the warning won't trigger just once, but once for every copy of this DoFn that's created to satisfy the current stage parallelism.
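   To illustrate the gotcha (a minimal sketch, not Beam code; `LogOnceDemo` and all names in it are hypothetical): an instance flag fires once per DoFn copy, while a static `AtomicBoolean` shared across copies in the same JVM fires at most once per worker process:

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   public class LogOnceDemo {
       // Stand-in for the DoFn in the PR: each runner-created copy has its
       // own instance flag, so the "once" guard is per copy, not per worker.
       static class PerInstanceFlag {
           private boolean warned = false; // instance field, as in the PR
           boolean shouldWarn() {
               if (warned) {
                   return false;
               }
               warned = true;
               return true;
           }
       }

       // Shared by every copy in the same JVM: warns at most once per worker.
       private static final AtomicBoolean WARNED_ONCE_PER_JVM = new AtomicBoolean(false);

       // Simulate `copies` DoFn instances each processing `elements` messages;
       // returns {warnings from instance flags, warnings from the static flag}.
       static int[] simulate(int copies, int elements) {
           int perInstance = 0;
           int perJvm = 0;
           for (int c = 0; c < copies; c++) {
               PerInstanceFlag fn = new PerInstanceFlag();
               for (int e = 0; e < elements; e++) {
                   if (fn.shouldWarn()) {
                       perInstance++;
                   }
                   if (WARNED_ONCE_PER_JVM.compareAndSet(false, true)) {
                       perJvm++;
                   }
               }
           }
           return new int[] {perInstance, perJvm};
       }

       public static void main(String[] args) {
           // 4 copies to mimic stage parallelism, 3 elements each.
           int[] counts = simulate(4, 3);
           System.out.println(counts[0] + " " + counts[1]); // 4 1
       }
   }
   ```

   A static flag trades per-copy noise for once-per-worker logging, at the cost of state shared across copies.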



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1710,51 +1735,44 @@ public void startBundle(StartBundleContext c) throws IOException {
      public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp)
           throws IOException, SizeLimitExceededException {
         // Validate again here just as a sanity check.
+        // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
         PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize);
-        byte[] payload = message.getPayload();
-        int messageSize = payload.length;
+        // NOTE: The record id is always null.
+        final OutgoingMessage msg =
+            OutgoingMessage.of(message, timestamp.getMillis(), null, message.getTopic());
+        // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800

Review Comment:
   ```suggestion
           // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
        // - Size validation makes no distinction between JSON and Protobuf encoding
           // - Accounting for HTTP to gRPC transcoding is non-trivial
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1710,51 +1735,44 @@ public void startBundle(StartBundleContext c) throws IOException {
      public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp)
           throws IOException, SizeLimitExceededException {
         // Validate again here just as a sanity check.
+        // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
         PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize);
-        byte[] payload = message.getPayload();
-        int messageSize = payload.length;
+        // NOTE: The record id is always null.
+        final OutgoingMessage msg =
+            OutgoingMessage.of(message, timestamp.getMillis(), null, message.getTopic());
+        // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
+        final int messageSize = msg.getMessage().getData().size();
 
-        PubsubTopic pubsubTopic;
+        final PubsubTopic pubsubTopic;
         if (getTopicProvider() != null) {
           pubsubTopic = getTopicProvider().get();
         } else {
-          pubsubTopic =
-              PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(message.getTopic()));
-        }
-        // Checking before adding the message stops us from violating max batch size or bytes
-        OutgoingData currentTopicOutput =
-            output.computeIfAbsent(pubsubTopic, t -> new OutgoingData());
-        if (currentTopicOutput.messages.size() >= maxPublishBatchSize
-            || (!currentTopicOutput.messages.isEmpty()
-                && (currentTopicOutput.bytes + messageSize) >= maxPublishBatchByteSize)) {
-          publish(pubsubTopic, currentTopicOutput.messages);
-          currentTopicOutput.messages.clear();
-          currentTopicOutput.bytes = 0;
+          pubsubTopic = PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(msg.topic()));
         }
 
-        Map<String, String> attributes = message.getAttributeMap();
-        String orderingKey = message.getOrderingKey();
-
-        com.google.pubsub.v1.PubsubMessage.Builder msgBuilder =
-            com.google.pubsub.v1.PubsubMessage.newBuilder()
-                .setData(ByteString.copyFrom(payload))
-                .putAllAttributes(attributes);
-
-        if (orderingKey != null) {
-          msgBuilder.setOrderingKey(orderingKey);
+        // Checking before adding the message stops us from violating max batch size or bytes
+        String orderingKey = getPublishWithOrderingKey() ? msg.getMessage().getOrderingKey() : "";
+        final OutgoingData currentTopicAndOrderingKeyOutput =
+            output.computeIfAbsent(KV.of(pubsubTopic, orderingKey), t -> new OutgoingData());
+        // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
+        if (currentTopicAndOrderingKeyOutput.messages.size() >= maxPublishBatchSize
+            || (!currentTopicAndOrderingKeyOutput.messages.isEmpty()
+                && (currentTopicAndOrderingKeyOutput.bytes + messageSize)
+                    >= maxPublishBatchByteSize)) {
+          publish(pubsubTopic, currentTopicAndOrderingKeyOutput.messages);
+          currentTopicAndOrderingKeyOutput.messages.clear();
+          currentTopicAndOrderingKeyOutput.bytes = 0;
         }
 
-        // NOTE: The record id is always null.
-        currentTopicOutput.messages.add(
-            OutgoingMessage.of(
-                msgBuilder.build(), timestamp.getMillis(), null, message.getTopic()));
-        currentTopicOutput.bytes += messageSize;
+        currentTopicAndOrderingKeyOutput.messages.add(msg);
+        // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800

Review Comment:
   ```suggestion
   ```


