sjvanrossum commented on code in PR #31608:
URL: https://github.com/apache/beam/pull/31608#discussion_r2068348295


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java:
##########
@@ -315,6 +340,138 @@ public void sendMoreThanOneBatchByByteSize() throws IOException {
     // message does not match the expected publish message.
   }
 
+  @Test
+  public void sendOneMessageWithWrongCoderForOrderingKey() throws IOException {
+    List<OutgoingMessage> outgoing =
+        ImmutableList.of(
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8(DATA))
+                    .putAllAttributes(ATTRIBUTES)
+                    .setOrderingKey(ORDERING_KEY_FN.apply(DATA))
+                    .build(),
+                TIMESTAMP,
+                getRecordId(DATA),
+                null));
+    assertThrows(
+        PipelineExecutionException.class,
+        () -> {
+          try (PubsubTestClientFactory factory =
+              PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) {
+            PubsubUnboundedSink sink =
+                new PubsubUnboundedSink(
+                    factory,
+                    StaticValueProvider.of(TOPIC),
+                    TIMESTAMP_ATTRIBUTE,
+                    ID_ATTRIBUTE,
+                    NUM_SHARDS,
+                    true,
+                    1 /* batchSize */,
+                    1 /* batchBytes */,
+                    Duration.standardSeconds(2),
+                    RecordIdMethod.DETERMINISTIC,
+                    null);
+            p.apply(Create.of(DATA))
+                .apply(ParDo.of(new Stamp(ATTRIBUTES, ORDERING_KEY_FN)))
+                .apply(sink);
+            p.run();
+          }
+        });
+    // The PubsubTestClientFactory will assert fail on close if the actual published
+    // message does not match the expected publish message.
+  }
+
+  @Test
+  public void sendOneMessagePerOrderingKey() throws IOException {
+    List<OutgoingMessage> outgoing =
+        ImmutableList.of(
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8(DATA))
+                    .putAllAttributes(ATTRIBUTES)
+                    .setOrderingKey(ORDERING_KEY_FN.apply(DATA))
+                    .build(),
+                TIMESTAMP,
+                getRecordId(DATA),
+                null),
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8(DATA + DATA))
+                    .putAllAttributes(ATTRIBUTES)
+                    .setOrderingKey(ORDERING_KEY_FN.apply(DATA + DATA))
+                    .build(),
+                TIMESTAMP,
+                getRecordId(DATA + DATA),
+                null));
+    try (PubsubTestClientFactory factory =
+        PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) {
+      PubsubUnboundedSink sink =
+          new PubsubUnboundedSink(
+              factory,
+              StaticValueProvider.of(TOPIC),
+              TIMESTAMP_ATTRIBUTE,
+              ID_ATTRIBUTE,
+              NUM_SHARDS,
+              true,
+              1 /* batchSize */,
+              1 /* batchBytes */,
+              Duration.standardSeconds(2),
+              RecordIdMethod.DETERMINISTIC,
+              null);
+      p.apply(Create.of(DATA, DATA + DATA))
+          .apply(ParDo.of(new Stamp(ATTRIBUTES, ORDERING_KEY_FN)))
+          .setCoder(PubsubMessageSchemaCoder.getSchemaCoder())
+          .apply(sink);
+      p.run();
+    }
+    // The PubsubTestClientFactory will assert fail on close if the actual published
+    // message does not match the expected publish message.
+  }
+
+  @Test
+  public void sendMoreThanOneBatchByOrderingKey() throws IOException {
+    List<OutgoingMessage> outgoing = new ArrayList<>();
+    List<String> data = new ArrayList<>();
+    int batchSize = 2;
+    int batchBytes = 1000;
+    for (int i = 0; i < batchSize * 10; i++) {
+      String str = String.valueOf(i);
+      outgoing.add(
+          OutgoingMessage.of(
+              com.google.pubsub.v1.PubsubMessage.newBuilder()
+                  .setData(ByteString.copyFromUtf8(str))
+                  .setOrderingKey(ORDERING_KEY_FN.apply(str))
+                  .build(),
+              TIMESTAMP,
+              getRecordId(str),
+              null));
+      data.add(str);
+    }
+    try (PubsubTestClientFactory factory =
+        PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) {
+      PubsubUnboundedSink sink =
+          new PubsubUnboundedSink(
+              factory,
+              StaticValueProvider.of(TOPIC),
+              TIMESTAMP_ATTRIBUTE,
+              ID_ATTRIBUTE,
+              NUM_SHARDS,
+              true,
+              batchSize,
+              batchBytes,
+              Duration.standardSeconds(2),
+              RecordIdMethod.DETERMINISTIC,
+              null);
+      p.apply(Create.of(data))

Review Comment:
   I don't think it does; I've added a call to `Collections.shuffle` in d476406
to randomly reorder the test input.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to