parveensania commented on code in PR #35901:
URL: https://github.com/apache/beam/pull/35901#discussion_r2377154839


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -4058,6 +4070,121 @@ public void testStuckCommit() throws Exception {
         removeDynamicFields(result.get(1L)));
   }
 
+  @Test
+  public void testSwitchStreamingWorkerHarness() throws Exception {
+    if (!streamingEngine) {
+      return;
+    }
+
+    String serverName = "StreamingDataflowWorkerTestChannel";
+    Server fakeServer =
+        grpcCleanup
+            .register(
+                InProcessServerBuilder.forName(serverName)
+                    .directExecutor()
+                    .addService(new 
FakeWindmillServer.FakeWindmillMetadataService(server))
+                    .addService(
+                        new CloudWindmillServiceV1Alpha1Grpc
+                            .CloudWindmillServiceV1Alpha1ImplBase() {})
+                    .build())
+            .start();
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    // Start with CloudPath.
+    DataflowWorkerHarnessOptions options =
+        
createTestingPipelineOptions("--isWindmillServiceDirectPathEnabled=false");
+    options.setWindmillServiceEndpoint(serverName);
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            defaultWorkerParams()
+                .setOptions(options)
+                .setInstructions(instructions)
+                .publishCounters()
+                .build());
+
+    ArgumentCaptor<Consumer<StreamingGlobalConfig>> observerCaptor =
+        ArgumentCaptor.forClass(Consumer.class);
+
+    worker.start();
+
+    verify(mockGlobalConfigHandle, 
atLeastOnce()).registerConfigObserver(observerCaptor.capture());
+
+    List<Consumer<StreamingGlobalConfig>> observers = 
observerCaptor.getAllValues();
+
+    assertTrue(
+        "Worker should start with SingleSourceWorkerHarness",
+        worker.getStreamingWorkerHarness() instanceof 
SingleSourceWorkerHarness);
+
+    // Process some work with CloudPath.
+    server.whenGetWorkCalled().thenReturn(makeInput(1, 1000));
+    Map<Long, Windmill.WorkItemCommitRequest> result = 
server.waitForAndGetCommits(1);
+    assertEquals(1, result.size());
+    assertTrue(result.containsKey(1L));
+
+    ExecutorService harnessSwitchExecutor = worker.getHarnessSwitchExecutor();
+
+    // Prepare WorkerMetadataResponse
+    server.injectWorkerMetadata(
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(1)
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken("workerToken1")
+                    .setDirectEndpoint(
+                        
WindmillServiceAddress.create(HostAndPort.fromParts("localhost", 12345))
+                            .gcpServiceAddress()
+                            .toString())
+                    .build())
+            .build());
+
+    // Switch to Directpath.
+    StreamingGlobalConfig directPathConfig =
+        StreamingGlobalConfig.builder()
+            .setUserWorkerJobSettings(
+                Windmill.UserWorkerRunnerV1Settings.newBuilder()
+                    
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH)
+                    .build())
+            .build();
+
+    for (Consumer<StreamingGlobalConfig> observer : observers) {
+      observer.accept(directPathConfig);
+    }
+
+    // Wait for the harnessSwitchExecutor to complete the switch.
+    Future<?> directPathSwitchFuture = harnessSwitchExecutor.submit(() -> {});
+    // Wait for the dummy task to complete. The dummy task will be executed 
after
+    // switchStreamingWorkerHarness has completed.
+    directPathSwitchFuture.get(30, TimeUnit.SECONDS);
+    assertTrue(
+        "Worker should switch to FanOutStreamingEngineWorkerHarness",
+        worker.getStreamingWorkerHarness() instanceof 
FanOutStreamingEngineWorkerHarness);
+

Review Comment:
   I've added check that work is processed later with cloudpath. . In the test 
the SingleSourceWorkerHarness uses fakes that bypass gRPC for work streams. But 
FanOutStreamingEngineWorkerHarness internally creates real gRPC stubs based on 
the endpoints returned from metadata service.  So would require more changes to 
handle real grpc calls.  I am leaving it upto the underlying class 
FanOutStreamingEngineWorkerHarness test suit to test grpc connection is setup 
correctly. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to