[BEAM-2170] PubsubMessageWithAttributesCoder should not NPE on messages without attributes
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d9943a3c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d9943a3c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d9943a3c Branch: refs/heads/master Commit: d9943a3cbd402872053f5482cf08cb3b70416bd4 Parents: defb554 Author: Eugene Kirpichov <[email protected]> Authored: Thu May 4 14:12:24 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Thu May 4 15:59:11 2017 -0700 ---------------------------------------------------------------------- .../PubsubMessageWithAttributesCoder.java | 14 ++++---- .../io/gcp/pubsub/PubsubUnboundedSinkTest.java | 34 +++++++++++++++++--- 2 files changed, 35 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d9943a3c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java index f70955d..e061edc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java @@ -31,10 +31,11 @@ import org.apache.beam.sdk.values.TypeDescriptor; /** A coder for PubsubMessage including attributes. */ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage> { - private static final Coder<byte[]> PAYLOAD_CODER = - NullableCoder.of(ByteArrayCoder.of()); - private static final Coder<Map<String, String>> ATTRIBUTES_CODER = MapCoder.of( - StringUtf8Coder.of(), StringUtf8Coder.of()); + // A message's payload can not be null + private static final Coder<byte[]> PAYLOAD_CODER = ByteArrayCoder.of(); + // A message's attributes can be null. + private static final Coder<Map<String, String>> ATTRIBUTES_CODER = + NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); public static Coder<PubsubMessage> of(TypeDescriptor<PubsubMessage> ignored) { return of(); @@ -46,10 +47,7 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage> public void encode(PubsubMessage value, OutputStream outStream, Context context) throws IOException { - PAYLOAD_CODER.encode( - value.getPayload(), - outStream, - context.nested()); + PAYLOAD_CODER.encode(value.getPayload(), outStream, context.nested()); ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context); } http://git-wip-us.apache.org/repos/asf/beam/blob/d9943a3c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index cc3c85e..e32e9a8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactor import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.RecordIdMethod; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -42,7 +41,6 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -96,7 +94,6 @@ public class PubsubUnboundedSinkTest implements Serializable { } @Test - @Category(NeedsRunner.class) public void sendOneMessage() throws IOException { List<OutgoingMessage> outgoing = ImmutableList.of(new OutgoingMessage( @@ -123,7 +120,35 @@ public class PubsubUnboundedSinkTest implements Serializable { } @Test - @Category(NeedsRunner.class) + public void sendOneMessageWithoutAttributes() throws IOException { + List<OutgoingMessage> outgoing = + ImmutableList.of( + new OutgoingMessage( + DATA.getBytes(), null /* attributes */, TIMESTAMP, getRecordId(DATA))); + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish( + TOPIC, outgoing, ImmutableList.<OutgoingMessage>of())) { + PubsubUnboundedSink sink = + new PubsubUnboundedSink( + factory, + StaticValueProvider.of(TOPIC), + TIMESTAMP_ATTRIBUTE, + ID_ATTRIBUTE, + NUM_SHARDS, + 1 /* batchSize */, + 1 /* batchBytes */, + Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC); + p.apply(Create.of(ImmutableList.of(DATA))) + .apply(ParDo.of(new Stamp(null /* attributes */))) + .apply(sink); + p.run(); + } + // The PubsubTestClientFactory will assert fail on close if the actual published + // message does not match the expected publish message. + } + + @Test public void sendMoreThanOneBatchByNumMessages() throws IOException { List<OutgoingMessage> outgoing = new ArrayList<>(); List<String> data = new ArrayList<>(); @@ -152,7 +177,6 @@ public class PubsubUnboundedSinkTest implements Serializable { } @Test - @Category(NeedsRunner.class) public void sendMoreThanOneBatchByByteSize() throws IOException { List<OutgoingMessage> outgoing = new ArrayList<>(); List<String> data = new ArrayList<>();
