parveensania commented on code in PR #35901: URL: https://github.com/apache/beam/pull/35901#discussion_r2377154839
########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java: ########## @@ -4058,6 +4070,121 @@ public void testStuckCommit() throws Exception { removeDynamicFields(result.get(1L))); } + @Test + public void testSwitchStreamingWorkerHarness() throws Exception { + if (!streamingEngine) { + return; + } + + String serverName = "StreamingDataflowWorkerTestChannel"; + Server fakeServer = + grpcCleanup + .register( + InProcessServerBuilder.forName(serverName) + .directExecutor() + .addService(new FakeWindmillServer.FakeWindmillMetadataService(server)) + .addService( + new CloudWindmillServiceV1Alpha1Grpc + .CloudWindmillServiceV1Alpha1ImplBase() {}) + .build()) + .start(); + + List<ParallelInstruction> instructions = + Arrays.asList( + makeSourceInstruction(StringUtf8Coder.of()), + makeSinkInstruction(StringUtf8Coder.of(), 0)); + + // Start with CloudPath. + DataflowWorkerHarnessOptions options = + createTestingPipelineOptions("--isWindmillServiceDirectPathEnabled=false"); + options.setWindmillServiceEndpoint(serverName); + + StreamingDataflowWorker worker = + makeWorker( + defaultWorkerParams() + .setOptions(options) + .setInstructions(instructions) + .publishCounters() + .build()); + + ArgumentCaptor<Consumer<StreamingGlobalConfig>> observerCaptor = + ArgumentCaptor.forClass(Consumer.class); + + worker.start(); + + verify(mockGlobalConfigHandle, atLeastOnce()).registerConfigObserver(observerCaptor.capture()); + + List<Consumer<StreamingGlobalConfig>> observers = observerCaptor.getAllValues(); + + assertTrue( + "Worker should start with SingleSourceWorkerHarness", + worker.getStreamingWorkerHarness() instanceof SingleSourceWorkerHarness); + + // Process some work with CloudPath. + server.whenGetWorkCalled().thenReturn(makeInput(1, 1000)); + Map<Long, Windmill.WorkItemCommitRequest> result = server.waitForAndGetCommits(1); + assertEquals(1, result.size()); + assertTrue(result.containsKey(1L)); + + ExecutorService harnessSwitchExecutor = worker.getHarnessSwitchExecutor(); + + // Prepare WorkerMetadataResponse + server.injectWorkerMetadata( + WorkerMetadataResponse.newBuilder() + .setMetadataVersion(1) + .addWorkEndpoints( + WorkerMetadataResponse.Endpoint.newBuilder() + .setBackendWorkerToken("workerToken1") + .setDirectEndpoint( + WindmillServiceAddress.create(HostAndPort.fromParts("localhost", 12345)) + .gcpServiceAddress() + .toString()) + .build()) + .build()); + + // Switch to Directpath. + StreamingGlobalConfig directPathConfig = + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + Windmill.UserWorkerRunnerV1Settings.newBuilder() + .setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) + .build()) + .build(); + + for (Consumer<StreamingGlobalConfig> observer : observers) { + observer.accept(directPathConfig); + } + + // Wait for the harnessSwitchExecutor to complete the switch. + Future<?> directPathSwitchFuture = harnessSwitchExecutor.submit(() -> {}); + // Wait for the dummy task to complete. The dummy task will be executed after + // switchStreamingWorkerHarness has completed. + directPathSwitchFuture.get(30, TimeUnit.SECONDS); + assertTrue( + "Worker should switch to FanOutStreamingEngineWorkerHarness", + worker.getStreamingWorkerHarness() instanceof FanOutStreamingEngineWorkerHarness); + Review Comment: I've added check that work is processed later with cloudpath. . In the test the SingleSourceWorkerHarness uses fakes that bypass gRPC for work streams. But FanOutStreamingEngineWorkerHarness internally creates real gRPC stubs based on the endpoints returned from metadata service. So would require more changes to handle real grpc calls. I am leaving it upto the underlying class FanOutStreamingEngineWorkerHarness test suit to test grpc connection is setup correctly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org