This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7a309c2 [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2
new c9e2580 Merge pull request #12806 from [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2
7a309c2 is described below
commit 7a309c2ce602fba67351f36067f9c65b150b86a8
Author: Boyuan Zhang <[email protected]>
AuthorDate: Wed Sep 9 21:05:02 2020 -0700
[BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2
---
.../beam/runners/dataflow/DataflowRunner.java | 150 ++++++++++++++++++---
.../beam/sdk/io/gcp/pubsub/ExternalRead.java | 20 +--
.../beam/sdk/io/gcp/pubsub/ExternalWrite.java | 19 +--
.../beam/sdk/io/gcp/pubsub/PubsubMessages.java | 58 ++++++++
sdks/python/apache_beam/io/gcp/pubsub.py | 46 ++++---
sdks/python/apache_beam/io/gcp/pubsub_test.py | 5 +-
.../runners/dataflow/dataflow_runner.py | 8 +-
.../apache_beam/runners/direct/direct_runner.py | 19 ++-
8 files changed, 240 insertions(+), 85 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8931143..1e0415d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -79,6 +79,7 @@ import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.runners.core.construction.WriteFilesTranslation;
import
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import
org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
+import
org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import
org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -92,6 +93,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.KvCoder;
@@ -109,6 +111,7 @@ import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -129,6 +132,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
@@ -155,6 +159,7 @@ import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -476,10 +481,18 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
new StreamingPubsubIOReadOverrideFactory()));
}
if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
- overridesBuilder.add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
- new StreamingPubsubIOWriteOverrideFactory(this)));
+ if (hasExperiment(options, "use_runner_v2")
+ || hasExperiment(options, "use_unified_worker")) {
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
+ new DataflowWriteToPubsubRunnerV2OverrideFactory()));
+ } else {
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
+ new StreamingPubsubIOWriteOverrideFactory(this)));
+ }
}
overridesBuilder.add(
PTransformOverride.of(
@@ -1441,6 +1454,39 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
/**
* Suppress application of {@link PubsubUnboundedSink#expand} in streaming
mode so that we can
+ * instead defer to Windmill's implementation when using Dataflow runner v2.
+ */
+ private static class DataflowRunnerV2PubsubSink extends
PTransform<PCollection<byte[]>, PDone> {
+
+ private final PubsubUnboundedSink transform;
+
+ public DataflowRunnerV2PubsubSink(PubsubUnboundedSink transform) {
+ this.transform = transform;
+ }
+
+ PubsubUnboundedSink getOverriddenTransform() {
+ return transform;
+ }
+
+ @Override
+ public PDone expand(PCollection<byte[]> input) {
+ return PDone.in(input.getPipeline());
+ }
+
+ @Override
+ protected String getKindString() {
+ return "DataflowRunnerV2PubsubSink";
+ }
+
+ static {
+ DataflowPipelineTranslator.registerTransformTranslator(
+ DataflowRunnerV2PubsubSink.class,
+ new
StreamingPubsubSinkTranslators.DataflowRunnerV2PubsubSinkTranslator());
+ }
+ }
+
+ /**
+ * Suppress application of {@link PubsubUnboundedSink#expand} in streaming
mode so that we can
* instead defer to Windmill's implementation.
*/
private static class StreamingPubsubIOWrite
@@ -1469,21 +1515,45 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
static {
DataflowPipelineTranslator.registerTransformTranslator(
- StreamingPubsubIOWrite.class, new
StreamingPubsubIOWriteTranslator());
+ StreamingPubsubIOWrite.class,
+ new
StreamingPubsubSinkTranslators.StreamingPubsubIOWriteTranslator());
}
}
- /** Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal node.
*/
- private static class StreamingPubsubIOWriteTranslator
- implements TransformTranslator<StreamingPubsubIOWrite> {
+ private static class StreamingPubsubSinkTranslators {
+ /** Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal
node. */
+ static class StreamingPubsubIOWriteTranslator
+ implements TransformTranslator<StreamingPubsubIOWrite> {
- @Override
- public void translate(StreamingPubsubIOWrite transform, TranslationContext
context) {
- checkArgument(
- context.getPipelineOptions().isStreaming(),
- "StreamingPubsubIOWrite is only for streaming pipelines.");
- PubsubUnboundedSink overriddenTransform =
transform.getOverriddenTransform();
- StepTranslationContext stepContext = context.addStep(transform,
"ParallelWrite");
+ @Override
+ public void translate(StreamingPubsubIOWrite transform,
TranslationContext context) {
+ checkArgument(
+ context.getPipelineOptions().isStreaming(),
+ "StreamingPubsubIOWrite is only for streaming pipelines.");
+ StepTranslationContext stepContext = context.addStep(transform,
"ParallelWrite");
+ StreamingPubsubSinkTranslators.translate(
+ transform.getOverriddenTransform(), stepContext,
context.getInput(transform));
+ }
+ }
+
+ /** Rewrite {@link DataflowRunnerV2PubsubSink} to the appropriate internal
node. */
+ static class DataflowRunnerV2PubsubSinkTranslator
+ implements TransformTranslator<DataflowRunnerV2PubsubSink> {
+ @Override
+ public void translate(DataflowRunnerV2PubsubSink transform,
TranslationContext context) {
+ checkArgument(
+ context.getPipelineOptions().isStreaming(),
+ "StreamingPubsubIOWrite is only for streaming pipelines.");
+ StepTranslationContext stepContext = context.addStep(transform,
"ParallelWrite");
+ StreamingPubsubSinkTranslators.translate(
+ transform.getOverriddenTransform(), stepContext,
context.getInput(transform));
+ }
+ }
+
+ private static void translate(
+ PubsubUnboundedSink overriddenTransform,
+ StepTranslationContext stepContext,
+ PCollection input) {
stepContext.addInput(PropertyNames.FORMAT, "pubsub");
if (overriddenTransform.getTopicProvider().isAccessible()) {
stepContext.addInput(
@@ -1508,7 +1578,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// Using a GlobalWindowCoder as a place holder because GlobalWindowCoder
is known coder.
stepContext.addEncodingInput(
WindowedValue.getFullCoder(VoidCoder.of(),
GlobalWindow.Coder.INSTANCE));
- stepContext.addInput(PropertyNames.PARALLEL_INPUT,
context.getInput(transform));
+ stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
}
}
@@ -1957,6 +2027,54 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
+ /**
+ * A replacement {@link PTransform} for {@link PubsubUnboundedSink} when
using dataflow runner v2.
+ */
+ private static class DataflowWriteToPubsubForRunnerV2
+ extends PTransform<PCollection<PubsubMessage>, PDone> {
+
+ private final PubsubUnboundedSink transform;
+
+ public DataflowWriteToPubsubForRunnerV2(PubsubUnboundedSink transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PDone expand(PCollection<PubsubMessage> input) {
+ input
+ .apply(
+ "Output Serialized PubsubMessage Proto",
+ MapElements.into(new TypeDescriptor<byte[]>() {})
+ .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto()))
+ .setCoder(ByteArrayCoder.of())
+ .apply(new DataflowRunnerV2PubsubSink(transform));
+
+ return PDone.in(input.getPipeline());
+ }
+ }
+
+ /**
+ * A {@link PTransformOverrideFactory} to provide replacement {@link
PTransform} for {@link
+ * PubsubUnboundedSink} when using dataflow runner v2.
+ */
+ private static class DataflowWriteToPubsubRunnerV2OverrideFactory
+ implements PTransformOverrideFactory<PCollection<PubsubMessage>, PDone,
PubsubUnboundedSink> {
+
+ @Override
+ public PTransformReplacement<PCollection<PubsubMessage>, PDone>
getReplacementTransform(
+ AppliedPTransform<PCollection<PubsubMessage>, PDone,
PubsubUnboundedSink> transform) {
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ new DataflowWriteToPubsubForRunnerV2(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PCollection<?>> outputs, PDone newOutput) {
+ return Collections.emptyMap();
+ }
+ }
+
@VisibleForTesting
static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
implements PTransformOverrideFactory<
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
index 93612ae..250b68c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
@@ -18,18 +18,17 @@
package org.apache.beam.sdk.io.gcp.pubsub;
import com.google.auto.service.AutoService;
-import com.google.protobuf.ByteString;
import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubSubscription;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic;
+import
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.ParsePayloadAsPubsubMessageProto;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -111,21 +110,4 @@ public final class ExternalRead implements ExternalTransformRegistrar {
return readBuilder.build();
}
}
-
- // Convert the PubsubMessage to a PubsubMessage proto, then return its
serialized representation.
- private static class ParsePayloadAsPubsubMessageProto
- implements SerializableFunction<PubsubMessage, byte[]> {
- @Override
- public byte[] apply(PubsubMessage input) {
- Map<String, String> attributes = input.getAttributeMap();
- com.google.pubsub.v1.PubsubMessage.Builder message =
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(input.getPayload()));
- // TODO(BEAM-8085) this should not be null
- if (attributes != null) {
- message.putAllAttributes(attributes);
- }
- return message.build().toByteArray();
- }
- }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 4733e32..1258c48 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -18,16 +18,15 @@
package org.apache.beam.sdk.io.gcp.pubsub;
import com.google.auto.service.AutoService;
-import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic;
+import
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.ParsePubsubMessageProtoAsPayload;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -72,7 +71,8 @@ public final class ExternalWrite implements ExternalTransformRegistrar {
@Override
public PTransform<PCollection<byte[]>, PDone> buildExternal(Configuration
config) {
- PubsubIO.Write.Builder<byte[]> writeBuilder =
PubsubIO.Write.newBuilder(new FormatFn());
+ PubsubIO.Write.Builder<byte[]> writeBuilder =
+ PubsubIO.Write.newBuilder(new ParsePubsubMessageProtoAsPayload());
if (config.topic != null) {
StaticValueProvider<String> topic =
StaticValueProvider.of(config.topic);
writeBuilder.setTopicProvider(NestedValueProvider.of(topic,
PubsubTopic::fromPath));
@@ -86,17 +86,4 @@ public final class ExternalWrite implements ExternalTransformRegistrar {
return writeBuilder.build();
}
}
-
- private static class FormatFn implements SerializableFunction<byte[],
PubsubMessage> {
- @Override
- public PubsubMessage apply(byte[] input) {
- try {
- com.google.pubsub.v1.PubsubMessage message =
- com.google.pubsub.v1.PubsubMessage.parseFrom(input);
- return new PubsubMessage(message.getData().toByteArray(),
message.getAttributesMap());
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException("Could not decode Pubsub message", e);
- }
- }
- }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java
new file mode 100644
index 0000000..6d9e295
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+/** Common util functions for converting between PubsubMessage proto and
{@link PubsubMessage}. */
+public class PubsubMessages {
+ // Convert the PubsubMessage to a PubsubMessage proto, then return its
serialized representation.
+ public static class ParsePayloadAsPubsubMessageProto
+ implements SerializableFunction<PubsubMessage, byte[]> {
+ @Override
+ public byte[] apply(PubsubMessage input) {
+ Map<String, String> attributes = input.getAttributeMap();
+ com.google.pubsub.v1.PubsubMessage.Builder message =
+ com.google.pubsub.v1.PubsubMessage.newBuilder()
+ .setData(ByteString.copyFrom(input.getPayload()));
+ // TODO(BEAM-8085) this should not be null
+ if (attributes != null) {
+ message.putAllAttributes(attributes);
+ }
+ return message.build().toByteArray();
+ }
+ }
+
+ // Convert the serialized PubsubMessage proto to PubsubMessage.
+ public static class ParsePubsubMessageProtoAsPayload
+ implements SerializableFunction<byte[], PubsubMessage> {
+ @Override
+ public PubsubMessage apply(byte[] input) {
+ try {
+ com.google.pubsub.v1.PubsubMessage message =
+ com.google.pubsub.v1.PubsubMessage.parseFrom(input);
+ return new PubsubMessage(message.getData().toByteArray(),
message.getAttributesMap());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Could not decode Pubsub message", e);
+ }
+ }
+ }
+}
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index b0f8bdf..6c90a0b 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -246,6 +246,10 @@ class _WriteStringsToPubSub(PTransform):
topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
"""
super(_WriteStringsToPubSub, self).__init__()
+ self.with_attributes = False
+ self.id_label = None
+ self.timestamp_attribute = None
+ self.project, self.topic_name = parse_topic(topic)
self._sink = _PubSubSink(
topic, id_label=None, with_attributes=False, timestamp_attribute=None)
@@ -286,11 +290,13 @@ class WriteToPubSub(PTransform):
self.with_attributes = with_attributes
self.id_label = id_label
self.timestamp_attribute = timestamp_attribute
+ self.project, self.topic_name = parse_topic(topic)
+ self.full_topic = topic
self._sink = _PubSubSink(
topic, id_label, with_attributes, timestamp_attribute)
@staticmethod
- def to_proto_str(element):
+ def message_to_proto_str(element):
# type: (PubsubMessage) -> bytes
if not isinstance(element, PubsubMessage):
raise TypeError(
@@ -298,13 +304,18 @@ class WriteToPubSub(PTransform):
'value: %r' % (type(element), element))
return element._to_proto_str()
+ @staticmethod
+ def bytes_to_proto_str(element):
+ # type: (bytes) -> bytes
+ msg = pubsub.types.pubsub_pb2.PubsubMessage()
+ msg.data = element
+ return msg.SerializeToString()
+
def expand(self, pcoll):
if self.with_attributes:
- pcoll = pcoll | 'ToProtobuf' >> Map(self.to_proto_str)
-
- # Without attributes, message data is written as-is. With attributes,
- # message data + attributes are passed as a serialized protobuf string (see
- # ``PubsubMessage._to_proto_str`` for exact protobuf message type).
+ pcoll = pcoll | 'ToProtobuf' >> Map(self.message_to_proto_str)
+ else:
+ pcoll = pcoll | 'ToProtobuf' >> Map(self.bytes_to_proto_str)
pcoll.element_type = bytes
return pcoll | Write(self._sink)
@@ -313,6 +324,16 @@ class WriteToPubSub(PTransform):
# TODO(BEAM-3812): Use an actual URN here.
return self.to_runner_api_pickled(context)
+ def display_data(self):
+ return {
+ 'topic': DisplayDataItem(self.full_topic, label='Pubsub Topic'),
+ 'id_label': DisplayDataItem(self.id_label, label='ID Label Attribute'),
+ 'with_attributes': DisplayDataItem(
+ True, label='With Attributes').drop_if_none(),
+ 'timestamp_attribute': DisplayDataItem(
+ self.timestamp_attribute, label='Timestamp Attribute'),
+ }
+
PROJECT_ID_REGEXP = '[a-z][-a-z0-9:.]{4,61}[a-z0-9]'
SUBSCRIPTION_REGEXP = 'projects/([^/]+)/subscriptions/(.+)'
@@ -418,10 +439,11 @@ class _PubSubSink(dataflow_io.NativeSink):
id_label, # type: Optional[str]
with_attributes, # type: bool
timestamp_attribute # type: Optional[str]
- ):
+ ):
self.coder = coders.BytesCoder()
self.full_topic = topic
self.id_label = id_label
+ #TODO(BEAM-10869): Remove with_attributes since we will never look at it.
self.with_attributes = with_attributes
self.timestamp_attribute = timestamp_attribute
@@ -432,15 +454,5 @@ class _PubSubSink(dataflow_io.NativeSink):
"""Sink format name required for remote execution."""
return 'pubsub'
- def display_data(self):
- return {
- 'topic': DisplayDataItem(self.full_topic, label='Pubsub Topic'),
- 'id_label': DisplayDataItem(self.id_label, label='ID Label Attribute'),
- 'with_attributes': DisplayDataItem(
- self.with_attributes, label='With Attributes').drop_if_none(),
- 'timestamp_attribute': DisplayDataItem(
- self.timestamp_attribute, label='Timestamp Attribute'),
- }
-
def writer(self):
raise NotImplementedError
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 77f1bc9..40fe386 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -312,16 +312,15 @@ class TestPubSubSource(unittest.TestCase):
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestPubSubSink(unittest.TestCase):
def test_display_data(self):
- sink = _PubSubSink(
+ sink = WriteToPubSub(
'projects/fakeprj/topics/a_topic',
id_label='id',
- with_attributes=False,
timestamp_attribute='time')
dd = DisplayData.create_from(sink)
expected_items = [
DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic'),
DisplayDataItemMatcher('id_label', 'id'),
- DisplayDataItemMatcher('with_attributes', False),
+ DisplayDataItemMatcher('with_attributes', True),
DisplayDataItemMatcher('timestamp_attribute', 'time'),
]
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index dd92286..204169c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -1243,6 +1243,7 @@ class DataflowRunner(PipelineRunner):
# Setting this property signals Dataflow runner to return full
# PubsubMessages instead of just the data part of the payload.
step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
+
if transform.source.timestamp_attribute is not None:
step.add_property(
PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
@@ -1344,10 +1345,9 @@ class DataflowRunner(PipelineRunner):
if transform.sink.id_label:
step.add_property(
PropertyNames.PUBSUB_ID_LABEL, transform.sink.id_label)
- if transform.sink.with_attributes:
- # Setting this property signals Dataflow runner that the PCollection
- # contains PubsubMessage objects instead of just raw data.
- step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
+ # Setting this property signals Dataflow runner that the PCollection
+ # contains PubsubMessage objects instead of just raw data.
+ step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
if transform.sink.timestamp_attribute is not None:
step.add_property(
PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 8f221aa..e57b5f8 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -398,19 +398,19 @@ class _DirectWriteToPubSubFn(DoFn):
BUFFER_SIZE_ELEMENTS = 100
FLUSH_TIMEOUT_SECS = BUFFER_SIZE_ELEMENTS * 0.5
- def __init__(self, sink):
- self.project = sink.project
- self.short_topic_name = sink.topic_name
- self.id_label = sink.id_label
- self.timestamp_attribute = sink.timestamp_attribute
- self.with_attributes = sink.with_attributes
+ def __init__(self, transform):
+ self.project = transform.project
+ self.short_topic_name = transform.topic_name
+ self.id_label = transform.id_label
+ self.timestamp_attribute = transform.timestamp_attribute
+ self.with_attributes = transform.with_attributes
# TODO(BEAM-4275): Add support for id_label and timestamp_attribute.
- if sink.id_label:
+ if transform.id_label:
raise NotImplementedError(
'DirectRunner: id_label is not supported for '
'PubSub writes')
- if sink.timestamp_attribute:
+ if transform.timestamp_attribute:
raise NotImplementedError(
'DirectRunner: timestamp_attribute is not '
'supported for PubSub writes')
@@ -475,8 +475,7 @@ def _get_pubsub_transform_overrides(pipeline_options):
raise Exception(
'PubSub I/O is only available in streaming mode '
'(use the --streaming flag).')
- return beam.ParDo(
- _DirectWriteToPubSubFn(applied_ptransform.transform._sink))
+ return beam.ParDo(_DirectWriteToPubSubFn(applied_ptransform.transform))
return [ReadFromPubSubOverride(), WriteToPubSubOverride()]