Repository: beam Updated Branches: refs/heads/master defb55405 -> 3c1d411a2
Moves coder choice into PubsubSource Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9d152498 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9d152498 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9d152498 Branch: refs/heads/master Commit: 9d152498414ead3db77d64f49a2e90f4d4d255e0 Parents: c9b7fe4 Author: Eugene Kirpichov <[email protected]> Authored: Thu May 4 14:58:40 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Thu May 4 15:59:11 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 9 +-------- .../beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 8 ++++---- 2 files changed, 5 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9d152498/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index fa2d20f..51da111 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -721,14 +721,7 @@ public class PubsubIO { getTimestampAttribute(), getIdAttribute(), getNeedsAttributes()); - return input - .getPipeline() - .apply(source) - .setCoder( - getNeedsAttributes() - ? PubsubMessageWithAttributesCoder.of() - : PubsubMessagePayloadOnlyCoder.of()) - .apply(MapElements.via(getParseFn())); + return input.apply(source).apply(MapElements.via(getParseFn())); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/9d152498/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index e5be71b..c2cbe73 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -1155,14 +1155,14 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub @Nullable @Override public Coder<PubsubCheckpoint> getCheckpointMarkCoder() { - @SuppressWarnings("unchecked") PubsubCheckpointCoder typedCoder = - (PubsubCheckpointCoder) CHECKPOINT_CODER; - return typedCoder; + return CHECKPOINT_CODER; } @Override public Coder<PubsubMessage> getDefaultOutputCoder() { - return new PubsubMessageWithAttributesCoder(); + return outer.getNeedsAttributes() + ? PubsubMessageWithAttributesCoder.of() + : PubsubMessagePayloadOnlyCoder.of(); } @Override
