kennknowles commented on code in PR #26063:
URL: https://github.com/apache/beam/pull/26063#discussion_r1520570011


##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java:
##########
@@ -2341,6 +2347,74 @@ public void testStreamingGroupIntoBatchesWithShardedKeyOverrideBytes() throws IO
     verifyGroupIntoBatchesOverrideBytes(p, true, true);
   }
 
+  @Test
+  public void testPubsubSinkOverride() throws IOException {
+    PipelineOptions options = buildPipelineOptions();
+    List<String> experiments =
+        new ArrayList<>(
+            ImmutableList.of(
+                GcpOptions.STREAMING_ENGINE_EXPERIMENT,
+                GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
+                "use_runner_v2"));
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    dataflowOptions.setExperiments(experiments);
+    dataflowOptions.setStreaming(true);
+    Pipeline p = Pipeline.create(options);
+
+    List<PubsubMessage> testValues =
+        Arrays.asList(
+            new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), Collections.emptyMap()));
+    PCollection<PubsubMessage> input =
+        p.apply("CreateValuesBytes", Create.of(testValues))
+            .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+    input.apply(PubsubIO.writeMessages().to("projects/project/topics/topic"));
+    p.run();
+
+    AtomicBoolean sawPubsubOverride = new AtomicBoolean(false);
+    p.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          @Override
+          public void visitPrimitiveTransform(@UnknownKeyFor @NonNull @Initialized Node node) {
+            if (node.getTransform() instanceof DataflowRunner.StreamingPubsubIOWrite) {
+              sawPubsubOverride.set(true);
+            }
+          }
+        });
+    assertTrue(sawPubsubOverride.get());

Review Comment:
   We should never use client-side overrides for v2, where we want everything
to happen in the service. Is there necessary functionality in the override? I
think it is applied after the v2 pipeline is created anyhow...



