scwhittle commented on code in PR #26063:
URL: https://github.com/apache/beam/pull/26063#discussion_r1168369739


##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java:
##########
@@ -2341,6 +2347,97 @@ public void testStreamingGroupIntoBatchesWithShardedKeyOverrideBytes() throws IO
     verifyGroupIntoBatchesOverrideBytes(p, true, true);
   }
 
+  @Test
+  public void testPubsubSinkOverride() throws IOException {
+    PipelineOptions options = buildPipelineOptions();
+    List<String> experiments =
+        new ArrayList<>(
+            ImmutableList.of(
+                GcpOptions.STREAMING_ENGINE_EXPERIMENT,
+                GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
+                "use_runner_v2"));
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    dataflowOptions.setExperiments(experiments);
+    dataflowOptions.setStreaming(true);
+    Pipeline p = Pipeline.create(options);
+
+    List<PubsubMessage> testValues =
+        Arrays.asList(
+            new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), Collections.emptyMap()));
+    PCollection<PubsubMessage> input =
+        p.apply("CreateValuesBytes", Create.of(testValues))
+            .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+    input.apply(PubsubIO.writeMessages().to("projects/project/topics/topic"));
+    p.run();
+
+    AtomicBoolean sawPubsubOverride = new AtomicBoolean(false);
+    p.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {

Review Comment:
   it should only be a composite or a primitive, right?
   can you simplify this test and remove the unnecessary override here?
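   
   For example, if StreamingPubsubIOWrite only ever appears one way, a single override should be enough. A minimal sketch, assuming the override surfaces as a primitive here:
   
       p.traverseTopologically(
           new PipelineVisitor.Defaults() {
             @Override
             public void visitPrimitiveTransform(Node node) {
               if (node.getTransform() instanceof DataflowRunner.StreamingPubsubIOWrite) {
                 sawPubsubOverride.set(true);
               }
             }
           });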



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java:
##########
@@ -46,11 +46,15 @@ public class PropertyNames {
   public static final String PARALLEL_INPUT = "parallel_input";
   public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
   public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn";
+
   public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
   public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
   public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label";
   public static final String PUBSUB_TOPIC = "pubsub_topic";
   public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override";
+
+  public static final String PUBSUB_DYNAMIC_DESTINATIONS = "pubsub_with_dynamic_destinations";

Review Comment:
   nit: pubsub_dynamic_destinations to match enum? i.e. remove "with"



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class PubsubDynamicSink extends Sink<WindowedValue<PubsubMessage>> {
+  private final String timestampLabel;
+  private final String idLabel;
+  private final StreamingModeExecutionContext context;
+  // Function used to convert PCollection elements to PubsubMessage objects.
+
+  PubsubDynamicSink(String timestampLabel, String idLabel, StreamingModeExecutionContext context) {
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.context = context;
+  }
+
+  /** A {@link SinkFactory.Registrar} for pubsub sinks. */
+  @AutoService(SinkFactory.Registrar.class)
+  public static class Registrar implements SinkFactory.Registrar {
+
+    @Override
+    public Map<String, SinkFactory> factories() {
+      PubsubDynamicSink.Factory factory = new Factory();
+      return ImmutableMap.of(
+          "PubsubDynamicSink",
+          factory,
+          "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
+          factory);
+    }
+  }
+
+  public static class Factory implements SinkFactory {
+    @Override
+    public PubsubDynamicSink create(
+        CloudObject spec,
+        Coder<?> coder,
+        @Nullable PipelineOptions options,
+        @Nullable DataflowExecutionContext executionContext,
+        DataflowOperationContext operationContext)
+        throws Exception {
+      String timestampLabel = getString(spec, PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "");
+      String idLabel = getString(spec, PropertyNames.PUBSUB_ID_ATTRIBUTE, "");
+
+      return new PubsubDynamicSink(
+          timestampLabel, idLabel, (StreamingModeExecutionContext) executionContext);
+    }
+  }
+
+  @Override
+  public Sink.SinkWriter<WindowedValue<PubsubMessage>> writer() {
+    return new PubsubDynamicSink.PubsubWriter();
+  }
+
+  class PubsubWriter implements Sink.SinkWriter<WindowedValue<PubsubMessage>> {
+    private Map<String, Windmill.PubSubMessageBundle.Builder> outputBuilders;

Review Comment:
   final for these?
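   
   A minimal sketch; both are assigned exactly once, so they could even use inline initializers:
   
       private final Map<String, Windmill.PubSubMessageBundle.Builder> outputBuilders = Maps.newHashMap();
       // Kept across adds for buffer reuse.
       private final ByteStringOutputStream stream = new ByteStringOutputStream();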



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class PubsubDynamicSink extends Sink<WindowedValue<PubsubMessage>> {
+  private final String timestampLabel;
+  private final String idLabel;
+  private final StreamingModeExecutionContext context;
+  // Function used to convert PCollection elements to PubsubMessage objects.
+
+  PubsubDynamicSink(String timestampLabel, String idLabel, StreamingModeExecutionContext context) {
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.context = context;
+  }
+
+  /** A {@link SinkFactory.Registrar} for pubsub sinks. */
+  @AutoService(SinkFactory.Registrar.class)
+  public static class Registrar implements SinkFactory.Registrar {
+
+    @Override
+    public Map<String, SinkFactory> factories() {
+      PubsubDynamicSink.Factory factory = new Factory();
+      return ImmutableMap.of(
+          "PubsubDynamicSink",
+          factory,
+          "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
+          factory);
+    }
+  }
+
+  public static class Factory implements SinkFactory {

Review Comment:
   can this be private or package-visible? or perhaps even inlined in factories() above?
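   
   A sketch of inlining it, assuming SinkFactory has a single abstract method so a lambda works:
   
       @Override
       public Map<String, SinkFactory> factories() {
         SinkFactory factory =
             (spec, coder, options, executionContext, operationContext) ->
                 new PubsubDynamicSink(
                     getString(spec, PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, ""),
                     getString(spec, PropertyNames.PUBSUB_ID_ATTRIBUTE, ""),
                     (StreamingModeExecutionContext) executionContext);
         return ImmutableMap.of(
             "PubsubDynamicSink",
             factory,
             "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
             factory);
       }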



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class PubsubDynamicSink extends Sink<WindowedValue<PubsubMessage>> {
+  private final String timestampLabel;
+  private final String idLabel;
+  private final StreamingModeExecutionContext context;
+  // Function used to convert PCollection elements to PubsubMessage objects.
+
+  PubsubDynamicSink(String timestampLabel, String idLabel, StreamingModeExecutionContext context) {
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.context = context;
+  }
+
+  /** A {@link SinkFactory.Registrar} for pubsub sinks. */
+  @AutoService(SinkFactory.Registrar.class)
+  public static class Registrar implements SinkFactory.Registrar {
+
+    @Override
+    public Map<String, SinkFactory> factories() {
+      PubsubDynamicSink.Factory factory = new Factory();
+      return ImmutableMap.of(
+          "PubsubDynamicSink",
+          factory,
+          "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
+          factory);
+    }
+  }
+
+  public static class Factory implements SinkFactory {
+    @Override
+    public PubsubDynamicSink create(
+        CloudObject spec,
+        Coder<?> coder,
+        @Nullable PipelineOptions options,
+        @Nullable DataflowExecutionContext executionContext,
+        DataflowOperationContext operationContext)
+        throws Exception {
+      String timestampLabel = getString(spec, PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "");
+      String idLabel = getString(spec, PropertyNames.PUBSUB_ID_ATTRIBUTE, "");
+
+      return new PubsubDynamicSink(
+          timestampLabel, idLabel, (StreamingModeExecutionContext) executionContext);
+    }
+  }
+
+  @Override
+  public Sink.SinkWriter<WindowedValue<PubsubMessage>> writer() {
+    return new PubsubDynamicSink.PubsubWriter();
+  }
+
+  class PubsubWriter implements Sink.SinkWriter<WindowedValue<PubsubMessage>> {
+    private Map<String, Windmill.PubSubMessageBundle.Builder> outputBuilders;
+    private ByteStringOutputStream stream; // Kept across adds for buffer reuse.
+
+    PubsubWriter() {
+      outputBuilders = Maps.newHashMap();
+      stream = new ByteStringOutputStream();
+    }
+
+    public ByteString getDataFromMessage(PubsubMessage formatted, ByteStringOutputStream stream)
+        throws IOException {
+      Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
+          Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(formatted.getPayload()));
+      if (formatted.getAttributeMap() != null) {
+        pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
+      }
+      pubsubMessageBuilder.build().writeTo(stream);
+      return stream.toByteStringAndReset();
+    }
+
+    public void close(Windmill.PubSubMessageBundle.Builder outputBuilder) throws IOException {
+      Windmill.PubSubMessageBundle pubsubMessages = outputBuilder.build();
+      if (pubsubMessages.getMessagesCount() > 0) {
+        context.getOutputBuilder().addPubsubMessages(pubsubMessages);

Review Comment:
   can you avoid the build above and instead add the builder here?
   
   or is there a reason not to just add directly to context.getOutputBuilder() in add()? You can still have the map to identify the bundle but just add directly to it.
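   
   For example, protobuf repeated-message fields also accept a sub-builder, so the intermediate build() can likely be dropped (untested sketch):
   
       public void close(Windmill.PubSubMessageBundle.Builder outputBuilder) throws IOException {
         if (outputBuilder.getMessagesCount() > 0) {
           context.getOutputBuilder().addPubsubMessages(outputBuilder);
         }
       }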



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java:
##########
@@ -114,6 +115,7 @@ public RunnerApi.FunctionSpec translate(
         AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubSink> transform,
         SdkComponents components) {
       PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder();
+      @Nullable

Review Comment:
   seems you need to null-check below if nullable
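   
   e.g. guard the use below; a sketch where "topic" stands in for whatever value is @Nullable here:
   
       if (topic != null) {
         payloadBuilder.setTopic(topic);
       }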



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
  * reviewers mentioned <a
 * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
  * here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline start time.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos = PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros = PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));

Review Comment:
   also show string and proto examples and not just avro?
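   
   e.g. mirroring the read examples above (sketch):
   
       strings.apply(PubsubIO.writeStrings().to(topic));
       protos.apply(PubsubIO.writeProtos(MyProto.class).to(topic));
       avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));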



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
  * reviewers mentioned <a
 * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
  * here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline start time.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos = PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros = PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
+ * }</pre>
+ *
+ * Dynamic topic destinations can be accomplished by specifying a function to extract the topic from

Review Comment:
   Provide link and mirror other text?
   
   In order to write to dynamic topic destinations, the {@link ...} methods can be used, specifying a function to extract the topic from the record.
   



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java:
##########
@@ -2341,6 +2347,97 @@ public void testStreamingGroupIntoBatchesWithShardedKeyOverrideBytes() throws IO
     verifyGroupIntoBatchesOverrideBytes(p, true, true);
   }
 
+  @Test
+  public void testPubsubSinkOverride() throws IOException {
+    PipelineOptions options = buildPipelineOptions();
+    List<String> experiments =
+        new ArrayList<>(
+            ImmutableList.of(
+                GcpOptions.STREAMING_ENGINE_EXPERIMENT,
+                GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
+                "use_runner_v2"));
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    dataflowOptions.setExperiments(experiments);
+    dataflowOptions.setStreaming(true);
+    Pipeline p = Pipeline.create(options);
+
+    List<PubsubMessage> testValues =
+        Arrays.asList(
+            new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), Collections.emptyMap()));
+    PCollection<PubsubMessage> input =
+        p.apply("CreateValuesBytes", Create.of(testValues))
+            .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+    input.apply(PubsubIO.writeMessages().to("projects/project/topics/topic"));
+    p.run();
+
+    AtomicBoolean sawPubsubOverride = new AtomicBoolean(false);
+    p.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {
+            if (node.getTransform() instanceof DataflowRunner.StreamingPubsubIOWrite) {
+              sawPubsubOverride.set(true);
+            }
+            return CompositeBehavior.ENTER_TRANSFORM;
+          }
+
+          @Override
+          public void visitPrimitiveTransform(@UnknownKeyFor @NonNull @Initialized Node node) {
+            if (node.getTransform() instanceof DataflowRunner.StreamingPubsubIOWrite) {
+              sawPubsubOverride.set(true);
+            }
+          }
+        });
+    assertTrue(sawPubsubOverride.get());
+  }
+
+  @Test
+  public void testPubsubSinkDynamicOverride() throws IOException {
+    PipelineOptions options = buildPipelineOptions();
+    List<String> experiments =
+        new ArrayList<>(
+            ImmutableList.of(
+                GcpOptions.STREAMING_ENGINE_EXPERIMENT,
+                GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
+                "use_runner_v2"));
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    dataflowOptions.setExperiments(experiments);
+    dataflowOptions.setStreaming(true);
+    Pipeline p = Pipeline.create(options);
+
+    List<PubsubMessage> testValues =
+        Arrays.asList(
+            new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), Collections.emptyMap())
+                .withTopic(""));
+    PCollection<PubsubMessage> input =
+        p.apply("CreateValuesBytes", Create.of(testValues))
+            .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+    input.apply(PubsubIO.writeMessagesDynamic());
+    p.run();
+
+    AtomicBoolean sawPubsubOverride = new AtomicBoolean(false);
+    p.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {

Review Comment:
   ditto



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({

Review Comment:
   since this is new, can you avoid suppressing these?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java:
##########
@@ -357,26 +357,33 @@ public static TopicPath topicPathFromName(String projectId, String topicName) {
   public abstract static class OutgoingMessage implements Serializable {
 
     /** Underlying Message. May not have publish timestamp set. */
-    public abstract PubsubMessage message();
+    public abstract PubsubMessage getMessage();
 
     /** Timestamp for element (ms since epoch). */
-    public abstract long timestampMsSinceEpoch();
+    public abstract long getTimestampMsSinceEpoch();
 
     /**
      * If using an id attribute, the record id to associate with this record's metadata so the
      * receiver can reject duplicates. Otherwise {@literal null}.
      */
     public abstract @Nullable String recordId();
 
+    public abstract @Nullable String topic();
+
     public static OutgoingMessage of(
-        PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) {
-      return new AutoValue_PubsubClient_OutgoingMessage(message, timestampMsSinceEpoch, recordId);
+        PubsubMessage message,
+        long timestampMsSinceEpoch,
+        @Nullable String recordId,
+        String topic) {

Review Comment:
   should topic be nullable here? or the accessor not nullable?
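   
   i.e. a sketch making the parameter match the accessor:
   
       public static OutgoingMessage of(
           PubsubMessage message,
           long timestampMsSinceEpoch,
           @Nullable String recordId,
           @Nullable String topic) {
         return new AutoValue_PubsubClient_OutgoingMessage(
             message, timestampMsSinceEpoch, recordId, topic);
       }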



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java:
##########
@@ -357,26 +357,33 @@ public static TopicPath topicPathFromName(String projectId, String topicName) {
   public abstract static class OutgoingMessage implements Serializable {
 
     /** Underlying Message. May not have publish timestamp set. */
-    public abstract PubsubMessage message();
+    public abstract PubsubMessage getMessage();
 
     /** Timestamp for element (ms since epoch). */
-    public abstract long timestampMsSinceEpoch();
+    public abstract long getTimestampMsSinceEpoch();
 
     /**
      * If using an id attribute, the record id to associate with this record's metadata so the
      * receiver can reject duplicates. Otherwise {@literal null}.
      */
     public abstract @Nullable String recordId();
 
+    public abstract @Nullable String topic();
+
     public static OutgoingMessage of(
-        PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) {
-      return new AutoValue_PubsubClient_OutgoingMessage(message, timestampMsSinceEpoch, recordId);
+        PubsubMessage message,
+        long timestampMsSinceEpoch,
+        @Nullable String recordId,
+        String topic) {
+      return new AutoValue_PubsubClient_OutgoingMessage(
+          message, timestampMsSinceEpoch, recordId, topic);
     }
 
     public static OutgoingMessage of(
         org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message,
         long timestampMsSinceEpoch,
-        @Nullable String recordId) {
+        @Nullable String recordId,
+        String topic) {

Review Comment:
   ditto



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class PubsubDynamicSink extends Sink<WindowedValue<PubsubMessage>> {
+  private final String timestampLabel;
+  private final String idLabel;
+  private final StreamingModeExecutionContext context;
+  // Function used to convert PCollection elements to PubsubMessage objects.
+
+  PubsubDynamicSink(String timestampLabel, String idLabel, StreamingModeExecutionContext context) {
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.context = context;
+  }
+
+  /** A {@link SinkFactory.Registrar} for pubsub sinks. */
+  @AutoService(SinkFactory.Registrar.class)
+  public static class Registrar implements SinkFactory.Registrar {
+
+    @Override
+    public Map<String, SinkFactory> factories() {
+      PubsubDynamicSink.Factory factory = new Factory();
+      return ImmutableMap.of(
+          "PubsubDynamicSink",
+          factory,
+          "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
+          factory);
+    }
+  }
+
+  public static class Factory implements SinkFactory {
+    @Override
+    public PubsubDynamicSink create(
+        CloudObject spec,
+        Coder<?> coder,
+        @Nullable PipelineOptions options,
+        @Nullable DataflowExecutionContext executionContext,
+        DataflowOperationContext operationContext)
+        throws Exception {
+      String timestampLabel = getString(spec, PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "");
+      String idLabel = getString(spec, PropertyNames.PUBSUB_ID_ATTRIBUTE, "");
+
+      return new PubsubDynamicSink(
+          timestampLabel, idLabel, (StreamingModeExecutionContext) executionContext);
+    }
+  }
+
+  @Override
+  public Sink.SinkWriter<WindowedValue<PubsubMessage>> writer() {
+    return new PubsubDynamicSink.PubsubWriter();
+  }
+
+  class PubsubWriter implements Sink.SinkWriter<WindowedValue<PubsubMessage>> {
+    private Map<String, Windmill.PubSubMessageBundle.Builder> outputBuilders;
+    private ByteStringOutputStream stream; // Kept across adds for buffer reuse.
+
+    PubsubWriter() {
+      outputBuilders = Maps.newHashMap();
+      stream = new ByteStringOutputStream();
+    }
+
+    public ByteString getDataFromMessage(PubsubMessage formatted, ByteStringOutputStream stream)
+        throws IOException {
+      Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
+          Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(formatted.getPayload()));

Review Comment:
   any way to avoid this copy?
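   
   If the vendored protobuf exposes UnsafeByteOperations (an assumption), the payload could be wrapped instead of copied, provided the byte[] is never mutated afterwards:
   
       Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
           Pubsub.PubsubMessage.newBuilder()
               .setData(UnsafeByteOperations.unsafeWrap(formatted.getPayload()));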



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSinkTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import avro.shaded.com.google.common.collect.Lists;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link PubsubSink}. */
+@RunWith(JUnit4.class)
+public class PubsubDynamicSinkTest {
+  @Mock StreamingModeExecutionContext mockContext;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testWriteDynamicDestinations() throws Exception {
+    Windmill.WorkItemCommitRequest.Builder outputBuilder =
+        Windmill.WorkItemCommitRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("key"))
+            .setWorkToken(0);
+
+    when(mockContext.getOutputBuilder()).thenReturn(outputBuilder);
+
+    Map<String, Object> spec = new HashMap<>();
+    spec.put(PropertyNames.OBJECT_TYPE_NAME, "");
+    spec.put(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "ts");
+    spec.put(PropertyNames.PUBSUB_ID_ATTRIBUTE, "id");
+
+    CloudObject cloudSinkSpec = CloudObject.fromSpec(spec);
+    PubsubDynamicSink.Factory factory = new PubsubDynamicSink.Factory();
+    PubsubDynamicSink sink =
+        factory.create(
+            cloudSinkSpec,
+            WindowedValue.getFullCoder(VoidCoder.of(), IntervalWindow.getCoder()),
+            null,
+            mockContext,
+            null);
+
+    Sink.SinkWriter<WindowedValue<PubsubMessage>> writer = sink.writer();
+
+    List<Windmill.Message> expectedMessages = Lists.newArrayList();
+    for (int i = 0; i < 10; ++i) {
+      byte[] payload = String.format("value_%d", i).getBytes(StandardCharsets.UTF_8);
+      Pubsub.PubsubMessage pubsubMessage =
+          Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(payload)).build();
+      expectedMessages.add(
+          Windmill.Message.newBuilder()
+              .setTimestamp(i * 1000)
+              .setData(pubsubMessage.toByteString())
+              .build());
+      writer.add(
+          WindowedValue.timestampedValueInGlobalWindow(
+              new PubsubMessage(payload, null).withTopic("topic1"), new Instant(i)));

Review Comment:
   vary the payload and timestamp per topic to better verify
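   
   e.g. something like this (sketch):
   
       for (int i = 0; i < 10; ++i) {
         String topic = "topic" + (i % 3);
         byte[] payload = String.format("value_%s_%d", topic, i).getBytes(StandardCharsets.UTF_8);
         writer.add(
             WindowedValue.timestampedValueInGlobalWindow(
                 new PubsubMessage(payload, null).withTopic(topic), new Instant(1000L * (i % 3) + i)));
       }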



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
  * reviewers mentioned <a
 * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
  * here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline start time.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos = PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros = PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
+ * }</pre>
+ *
+ * Dynamic topic destinations can be accomplished by specifying a function to extract the topic from
+ * the record. For example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).
+ *      to((ValueInSingleWindow<Event> quote) -> {
+ *               String country = quote.getCountry();
+ *               return "projects/myproject/topics/events_" + country;
+ *              });
+ * }</pre>
+ *
+ * Dynamic topics can also be specified by writing {@link PubsubMessage} objects containing the
+ * topic. For example:
+ *
+ * <pre>{@code
+ * events.apply(MapElements.into(new TypeDescriptor<PubsubMessage>() {})
+ *                         .via(e -> new PubsubMessage(
+ *                             e.toByteString(), Collections.emptyMap()).withTopic(e.getCountry())))
+ * .apply(PubsubIO.writeMessagesDynamic());
+ * }</pre>
+ *
+ * <h3>Custom timestamps</h3>
+ *
+ * All messages read from PubSub have a stable publish timestamp that is independent of when the
+ * message is read from the PubSub topic. By default, the publish time is used as the timestamp for
+ * all messages read and the watermark is based on that. If there is a different logical timestamp
+ * to be used, that timestamp must be published in a PubSub attribute. The attribute is specified

Review Comment:
   that timestamp may be published in a Pubsub attribute specified using {@link ....}



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1350,23 +1385,15 @@ public void startBundle(StartBundleContext c) throws IOException {
       @ProcessElement
       public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException {
         PubsubMessage message = getFormatFn().apply(c.element());
-        int messageSize = validateAndGetPubsubMessageSize(message);
-        if (messageSize > maxPublishBatchByteSize) {

Review Comment:
   keep this as a sanity check?
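   
   i.e. retain something like the removed lines (sketch; the exception message is illustrative):
   
       int messageSize = validateAndGetPubsubMessageSize(message);
       if (messageSize > maxPublishBatchByteSize) {
         throw new SizeLimitExceededException(
             "Pubsub message of size " + messageSize + " exceeds maximum of " + maxPublishBatchByteSize);
       }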



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -242,7 +271,19 @@ private static class WriterFn extends DoFn<KV<Integer, Iterable<OutgoingMessage>
 
     /** BLOCKING Send {@code messages} as a batch to Pubsub. */
     private void publishBatch(List<OutgoingMessage> messages, int bytes) throws IOException {
-      int n = pubsubClient.publish(topic.get(), messages);
+      int n = 0;
+      if (topic != null) {
+        n = pubsubClient.publish(topic.get(), messages);
+      } else {
+        Map<TopicPath, List<OutgoingMessage>> messagesPerTopic = Maps.newHashMap();
+        for (OutgoingMessage message : messages) {
+          TopicPath topicPath = PubsubClient.topicPathFromPath(message.topic());

Review Comment:
   could do this translation in the publish loop so it's per topic instead of per message
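   
   e.g. key the map by the raw topic string and convert once per topic (sketch):
   
       Map<String, List<OutgoingMessage>> messagesPerTopic = Maps.newHashMap();
       for (OutgoingMessage message : messages) {
         messagesPerTopic.computeIfAbsent(message.topic(), t -> Lists.newArrayList()).add(message);
       }
       for (Map.Entry<String, List<OutgoingMessage>> entry : messagesPerTopic.entrySet()) {
         n += pubsubClient.publish(PubsubClient.topicPathFromPath(entry.getKey()), entry.getValue());
       }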



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -495,6 +604,10 @@ static class PubsubSink extends PTransform<PCollection<byte[]>, PDone> {
       this.outer = outer;
     }
 
+    boolean isDynamic() {

Review Comment:
   remove? seems PubsubUnboundedSink would be used if this was true?



##########
website/www/site/layouts/shortcodes/flink_python_pipeline_options.html:
##########
@@ -133,6 +132,11 @@
   <td>Sets the behavior of reusing objects.</td>
   <td>Default: <code>false</code></td>
 </tr>
+<tr>
+  <td><code>operator_chaining</code></td>

Review Comment:
   ditto



##########
website/www/site/layouts/shortcodes/flink_java_pipeline_options.html:
##########
@@ -133,6 +132,11 @@
   <td>Sets the behavior of reusing objects.</td>
   <td>Default: <code>false</code></td>
 </tr>
+<tr>
+  <td><code>operatorChaining</code></td>

Review Comment:
   revert this file?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSinkTest.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import avro.shaded.com.google.common.collect.Lists;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link PubsubSink}. */
+@RunWith(JUnit4.class)
+public class PubsubDynamicSinkTest {
+  @Mock StreamingModeExecutionContext mockContext;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testWriteDynamicDestinations() throws Exception {
+    Windmill.WorkItemCommitRequest.Builder outputBuilder =
+        Windmill.WorkItemCommitRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("key"))
+            .setWorkToken(0);
+
+    when(mockContext.getOutputBuilder()).thenReturn(outputBuilder);
+
+    Map<String, Object> spec = new HashMap<>();
+    spec.put(PropertyNames.OBJECT_TYPE_NAME, "");
+    spec.put(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "ts");
+    spec.put(PropertyNames.PUBSUB_ID_ATTRIBUTE, "id");
+
+    CloudObject cloudSinkSpec = CloudObject.fromSpec(spec);
+    PubsubDynamicSink.Factory factory = new PubsubDynamicSink.Factory();

Review Comment:
   would it be better to look up in the registry if that is normally how things are wired up? (would also mean you don't have to expose Factory)
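   
   e.g. (sketch, assuming the registrar map is how the worker wires this up):
   
       SinkFactory factory =
           new PubsubDynamicSink.Registrar().factories().get("PubsubDynamicSink");
       PubsubDynamicSink sink =
           (PubsubDynamicSink)
               factory.create(
                   cloudSinkSpec,
                   WindowedValue.getFullCoder(VoidCoder.of(), IntervalWindow.getCoder()),
                   null,
                   mockContext,
                   null);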


