parveensania commented on code in PR #34148: URL: https://github.com/apache/beam/pull/34148#discussion_r2124146001
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java:
##########
@@ -296,6 +326,70 @@ public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers()
     assertTrue(currentBackends.globalDataStreams().isEmpty());
   }
+  @Test
+  public void testOnNewWorkerMetadata_endpointTypeChanged() throws InterruptedException {
+    GetWorkBudgetDistributor getWorkBudgetDistributor = mock(GetWorkBudgetDistributor.class);
+    fanOutStreamingEngineWorkProvider =
+        newFanOutStreamingEngineWorkerHarness(
+            GetWorkBudget.builder().setItems(1).setBytes(1).build(),
+            getWorkBudgetDistributor,
+            noOpProcessWorkItemFn());
+
+    String workerToken = "workerToken1";
+    String workerToken2 = "workerToken2";
+
+    WorkerMetadataResponse firstWorkerMetadata =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(1)
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken(workerToken)
+                    .build())
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken(workerToken2)
+                    .build())
+            .setExternalEndpoint(AUTHENTICATING_SERVICE)
+            .setEndpointType(EndpointType.DIRECTPATH)
+            .putAllGlobalDataEndpoints(DEFAULT)
+            .build();
+
+    WorkerMetadataResponse secondWorkerMetadata =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(2)

Review Comment:
   The StreamObserver of GetWorkerMetadataTestStub behaves strangely here. When non-increasing metadata versions are injected (2->1 or 1->1), the second endpoint injection is dropped and consumeWorkerMetadata is not invoked. With increasing versions (1->2), consumeWorkerMetadata is invoked both times. I verified that this is not caused by the version check in consumeWorkerMetadata; the method itself never runs. I am still debugging this and will update the test case once I identify the issue.
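
For readers unfamiliar with the version check mentioned in the comment, here is a minimal sketch of what such a gate typically looks like. It is an illustration only: `VersionGatedConsumer` and its methods are hypothetical names, not Beam's actual `consumeWorkerMetadata` implementation, and the strict "newer than the latest seen" rule is an assumption. A gate like this would reject the 2->1 and 1->1 sequences but would still be entered on every call, which is why the comment treats "the method never runs" as pointing to a different cause.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a metadata consumer; not Beam's actual class. */
final class VersionGatedConsumer {
  // Highest metadata version applied so far (assumed to start below any real version).
  private long latestVersion = Long.MIN_VALUE;
  // Versions that passed the gate, in the order they were applied.
  private final List<Long> applied = new ArrayList<>();

  /** Returns true if the update was applied, false if it was dropped as stale. */
  synchronized boolean consume(long metadataVersion) {
    if (metadataVersion <= latestVersion) {
      // Same or older version: ignore it, but note that the method still ran.
      return false;
    }
    latestVersion = metadataVersion;
    applied.add(metadataVersion);
    return true;
  }

  /** Returns a copy of the versions applied so far. */
  synchronized List<Long> applied() {
    return new ArrayList<>(applied);
  }
}
```

With this sketch, injecting 1 and then 2 applies both updates, while injecting 2 and then 1 (or 1 twice) applies only the first. In every case the second call still reaches `consume`, unlike the behavior reported in the test stub.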