scwhittle commented on code in PR #31608:
URL: https://github.com/apache/beam/pull/31608#discussion_r2068137571


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java:
##########
@@ -315,6 +340,138 @@ public void sendMoreThanOneBatchByByteSize() throws 
IOException {
     // message does not match the expected publish message.
   }
 
+  @Test
+  public void sendOneMessageWithWrongCoderForOrderingKey() throws IOException {
+    List<OutgoingMessage> outgoing =
+        ImmutableList.of(
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8(DATA))
+                    .putAllAttributes(ATTRIBUTES)
+                    .setOrderingKey(ORDERING_KEY_FN.apply(DATA))
+                    .build(),
+                TIMESTAMP,
+                getRecordId(DATA),
+                null));
+    assertThrows(
+        PipelineExecutionException.class,
+        () -> {
+          try (PubsubTestClientFactory factory =
+              PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, 
ImmutableList.of())) {
+            PubsubUnboundedSink sink =
+                new PubsubUnboundedSink(
+                    factory,
+                    StaticValueProvider.of(TOPIC),
+                    TIMESTAMP_ATTRIBUTE,
+                    ID_ATTRIBUTE,
+                    NUM_SHARDS,
+                    true,
+                    1 /* batchSize */,
+                    1 /* batchBytes */,
+                    Duration.standardSeconds(2),
+                    RecordIdMethod.DETERMINISTIC,
+                    null);
+            p.apply(Create.of(DATA))
+                .apply(ParDo.of(new Stamp(ATTRIBUTES, ORDERING_KEY_FN)))
+                .apply(sink);
+            p.run();
+          }
+        });
+    // The PubsubTestClientFactory will assert fail on close if the actual 
published
+    // message does not match the expected publish message.
+  }
+
+  @Test
+  public void sendOneMessagePerOrderingKey() throws IOException {
+    List<OutgoingMessage> outgoing =
+        ImmutableList.of(
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8(DATA))
+                    .putAllAttributes(ATTRIBUTES)
+                    .setOrderingKey(ORDERING_KEY_FN.apply(DATA))
+                    .build(),
+                TIMESTAMP,
+                getRecordId(DATA),
+                null),
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8(DATA + DATA))
+                    .putAllAttributes(ATTRIBUTES)
+                    .setOrderingKey(ORDERING_KEY_FN.apply(DATA + DATA))
+                    .build(),
+                TIMESTAMP,
+                getRecordId(DATA + DATA),
+                null));
+    try (PubsubTestClientFactory factory =
+        PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, 
ImmutableList.of())) {
+      PubsubUnboundedSink sink =
+          new PubsubUnboundedSink(
+              factory,
+              StaticValueProvider.of(TOPIC),
+              TIMESTAMP_ATTRIBUTE,
+              ID_ATTRIBUTE,
+              NUM_SHARDS,
+              true,
+              1 /* batchSize */,
+              1 /* batchBytes */,
+              Duration.standardSeconds(2),
+              RecordIdMethod.DETERMINISTIC,
+              null);
+      p.apply(Create.of(DATA, DATA + DATA))
+          .apply(ParDo.of(new Stamp(ATTRIBUTES, ORDERING_KEY_FN)))
+          .setCoder(PubsubMessageSchemaCoder.getSchemaCoder())
+          .apply(sink);
+      p.run();
+    }
+    // The PubsubTestClientFactory will assert fail on close if the actual 
published
+    // message does not match the expected publish message.
+  }
+
+  @Test
+  public void sendMoreThanOneBatchByOrderingKey() throws IOException {
+    List<OutgoingMessage> outgoing = new ArrayList<>();
+    List<String> data = new ArrayList<>();
+    int batchSize = 2;
+    int batchBytes = 1000;
+    for (int i = 0; i < batchSize * 10; i++) {
+      String str = String.valueOf(i);
+      outgoing.add(
+          OutgoingMessage.of(
+              com.google.pubsub.v1.PubsubMessage.newBuilder()
+                  .setData(ByteString.copyFromUtf8(str))
+                  .setOrderingKey(ORDERING_KEY_FN.apply(str))
+                  .build(),
+              TIMESTAMP,
+              getRecordId(str),
+              null));
+      data.add(str);
+    }
+    try (PubsubTestClientFactory factory =
+        PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, 
ImmutableList.of())) {
+      PubsubUnboundedSink sink =
+          new PubsubUnboundedSink(
+              factory,
+              StaticValueProvider.of(TOPIC),
+              TIMESTAMP_ATTRIBUTE,
+              ID_ATTRIBUTE,
+              NUM_SHARDS,
+              true,
+              batchSize,
+              batchBytes,
+              Duration.standardSeconds(2),
+              RecordIdMethod.DETERMINISTIC,
+              null);
+      p.apply(Create.of(data))

Review Comment:
   Do you know if Create randomizes the data ordering? If not, it might improve
the test to shuffle the data first; otherwise I think the test could pass even
if the batching logic was broken, since ordering key 1 is processed fully before
ordering key 2 and would fill batches evenly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to