arunpandianp commented on code in PR #35901:
URL: https://github.com/apache/beam/pull/35901#discussion_r2373093036


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -4058,6 +4070,121 @@ public void testStuckCommit() throws Exception {
         removeDynamicFields(result.get(1L)));
   }
 
+  @Test
+  public void testSwitchStreamingWorkerHarness() throws Exception {
+    if (!streamingEngine) {
+      return;
+    }
+
+    String serverName = "StreamingDataflowWorkerTestChannel";
+    Server fakeServer =
+        grpcCleanup
+            .register(
+                InProcessServerBuilder.forName(serverName)
+                    .directExecutor()
+                    .addService(new 
FakeWindmillServer.FakeWindmillMetadataService(server))
+                    .addService(
+                        new CloudWindmillServiceV1Alpha1Grpc
+                            .CloudWindmillServiceV1Alpha1ImplBase() {})
+                    .build())
+            .start();
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    // Start with CloudPath.
+    DataflowWorkerHarnessOptions options =
+        
createTestingPipelineOptions("--isWindmillServiceDirectPathEnabled=false");
+    options.setWindmillServiceEndpoint(serverName);
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            defaultWorkerParams()
+                .setOptions(options)
+                .setInstructions(instructions)
+                .publishCounters()
+                .build());
+
+    ArgumentCaptor<Consumer<StreamingGlobalConfig>> observerCaptor =
+        ArgumentCaptor.forClass(Consumer.class);
+
+    worker.start();
+
+    verify(mockGlobalConfigHandle, 
atLeastOnce()).registerConfigObserver(observerCaptor.capture());
+
+    List<Consumer<StreamingGlobalConfig>> observers = 
observerCaptor.getAllValues();
+
+    assertTrue(
+        "Worker should start with SingleSourceWorkerHarness",
+        worker.getStreamingWorkerHarness() instanceof 
SingleSourceWorkerHarness);
+
+    // Process some work with CloudPath.
+    server.whenGetWorkCalled().thenReturn(makeInput(1, 1000));
+    Map<Long, Windmill.WorkItemCommitRequest> result = 
server.waitForAndGetCommits(1);
+    assertEquals(1, result.size());
+    assertTrue(result.containsKey(1L));
+
+    ExecutorService harnessSwitchExecutor = worker.getHarnessSwitchExecutor();
+
+    // Prepare WorkerMetadataResponse
+    server.injectWorkerMetadata(
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(1)
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken("workerToken1")
+                    .setDirectEndpoint(
+                        
WindmillServiceAddress.create(HostAndPort.fromParts("localhost", 12345))
+                            .gcpServiceAddress()
+                            .toString())
+                    .build())
+            .build());
+
+    // Switch to Directpath.
+    StreamingGlobalConfig directPathConfig =
+        StreamingGlobalConfig.builder()
+            .setUserWorkerJobSettings(
+                Windmill.UserWorkerRunnerV1Settings.newBuilder()
+                    
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH)
+                    .build())
+            .build();
+
+    for (Consumer<StreamingGlobalConfig> observer : observers) {
+      observer.accept(directPathConfig);
+    }
+
+    // Wait for the harnessSwitchExecutor to complete the switch.
+    Future<?> directPathSwitchFuture = harnessSwitchExecutor.submit(() -> {});
+    // Wait for the dummy task to complete. The dummy task will be executed 
after
+    // switchStreamingWorkerHarness has completed.
+    directPathSwitchFuture.get(30, TimeUnit.SECONDS);
+    assertTrue(
+        "Worker should switch to FanOutStreamingEngineWorkerHarness",
+        worker.getStreamingWorkerHarness() instanceof 
FanOutStreamingEngineWorkerHarness);
+

Review Comment:
   Could you also add more work here and assert that it commits over DirectPath, 
and then later over CloudPath?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to