Repository: beam Updated Branches: refs/heads/master 5b0a8684a -> ad12f6316
Moves PubsubMessage to upper level and renames payload Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/210e216d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/210e216d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/210e216d Branch: refs/heads/master Commit: 210e216d95b14846cb51c94948a1c06157154de6 Parents: e57b501 Author: Eugene Kirpichov <[email protected]> Authored: Wed May 3 18:17:34 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Wed May 3 19:18:46 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 22 +++---- .../sdk/io/gcp/pubsub/PubsubCoderRegistrar.java | 3 +- .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 49 ++-------------- .../beam/sdk/io/gcp/pubsub/PubsubMessage.java | 61 ++++++++++++++++++++ .../pubsub/PubsubMessagePayloadOnlyCoder.java | 10 ++-- .../PubsubMessageWithAttributesCoder.java | 12 ++-- .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 12 ++-- .../io/gcp/pubsub/PubsubUnboundedSource.java | 16 ++--- .../io/gcp/pubsub/PubsubUnboundedSinkTest.java | 4 +- .../gcp/pubsub/PubsubUnboundedSourceTest.java | 4 +- 10 files changed, 107 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index f7455b3..9e5a2fb 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -89,7 +89,7 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; @@ -867,7 +867,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * instead defer to Windmill's implementation. */ private static class StreamingPubsubIORead - extends PTransform<PBegin, PCollection<PubsubIO.PubsubMessage>> { + extends PTransform<PBegin, PCollection<PubsubMessage>> { private final PubsubUnboundedSource transform; /** @@ -883,8 +883,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } @Override - public PCollection<PubsubIO.PubsubMessage> expand(PBegin input) { - return PCollection.<PubsubIO.PubsubMessage>createPrimitiveOutputInternal( + public PCollection<PubsubMessage> expand(PBegin input) { + return PCollection.<PubsubMessage>createPrimitiveOutputInternal( input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) .setCoder(new PubsubMessageWithAttributesCoder()); } @@ -956,9 +956,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } private static class IdentityMessageFn - extends SimpleFunction<PubsubIO.PubsubMessage, PubsubIO.PubsubMessage> { + extends SimpleFunction<PubsubMessage, PubsubMessage> { @Override - public PubsubIO.PubsubMessage apply(PubsubIO.PubsubMessage input) { + public PubsubMessage apply(PubsubMessage input) { return input; } } @@ -968,7 +968,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * instead defer to Windmill's implementation. */ private static class StreamingPubsubIOWrite - extends PTransform<PCollection<PubsubIO.PubsubMessage>, PDone> { + extends PTransform<PCollection<PubsubMessage>, PDone> { private final PubsubUnboundedSink transform; /** @@ -984,7 +984,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } @Override - public PDone expand(PCollection<PubsubIO.PubsubMessage> input) { + public PDone expand(PCollection<PubsubMessage> input) { return PDone.in(input.getPipeline()); } @@ -1332,7 +1332,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { private class StreamingPubsubIOWriteOverrideFactory implements PTransformOverrideFactory< - PCollection<PubsubIO.PubsubMessage>, PDone, PubsubUnboundedSink> { + PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> { private final DataflowRunner runner; private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) { @@ -1340,9 +1340,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } @Override - public PTransformReplacement<PCollection<PubsubIO.PubsubMessage>, PDone> + public PTransformReplacement<PCollection<PubsubMessage>, PDone> getReplacementTransform( - AppliedPTransform<PCollection<PubsubIO.PubsubMessage>, PDone, PubsubUnboundedSink> + AppliedPTransform<PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java index 5944305..062f350 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java @@ -30,7 +30,6 @@ public class PubsubCoderRegistrar implements CoderRegistrar { @Override public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() { return ImmutableMap.<Class<?>, CoderFactory>of( - PubsubIO.PubsubMessage.class, - CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of())); + PubsubMessage.class, CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 133839c..e023ad0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; @@ -156,44 +155,6 @@ public class PubsubIO { } /** - * Class representing a Pub/Sub message. Each message contains a single message payload and - * a map of attached attributes. - */ - public static class PubsubMessage { - - private byte[] message; - private Map<String, String> attributes; - - public PubsubMessage(byte[] message, Map<String, String> attributes) { - this.message = message; - this.attributes = attributes; - } - - /** - * Returns the main PubSub message. - */ - public byte[] getMessage() { - return message; - } - - /** - * Returns the given attribute value. If not such attribute exists, returns null. - */ - @Nullable - public String getAttribute(String attribute) { - checkNotNull(attribute, "attribute"); - return attributes.get(attribute); - } - - /** - * Returns the full map of attributes. This is an unmodifiable map. - */ - public Map<String, String> getAttributeMap() { - return attributes; - } - } - - /** * Class representing a Cloud Pub/Sub Subscription. */ public static class PubsubSubscription implements Serializable { @@ -471,7 +432,7 @@ public class PubsubIO { /** * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The - * messages will only contain a {@link PubsubMessage#getMessage() payload}, but no {@link + * messages will only contain a {@link PubsubMessage#getPayload() payload}, but no {@link * PubsubMessage#getAttributeMap() attributes}. */ public static Read<PubsubMessage> readPubsubMessagesWithoutAttributes() { @@ -484,7 +445,7 @@ public class PubsubIO { /** * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The - * messages will contain both a {@link PubsubMessage#getMessage() payload} and {@link + * messages will contain both a {@link PubsubMessage#getPayload() payload} and {@link * PubsubMessage#getAttributeMap() attributes}. */ public static Read<PubsubMessage> readPubsubMessagesWithAttributes() { @@ -939,7 +900,7 @@ public class PubsubIO { public void processElement(ProcessContext c) throws IOException { byte[] payload; PubsubMessage message = getFormatFn().apply(c.element()); - payload = message.getMessage(); + payload = message.getPayload(); Map<String, String> attributes = message.getAttributeMap(); // NOTE: The record id is always null. output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null)); @@ -981,7 +942,7 @@ public class PubsubIO { private static class ParsePayloadAsUtf8 extends SimpleFunction<PubsubMessage, String> { @Override public String apply(PubsubMessage input) { - return new String(input.getMessage(), StandardCharsets.UTF_8); + return new String(input.getPayload(), StandardCharsets.UTF_8); } } @@ -995,7 +956,7 @@ public class PubsubIO { @Override public T apply(PubsubMessage input) { try { - return CoderUtils.decodeFromByteArray(coder, input.getMessage()); + return CoderUtils.decodeFromByteArray(coder, input.getPayload()); } catch (CoderException e) { throw new RuntimeException("Could not decode Pubsub message", e); } http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java new file mode 100644 index 0000000..69f850a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Class representing a Pub/Sub message. Each message contains a single message payload and + * a map of attached attributes. + */ +public class PubsubMessage { + + private byte[] message; + private Map<String, String> attributes; + + public PubsubMessage(byte[] payload, Map<String, String> attributes) { + this.message = payload; + this.attributes = attributes; + } + + /** + * Returns the main PubSub message. + */ + public byte[] getPayload() { + return message; + } + + /** + * Returns the given attribute value. If not such attribute exists, returns null. + */ + @Nullable + public String getAttribute(String attribute) { + checkNotNull(attribute, "attribute"); + return attributes.get(attribute); + } + + /** + * Returns the full map of attributes. This is an unmodifiable map. + */ + public Map<String, String> getAttributeMap() { + return attributes; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java index f0dae46..81c1a45 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java @@ -27,22 +27,22 @@ import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.util.StreamUtils; /** A coder for PubsubMessage treating the raw bytes being decoded as the message's payload. */ -public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubIO.PubsubMessage> { +public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> { public static PubsubMessagePayloadOnlyCoder of() { return new PubsubMessagePayloadOnlyCoder(); } @Override - public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context) + public void encode(PubsubMessage value, OutputStream outStream, Context context) throws IOException { checkState(context.isWholeStream, "Expected to only be used in a whole-stream context"); - outStream.write(value.getMessage()); + outStream.write(value.getPayload()); } @Override - public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException { + public PubsubMessage decode(InputStream inStream, Context context) throws IOException { checkState(context.isWholeStream, "Expected to only be used in a whole-stream context"); - return new PubsubIO.PubsubMessage( + return new PubsubMessage( StreamUtils.getBytes(inStream), ImmutableMap.<String, String>of()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java index be9493c..f70955d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java @@ -30,13 +30,13 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.values.TypeDescriptor; /** A coder for PubsubMessage including attributes. */ -public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubIO.PubsubMessage> { +public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage> { private static final Coder<byte[]> PAYLOAD_CODER = NullableCoder.of(ByteArrayCoder.of()); private static final Coder<Map<String, String>> ATTRIBUTES_CODER = MapCoder.of( StringUtf8Coder.of(), StringUtf8Coder.of()); - public static Coder<PubsubIO.PubsubMessage> of(TypeDescriptor<PubsubIO.PubsubMessage> ignored) { + public static Coder<PubsubMessage> of(TypeDescriptor<PubsubMessage> ignored) { return of(); } @@ -44,19 +44,19 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubIO.Pubsu return new PubsubMessageWithAttributesCoder(); } - public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context) + public void encode(PubsubMessage value, OutputStream outStream, Context context) throws IOException { PAYLOAD_CODER.encode( - value.getMessage(), + value.getPayload(), outStream, context.nested()); ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context); } @Override - public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException { + public PubsubMessage decode(InputStream inStream, Context context) throws IOException { byte[] payload = PAYLOAD_CODER.decode(inStream, context.nested()); Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context); - return new PubsubIO.PubsubMessage(payload, attributes); + return new PubsubMessage(payload, attributes); } } http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 67530ec..9d97e91 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -81,7 +81,7 @@ import org.joda.time.Duration; * to dedup messages. * </ul> */ -public class PubsubUnboundedSink extends PTransform<PCollection<PubsubIO.PubsubMessage>, PDone> { +public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, PDone> { /** * Default maximum number of messages per publish. */ @@ -154,7 +154,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubIO.PubsubM /** * Convert elements to messages and shard them. */ - private static class ShardFn extends DoFn<PubsubIO.PubsubMessage, KV<Integer, OutgoingMessage>> { + private static class ShardFn extends DoFn<PubsubMessage, KV<Integer, OutgoingMessage>> { private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements"); private final int numShards; private final RecordIdMethod recordIdMethod; @@ -167,8 +167,8 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubIO.PubsubM @ProcessElement public void processElement(ProcessContext c) throws Exception { elementCounter.inc(); - PubsubIO.PubsubMessage message = c.element(); - byte[] elementBytes = message.getMessage(); + PubsubMessage message = c.element(); + byte[] elementBytes = message.getPayload(); Map<String, String> attributes = message.getAttributeMap(); long timestampMsSinceEpoch = c.timestamp().getMillis(); @@ -427,11 +427,11 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubIO.PubsubM } @Override - public PDone expand(PCollection<PubsubIO.PubsubMessage> input) { + public PDone expand(PCollection<PubsubMessage> input) { input .apply( "PubsubUnboundedSink.Window", - Window.<PubsubIO.PubsubMessage>into(new GlobalWindows()) + Window.<PubsubMessage>into(new GlobalWindows()) .triggering( Repeatedly.forever( AfterFirst.of( http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index d366949..e5be71b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -107,7 +107,7 @@ import org.slf4j.LoggerFactory; * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency. * </ul> */ -public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<PubsubIO.PubsubMessage>> { +public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<PubsubMessage>> { private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class); /** @@ -389,7 +389,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub * but not yet consumed downstream and/or ACKed back to Pubsub. */ @VisibleForTesting - static class PubsubReader extends UnboundedSource.UnboundedReader<PubsubIO.PubsubMessage> { + static class PubsubReader extends UnboundedSource.UnboundedReader<PubsubMessage> { /** * For access to topic and checkpointCoder. */ @@ -963,11 +963,11 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub } @Override - public PubsubIO.PubsubMessage getCurrent() throws NoSuchElementException { + public PubsubMessage getCurrent() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); } - return new PubsubIO.PubsubMessage(current.elementBytes, current.attributes); + return new PubsubMessage(current.elementBytes, current.attributes); } @Override @@ -1088,7 +1088,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub // ================================================================================ @VisibleForTesting - static class PubsubSource extends UnboundedSource<PubsubIO.PubsubMessage, PubsubCheckpoint> { + static class PubsubSource extends UnboundedSource<PubsubMessage, PubsubCheckpoint> { public final PubsubUnboundedSource outer; // The subscription to read from. @VisibleForTesting @@ -1161,7 +1161,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub } @Override - public Coder<PubsubIO.PubsubMessage> getDefaultOutputCoder() { + public Coder<PubsubMessage> getDefaultOutputCoder() { return new PubsubMessageWithAttributesCoder(); } @@ -1181,7 +1181,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub // StatsFn // ================================================================================ - private static class StatsFn extends DoFn<PubsubIO.PubsubMessage, PubsubIO.PubsubMessage> { + private static class StatsFn extends DoFn<PubsubMessage, PubsubMessage> { private final Counter elementCounter = SourceMetrics.elementsRead(); private final PubsubClientFactory pubsubFactory; @@ -1398,7 +1398,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub } @Override - public PCollection<PubsubIO.PubsubMessage> expand(PBegin input) { + public PCollection<PubsubMessage> expand(PBegin input) { return input.getPipeline().begin() .apply(Read.from(new PubsubSource(this))) .apply("PubsubUnboundedSource.Stats", http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index f2f40bb..cc3c85e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -60,7 +60,7 @@ public class PubsubUnboundedSinkTest implements Serializable { private static final String ID_ATTRIBUTE = "id"; private static final int NUM_SHARDS = 10; - private static class Stamp extends DoFn<String, PubsubIO.PubsubMessage> { + private static class Stamp extends DoFn<String, PubsubMessage> { private final Map<String, String> attributes; private Stamp() { @@ -74,7 +74,7 @@ public class PubsubUnboundedSinkTest implements Serializable { @ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp( - new PubsubIO.PubsubMessage( + new PubsubMessage( c.element().getBytes(StandardCharsets.UTF_8), attributes), new Instant(TIMESTAMP)); } http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java index 592dfa3..ee467da 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java @@ -114,8 +114,8 @@ public class PubsubUnboundedSourceTest { factory = null; } - private static String data(PubsubIO.PubsubMessage message) { - return new String(message.getMessage(), StandardCharsets.UTF_8); + private static String data(PubsubMessage message) { + return new String(message.getPayload(), StandardCharsets.UTF_8); } @Test
