scwhittle commented on code in PR #31608: URL: https://github.com/apache/beam/pull/31608#discussion_r2068137571
########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java: ########## @@ -315,6 +340,138 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { // message does not match the expected publish message. } + @Test + public void sendOneMessageWithWrongCoderForOrderingKey() throws IOException { + List<OutgoingMessage> outgoing = + ImmutableList.of( + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(DATA)) + .putAllAttributes(ATTRIBUTES) + .setOrderingKey(ORDERING_KEY_FN.apply(DATA)) + .build(), + TIMESTAMP, + getRecordId(DATA), + null)); + assertThrows( + PipelineExecutionException.class, + () -> { + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) { + PubsubUnboundedSink sink = + new PubsubUnboundedSink( + factory, + StaticValueProvider.of(TOPIC), + TIMESTAMP_ATTRIBUTE, + ID_ATTRIBUTE, + NUM_SHARDS, + true, + 1 /* batchSize */, + 1 /* batchBytes */, + Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC, + null); + p.apply(Create.of(DATA)) + .apply(ParDo.of(new Stamp(ATTRIBUTES, ORDERING_KEY_FN))) + .apply(sink); + p.run(); + } + }); + // The PubsubTestClientFactory will assert fail on close if the actual published + // message does not match the expected publish message. 
+ } + + @Test + public void sendOneMessagePerOrderingKey() throws IOException { + List<OutgoingMessage> outgoing = + ImmutableList.of( + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(DATA)) + .putAllAttributes(ATTRIBUTES) + .setOrderingKey(ORDERING_KEY_FN.apply(DATA)) + .build(), + TIMESTAMP, + getRecordId(DATA), + null), + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(DATA + DATA)) + .putAllAttributes(ATTRIBUTES) + .setOrderingKey(ORDERING_KEY_FN.apply(DATA + DATA)) + .build(), + TIMESTAMP, + getRecordId(DATA + DATA), + null)); + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) { + PubsubUnboundedSink sink = + new PubsubUnboundedSink( + factory, + StaticValueProvider.of(TOPIC), + TIMESTAMP_ATTRIBUTE, + ID_ATTRIBUTE, + NUM_SHARDS, + true, + 1 /* batchSize */, + 1 /* batchBytes */, + Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC, + null); + p.apply(Create.of(DATA, DATA + DATA)) + .apply(ParDo.of(new Stamp(ATTRIBUTES, ORDERING_KEY_FN))) + .setCoder(PubsubMessageSchemaCoder.getSchemaCoder()) + .apply(sink); + p.run(); + } + // The PubsubTestClientFactory will assert fail on close if the actual published + // message does not match the expected publish message. 
+ } + + @Test + public void sendMoreThanOneBatchByOrderingKey() throws IOException { + List<OutgoingMessage> outgoing = new ArrayList<>(); + List<String> data = new ArrayList<>(); + int batchSize = 2; + int batchBytes = 1000; + for (int i = 0; i < batchSize * 10; i++) { + String str = String.valueOf(i); + outgoing.add( + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(str)) + .setOrderingKey(ORDERING_KEY_FN.apply(str)) + .build(), + TIMESTAMP, + getRecordId(str), + null)); + data.add(str); + } + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) { + PubsubUnboundedSink sink = + new PubsubUnboundedSink( + factory, + StaticValueProvider.of(TOPIC), + TIMESTAMP_ATTRIBUTE, + ID_ATTRIBUTE, + NUM_SHARDS, + true, + batchSize, + batchBytes, + Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC, + null); + p.apply(Create.of(data)) Review Comment: Do you know if Create randomizes the data ordering? If not, it might improve the test to shuffle the data first; otherwise, I think the test could pass even if the batching logic were broken, since ordering key 1 would be processed fully before ordering key 2 and would fill the batches evenly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org