sjvanrossum commented on code in PR #31608: URL: https://github.com/apache/beam/pull/31608#discussion_r2068348295
########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java: ########## @@ -315,6 +340,138 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { // message does not match the expected publish message. } + @Test + public void sendOneMessageWithWrongCoderForOrderingKey() throws IOException { + List<OutgoingMessage> outgoing = + ImmutableList.of( + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(DATA)) + .putAllAttributes(ATTRIBUTES) + .setOrderingKey(ORDERING_KEY_FN.apply(DATA)) + .build(), + TIMESTAMP, + getRecordId(DATA), + null)); + assertThrows( + PipelineExecutionException.class, + () -> { + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) { + PubsubUnboundedSink sink = + new PubsubUnboundedSink( + factory, + StaticValueProvider.of(TOPIC), + TIMESTAMP_ATTRIBUTE, + ID_ATTRIBUTE, + NUM_SHARDS, + true, + 1 /* batchSize */, + 1 /* batchBytes */, + Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC, + null); + p.apply(Create.of(DATA)) + .apply(ParDo.of(new Stamp(ATTRIBUTES, ORDERING_KEY_FN))) + .apply(sink); + p.run(); + } + }); + // The PubsubTestClientFactory will assert fail on close if the actual published + // message does not match the expected publish message. 
+ } + + @Test + public void sendOneMessagePerOrderingKey() throws IOException { + List<OutgoingMessage> outgoing = + ImmutableList.of( + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(DATA)) + .putAllAttributes(ATTRIBUTES) + .setOrderingKey(ORDERING_KEY_FN.apply(DATA)) + .build(), + TIMESTAMP, + getRecordId(DATA), + null), + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(DATA + DATA)) + .putAllAttributes(ATTRIBUTES) + .setOrderingKey(ORDERING_KEY_FN.apply(DATA + DATA)) + .build(), + TIMESTAMP, + getRecordId(DATA + DATA), + null)); + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) { + PubsubUnboundedSink sink = + new PubsubUnboundedSink( + factory, + StaticValueProvider.of(TOPIC), + TIMESTAMP_ATTRIBUTE, + ID_ATTRIBUTE, + NUM_SHARDS, + true, + 1 /* batchSize */, + 1 /* batchBytes */, + Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC, + null); + p.apply(Create.of(DATA, DATA + DATA)) + .apply(ParDo.of(new Stamp(ATTRIBUTES, ORDERING_KEY_FN))) + .setCoder(PubsubMessageSchemaCoder.getSchemaCoder()) + .apply(sink); + p.run(); + } + // The PubsubTestClientFactory will assert fail on close if the actual published + // message does not match the expected publish message. 
+ } + + @Test + public void sendMoreThanOneBatchByOrderingKey() throws IOException { + List<OutgoingMessage> outgoing = new ArrayList<>(); + List<String> data = new ArrayList<>(); + int batchSize = 2; + int batchBytes = 1000; + for (int i = 0; i < batchSize * 10; i++) { + String str = String.valueOf(i); + outgoing.add( + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(str)) + .setOrderingKey(ORDERING_KEY_FN.apply(str)) + .build(), + TIMESTAMP, + getRecordId(str), + null)); + data.add(str); + } + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) { + PubsubUnboundedSink sink = + new PubsubUnboundedSink( + factory, + StaticValueProvider.of(TOPIC), + TIMESTAMP_ATTRIBUTE, + ID_ATTRIBUTE, + NUM_SHARDS, + true, + batchSize, + batchBytes, + Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC, + null); + p.apply(Create.of(data)) Review Comment: I don't think it does; I've added a call to `Collections.shuffle` in commit d476406 to randomly reorder the test input. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org