sjvanrossum commented on code in PR #31608: URL: https://github.com/apache/beam/pull/31608#discussion_r2063667677
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java: ########## @@ -1710,51 +1735,44 @@ public void startBundle(StartBundleContext c) throws IOException { public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp) throws IOException, SizeLimitExceededException { // Validate again here just as a sanity check. + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 Review Comment: ```suggestion // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 // - Size validation makes no distinction between JSON and Protobuf encoding // - Accounting for HTTP to gRPC transcoding is non-trivial ``` ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java: ########## @@ -179,6 +201,16 @@ public void process( null); reportedLineage = topic; } + if (!usesOrderingKey && !Strings.isNullOrEmpty(message.getOrderingKey())) { + if (!logOrderingKeyUnconfigured) { + LOG.warn( + "Encountered Pubsub message with ordering key but this sink was not configured to " + + "retain ordering keys, so they will be dropped. Please set #withOrderingKeys()."); + + logOrderingKeyUnconfigured = true; Review Comment: Looks like the field is an instance field, so it would only be modified/observed locally on a DoFn copy, or be appropriately synchronized before that copy is retrieved from a stage executor cache. The gotcha there is that the warning won't be logged just once, but once for every copy of this DoFn that's created to satisfy the current stage parallelism. 
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java: ########## @@ -1710,51 +1735,44 @@ public void startBundle(StartBundleContext c) throws IOException { public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp) throws IOException, SizeLimitExceededException { // Validate again here just as a sanity check. + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize); - byte[] payload = message.getPayload(); - int messageSize = payload.length; + // NOTE: The record id is always null. + final OutgoingMessage msg = + OutgoingMessage.of(message, timestamp.getMillis(), null, message.getTopic()); + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 Review Comment: ```suggestion // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 // - Size validation makes no distinction between JSON and Protobuf encoding // - Accounting for HTTP to gRPC transcoding is non-trivial ``` ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java: ########## @@ -1710,51 +1735,44 @@ public void startBundle(StartBundleContext c) throws IOException { public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp) throws IOException, SizeLimitExceededException { // Validate again here just as a sanity check. + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize); - byte[] payload = message.getPayload(); - int messageSize = payload.length; + // NOTE: The record id is always null. 
+ final OutgoingMessage msg = + OutgoingMessage.of(message, timestamp.getMillis(), null, message.getTopic()); + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 + final int messageSize = msg.getMessage().getData().size(); - PubsubTopic pubsubTopic; + final PubsubTopic pubsubTopic; if (getTopicProvider() != null) { pubsubTopic = getTopicProvider().get(); } else { - pubsubTopic = - PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(message.getTopic())); - } - // Checking before adding the message stops us from violating max batch size or bytes - OutgoingData currentTopicOutput = - output.computeIfAbsent(pubsubTopic, t -> new OutgoingData()); - if (currentTopicOutput.messages.size() >= maxPublishBatchSize - || (!currentTopicOutput.messages.isEmpty() - && (currentTopicOutput.bytes + messageSize) >= maxPublishBatchByteSize)) { - publish(pubsubTopic, currentTopicOutput.messages); - currentTopicOutput.messages.clear(); - currentTopicOutput.bytes = 0; + pubsubTopic = PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(msg.topic())); } - Map<String, String> attributes = message.getAttributeMap(); - String orderingKey = message.getOrderingKey(); - - com.google.pubsub.v1.PubsubMessage.Builder msgBuilder = - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(payload)) - .putAllAttributes(attributes); - - if (orderingKey != null) { - msgBuilder.setOrderingKey(orderingKey); + // Checking before adding the message stops us from violating max batch size or bytes + String orderingKey = getPublishWithOrderingKey() ? 
msg.getMessage().getOrderingKey() : ""; + final OutgoingData currentTopicAndOrderingKeyOutput = + output.computeIfAbsent(KV.of(pubsubTopic, orderingKey), t -> new OutgoingData()); + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 + if (currentTopicAndOrderingKeyOutput.messages.size() >= maxPublishBatchSize + || (!currentTopicAndOrderingKeyOutput.messages.isEmpty() + && (currentTopicAndOrderingKeyOutput.bytes + messageSize) + >= maxPublishBatchByteSize)) { + publish(pubsubTopic, currentTopicAndOrderingKeyOutput.messages); + currentTopicAndOrderingKeyOutput.messages.clear(); + currentTopicAndOrderingKeyOutput.bytes = 0; } - // NOTE: The record id is always null. - currentTopicOutput.messages.add( - OutgoingMessage.of( - msgBuilder.build(), timestamp.getMillis(), null, message.getTopic())); - currentTopicOutput.bytes += messageSize; + currentTopicAndOrderingKeyOutput.messages.add(msg); + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 Review Comment: ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org