sjvanrossum commented on code in PR #31608:
URL: https://github.com/apache/beam/pull/31608#discussion_r2063667677
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1710,51 +1735,44 @@ public void startBundle(StartBundleContext c) throws IOException {
     public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp)
         throws IOException, SizeLimitExceededException {
       // Validate again here just as a sanity check.
+      // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
Review Comment:
```suggestion
      // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
      // - Size validation makes no distinction between JSON and Protobuf encoding
      // - Accounting for HTTP to gRPC transcoding is non-trivial
```
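On the first bullet: the protobuf wire size of a message is not the same as its payload length, since every length-delimited field carries a tag byte plus a varint length prefix (and a JSON encoding differs again). A rough standalone sketch of the protobuf side — helper names are mine, not Beam's or Pub/Sub's:

```java
public class ProtoSizeSketch {
  // Number of bytes protobuf uses to encode a non-negative value as a varint.
  static int varintLen(long v) {
    int n = 1;
    while ((v >>>= 7) != 0) {
      n++;
    }
    return n;
  }

  // Approximate wire size of one length-delimited (bytes) field:
  // 1 tag byte (for field numbers below 16) + length varint + the payload itself.
  static int bytesFieldSize(int payloadLen) {
    return 1 + varintLen(payloadLen) + payloadLen;
  }

  public static void main(String[] args) {
    System.out.println(bytesFieldSize(300)); // 300-byte payload -> 303 on the wire
  }
}
```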
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java:
##########
@@ -179,6 +201,16 @@ public void process(
null);
reportedLineage = topic;
}
+      if (!usesOrderingKey && !Strings.isNullOrEmpty(message.getOrderingKey())) {
+        if (!logOrderingKeyUnconfigured) {
+          LOG.warn(
+              "Encountered Pubsub message with ordering key but this sink was not configured to "
+                  + "retain ordering keys, so they will be dropped. Please set #withOrderingKeys().");
+
+          logOrderingKeyUnconfigured = true;
Review Comment:
Looks like this is declared as an instance field, so it will only be modified and observed locally on a single DoFn copy, or appropriately synchronized before being retrieved from a stage executor cache.
The gotcha is that the warning won't trigger just once, but once for every copy of this DoFn that's created to satisfy the current stage parallelism.
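If the intent is "once per worker" rather than "once per DoFn copy", one option is a static flag guarded by compare-and-set. A minimal sketch — the class and method names here are hypothetical, not from this PR:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class OrderingKeyWarnGuard {
  // Static, so the flag is shared by every DoFn copy in this worker JVM;
  // compareAndSet lets exactly one caller win, even under concurrent bundles.
  private static final AtomicBoolean warned = new AtomicBoolean(false);

  /** Returns true exactly once per process; callers log only when it returns true. */
  public static boolean shouldWarnOnce() {
    return warned.compareAndSet(false, true);
  }

  public static void main(String[] args) {
    System.out.println(shouldWarnOnce()); // first call: true
    System.out.println(shouldWarnOnce()); // every later call: false
  }
}
```

The trade-off is that the flag then survives across pipelines sharing the JVM, which may or may not be acceptable for this warning.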
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1710,51 +1735,44 @@ public void startBundle(StartBundleContext c) throws IOException {
     public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp)
         throws IOException, SizeLimitExceededException {
       // Validate again here just as a sanity check.
+      // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
       PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize);
-      byte[] payload = message.getPayload();
-      int messageSize = payload.length;
+      // NOTE: The record id is always null.
+      final OutgoingMessage msg =
+          OutgoingMessage.of(message, timestamp.getMillis(), null, message.getTopic());
+ // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
Review Comment:
```suggestion
      // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
      // - Size validation makes no distinction between JSON and Protobuf encoding
      // - Accounting for HTTP to gRPC transcoding is non-trivial
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1710,51 +1735,44 @@ public void startBundle(StartBundleContext c) throws IOException {
     public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp)
         throws IOException, SizeLimitExceededException {
       // Validate again here just as a sanity check.
+      // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
       PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize);
-      byte[] payload = message.getPayload();
-      int messageSize = payload.length;
+      // NOTE: The record id is always null.
+      final OutgoingMessage msg =
+          OutgoingMessage.of(message, timestamp.getMillis(), null, message.getTopic());
+ // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
+ final int messageSize = msg.getMessage().getData().size();
- PubsubTopic pubsubTopic;
+ final PubsubTopic pubsubTopic;
if (getTopicProvider() != null) {
pubsubTopic = getTopicProvider().get();
} else {
-        pubsubTopic =
-            PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(message.getTopic()));
-      }
-      // Checking before adding the message stops us from violating max batch size or bytes
-      OutgoingData currentTopicOutput =
-          output.computeIfAbsent(pubsubTopic, t -> new OutgoingData());
-      if (currentTopicOutput.messages.size() >= maxPublishBatchSize
-          || (!currentTopicOutput.messages.isEmpty()
-              && (currentTopicOutput.bytes + messageSize) >= maxPublishBatchByteSize)) {
-        publish(pubsubTopic, currentTopicOutput.messages);
-        currentTopicOutput.messages.clear();
-        currentTopicOutput.bytes = 0;
+        pubsubTopic = PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(msg.topic()));
       }
- Map<String, String> attributes = message.getAttributeMap();
- String orderingKey = message.getOrderingKey();
-
- com.google.pubsub.v1.PubsubMessage.Builder msgBuilder =
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(payload))
- .putAllAttributes(attributes);
-
- if (orderingKey != null) {
- msgBuilder.setOrderingKey(orderingKey);
+      // Checking before adding the message stops us from violating max batch size or bytes
+      String orderingKey = getPublishWithOrderingKey() ? msg.getMessage().getOrderingKey() : "";
+      final OutgoingData currentTopicAndOrderingKeyOutput =
+          output.computeIfAbsent(KV.of(pubsubTopic, orderingKey), t -> new OutgoingData());
+      // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
+      if (currentTopicAndOrderingKeyOutput.messages.size() >= maxPublishBatchSize
+          || (!currentTopicAndOrderingKeyOutput.messages.isEmpty()
+              && (currentTopicAndOrderingKeyOutput.bytes + messageSize) >= maxPublishBatchByteSize)) {
+        publish(pubsubTopic, currentTopicAndOrderingKeyOutput.messages);
+        currentTopicAndOrderingKeyOutput.messages.clear();
+        currentTopicAndOrderingKeyOutput.bytes = 0;
       }
-      // NOTE: The record id is always null.
-      currentTopicOutput.messages.add(
-          OutgoingMessage.of(
-              msgBuilder.build(), timestamp.getMillis(), null, message.getTopic()));
- currentTopicOutput.bytes += messageSize;
+ currentTopicAndOrderingKeyOutput.messages.add(msg);
+ // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
Review Comment:
```suggestion
```
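The check-before-add batching in the hunk above — flush the per-(topic, orderingKey) batch whenever adding the next message would exceed the count or byte limit — can be sketched standalone. Names and limits here are simplified placeholders, not the PR's actual fields:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedBatcher {
  static final int MAX_BATCH_SIZE = 2;   // hypothetical limits for the sketch
  static final int MAX_BATCH_BYTES = 10;

  static final class Batch {
    final List<byte[]> messages = new ArrayList<>();
    int bytes = 0;
  }

  // One batch per (topic, orderingKey), mirroring output.computeIfAbsent(KV.of(...), ...).
  final Map<String, Batch> output = new HashMap<>();
  final List<Integer> flushedSizes = new ArrayList<>(); // stand-in for publish()

  void add(String topic, String orderingKey, byte[] payload) {
    Batch b = output.computeIfAbsent(topic + "/" + orderingKey, k -> new Batch());
    // Checking before adding stops us from violating max batch size or bytes.
    if (b.messages.size() >= MAX_BATCH_SIZE
        || (!b.messages.isEmpty() && b.bytes + payload.length >= MAX_BATCH_BYTES)) {
      flushedSizes.add(b.messages.size());
      b.messages.clear();
      b.bytes = 0;
    }
    b.messages.add(payload);
    b.bytes += payload.length;
  }
}
```

Note the `!b.messages.isEmpty()` guard: a single message larger than the byte limit is still admitted on its own rather than looping forever, which is why the size validation upstream matters.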
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]