Makes PubsubMessagePayloadOnlyCoder not require whole-stream context Now that PubsubIO.Read can directly read PubsubMessage's, they should be treated as first-class PCollection elements, and they can be encoded/decoded in any contexts.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9b7fe44 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9b7fe44 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9b7fe44 Branch: refs/heads/master Commit: c9b7fe443368badf6fd9fbd08f5234f17766c2cf Parents: d9943a3 Author: Eugene Kirpichov <[email protected]> Authored: Thu May 4 14:40:08 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Thu May 4 15:59:11 2017 -0700 ---------------------------------------------------------------------- .../io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c9b7fe44/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java index 81c1a45..d120f72 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java @@ -17,17 +17,18 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; -import static com.google.common.base.Preconditions.checkState; - import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.util.StreamUtils; /** A coder for PubsubMessage treating the raw bytes being decoded as the message's payload. */ public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> { + private static final Coder<byte[]> PAYLOAD_CODER = ByteArrayCoder.of(); + public static PubsubMessagePayloadOnlyCoder of() { return new PubsubMessagePayloadOnlyCoder(); } @@ -35,14 +36,12 @@ public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> { @Override public void encode(PubsubMessage value, OutputStream outStream, Context context) throws IOException { - checkState(context.isWholeStream, "Expected to only be used in a whole-stream context"); - outStream.write(value.getPayload()); + PAYLOAD_CODER.encode(value.getPayload(), outStream, context); } @Override public PubsubMessage decode(InputStream inStream, Context context) throws IOException { - checkState(context.isWholeStream, "Expected to only be used in a whole-stream context"); return new PubsubMessage( - StreamUtils.getBytes(inStream), ImmutableMap.<String, String>of()); + PAYLOAD_CODER.decode(inStream, context), ImmutableMap.<String, String>of()); } }
