arunpandianp commented on code in PR #35901: URL: https://github.com/apache/beam/pull/35901#discussion_r2373093036
########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java: ########## @@ -4058,6 +4070,121 @@ public void testStuckCommit() throws Exception { removeDynamicFields(result.get(1L))); } + @Test + public void testSwitchStreamingWorkerHarness() throws Exception { + if (!streamingEngine) { + return; + } + + String serverName = "StreamingDataflowWorkerTestChannel"; + Server fakeServer = + grpcCleanup + .register( + InProcessServerBuilder.forName(serverName) + .directExecutor() + .addService(new FakeWindmillServer.FakeWindmillMetadataService(server)) + .addService( + new CloudWindmillServiceV1Alpha1Grpc + .CloudWindmillServiceV1Alpha1ImplBase() {}) + .build()) + .start(); + + List<ParallelInstruction> instructions = + Arrays.asList( + makeSourceInstruction(StringUtf8Coder.of()), + makeSinkInstruction(StringUtf8Coder.of(), 0)); + + // Start with CloudPath. + DataflowWorkerHarnessOptions options = + createTestingPipelineOptions("--isWindmillServiceDirectPathEnabled=false"); + options.setWindmillServiceEndpoint(serverName); + + StreamingDataflowWorker worker = + makeWorker( + defaultWorkerParams() + .setOptions(options) + .setInstructions(instructions) + .publishCounters() + .build()); + + ArgumentCaptor<Consumer<StreamingGlobalConfig>> observerCaptor = + ArgumentCaptor.forClass(Consumer.class); + + worker.start(); + + verify(mockGlobalConfigHandle, atLeastOnce()).registerConfigObserver(observerCaptor.capture()); + + List<Consumer<StreamingGlobalConfig>> observers = observerCaptor.getAllValues(); + + assertTrue( + "Worker should start with SingleSourceWorkerHarness", + worker.getStreamingWorkerHarness() instanceof SingleSourceWorkerHarness); + + // Process some work with CloudPath. 
+ server.whenGetWorkCalled().thenReturn(makeInput(1, 1000)); + Map<Long, Windmill.WorkItemCommitRequest> result = server.waitForAndGetCommits(1); + assertEquals(1, result.size()); + assertTrue(result.containsKey(1L)); + + ExecutorService harnessSwitchExecutor = worker.getHarnessSwitchExecutor(); + + // Prepare WorkerMetadataResponse + server.injectWorkerMetadata( + WorkerMetadataResponse.newBuilder() + .setMetadataVersion(1) + .addWorkEndpoints( + WorkerMetadataResponse.Endpoint.newBuilder() + .setBackendWorkerToken("workerToken1") + .setDirectEndpoint( + WindmillServiceAddress.create(HostAndPort.fromParts("localhost", 12345)) + .gcpServiceAddress() + .toString()) + .build()) + .build()); + + // Switch to Directpath. + StreamingGlobalConfig directPathConfig = + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + Windmill.UserWorkerRunnerV1Settings.newBuilder() + .setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) + .build()) + .build(); + + for (Consumer<StreamingGlobalConfig> observer : observers) { + observer.accept(directPathConfig); + } + + // Wait for the harnessSwitchExecutor to complete the switch. + Future<?> directPathSwitchFuture = harnessSwitchExecutor.submit(() -> {}); + // Wait for the dummy task to complete. The dummy task will be executed after + // switchStreamingWorkerHarness has completed. + directPathSwitchFuture.get(30, TimeUnit.SECONDS); + assertTrue( + "Worker should switch to FanOutStreamingEngineWorkerHarness", + worker.getStreamingWorkerHarness() instanceof FanOutStreamingEngineWorkerHarness); + Review Comment: Could you add more work here and assert that it commits over DirectPath, and then, later in the test, that work also commits over CloudPath? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org