Repository: beam
Updated Branches:
  refs/heads/master defb55405 -> 3c1d411a2


Moves coder choice into PubsubSource


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9d152498
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9d152498
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9d152498

Branch: refs/heads/master
Commit: 9d152498414ead3db77d64f49a2e90f4d4d255e0
Parents: c9b7fe4
Author: Eugene Kirpichov <[email protected]>
Authored: Thu May 4 14:58:40 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Thu May 4 15:59:11 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java    | 9 +--------
 .../beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java       | 8 ++++----
 2 files changed, 5 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9d152498/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index fa2d20f..51da111 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -721,14 +721,7 @@ public class PubsubIO {
               getTimestampAttribute(),
               getIdAttribute(),
               getNeedsAttributes());
-      return input
-          .getPipeline()
-          .apply(source)
-          .setCoder(
-              getNeedsAttributes()
-                  ? PubsubMessageWithAttributesCoder.of()
-                  : PubsubMessagePayloadOnlyCoder.of())
-          .apply(MapElements.via(getParseFn()));
+      return input.apply(source).apply(MapElements.via(getParseFn()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/9d152498/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index e5be71b..c2cbe73 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1155,14 +1155,14 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     @Nullable
     @Override
     public Coder<PubsubCheckpoint> getCheckpointMarkCoder() {
-      @SuppressWarnings("unchecked") PubsubCheckpointCoder typedCoder =
-          (PubsubCheckpointCoder) CHECKPOINT_CODER;
-      return typedCoder;
+      return CHECKPOINT_CODER;
     }
 
     @Override
     public Coder<PubsubMessage> getDefaultOutputCoder() {
-      return new PubsubMessageWithAttributesCoder();
+      return outer.getNeedsAttributes()
+          ? PubsubMessageWithAttributesCoder.of()
+          : PubsubMessagePayloadOnlyCoder.of();
     }
 
     @Override

Reply via email to