arunpandianp commented on code in PR #35901:
URL: https://github.com/apache/beam/pull/35901#discussion_r2377283324


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -4058,6 +4070,121 @@ public void testStuckCommit() throws Exception {
         removeDynamicFields(result.get(1L)));
   }
 
+  @Test
+  public void testSwitchStreamingWorkerHarness() throws Exception {
+    if (!streamingEngine) {
+      return;
+    }
+
+    String serverName = "StreamingDataflowWorkerTestChannel";
+    Server fakeServer =
+        grpcCleanup
+            .register(
+                InProcessServerBuilder.forName(serverName)
+                    .directExecutor()
+                    .addService(new 
FakeWindmillServer.FakeWindmillMetadataService(server))
+                    .addService(
+                        new CloudWindmillServiceV1Alpha1Grpc
+                            .CloudWindmillServiceV1Alpha1ImplBase() {})
+                    .build())
+            .start();
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    // Start with CloudPath.
+    DataflowWorkerHarnessOptions options =
+        
createTestingPipelineOptions("--isWindmillServiceDirectPathEnabled=false");
+    options.setWindmillServiceEndpoint(serverName);
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            defaultWorkerParams()
+                .setOptions(options)
+                .setInstructions(instructions)
+                .publishCounters()
+                .build());
+
+    ArgumentCaptor<Consumer<StreamingGlobalConfig>> observerCaptor =
+        ArgumentCaptor.forClass(Consumer.class);
+
+    worker.start();
+
+    verify(mockGlobalConfigHandle, 
atLeastOnce()).registerConfigObserver(observerCaptor.capture());
+
+    List<Consumer<StreamingGlobalConfig>> observers = 
observerCaptor.getAllValues();
+
+    assertTrue(
+        "Worker should start with SingleSourceWorkerHarness",
+        worker.getStreamingWorkerHarness() instanceof 
SingleSourceWorkerHarness);
+
+    // Process some work with CloudPath.
+    server.whenGetWorkCalled().thenReturn(makeInput(1, 1000));
+    Map<Long, Windmill.WorkItemCommitRequest> result = 
server.waitForAndGetCommits(1);
+    assertEquals(1, result.size());
+    assertTrue(result.containsKey(1L));
+
+    ExecutorService harnessSwitchExecutor = worker.getHarnessSwitchExecutor();
+
+    // Prepare WorkerMetadataResponse
+    server.injectWorkerMetadata(
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(1)
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken("workerToken1")
+                    .setDirectEndpoint(
+                        
WindmillServiceAddress.create(HostAndPort.fromParts("localhost", 12345))
+                            .gcpServiceAddress()
+                            .toString())
+                    .build())
+            .build());
+
+    // Switch to Directpath.
+    StreamingGlobalConfig directPathConfig =
+        StreamingGlobalConfig.builder()
+            .setUserWorkerJobSettings(
+                Windmill.UserWorkerRunnerV1Settings.newBuilder()
+                    
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH)
+                    .build())
+            .build();
+
+    for (Consumer<StreamingGlobalConfig> observer : observers) {
+      observer.accept(directPathConfig);
+    }
+
+    // Wait for the harnessSwitchExecutor to complete the switch.
+    Future<?> directPathSwitchFuture = harnessSwitchExecutor.submit(() -> {});
+    // Wait for the dummy task to complete. The dummy task will be executed 
after
+    // switchStreamingWorkerHarness has completed.
+    directPathSwitchFuture.get(30, TimeUnit.SECONDS);
+    assertTrue(
+        "Worker should switch to FanOutStreamingEngineWorkerHarness",
+        worker.getStreamingWorkerHarness() instanceof 
FanOutStreamingEngineWorkerHarness);
+

Review Comment:
   What if we inject the same fakeServer's hostname in the metadata response?
   Then we wouldn't need any additional gRPC mocks. Would that make the test
   simpler?
   
   Change the injectWorkerMetadata call to:
   
   ```
       server.injectWorkerMetadata(
           WorkerMetadataResponse.newBuilder()
               .setMetadataVersion(1)
               .addWorkEndpoints(
                   WorkerMetadataResponse.Endpoint.newBuilder()
                       .setBackendWorkerToken("workerToken1")
                       .setDirectEndpoint(
                           WindmillServiceAddress.create(
                               HostAndPort.fromHost("StreamingDataflowWorkerTestChannel")))
                       .build())
               .build());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to