parveensania commented on code in PR #34148:
URL: https://github.com/apache/beam/pull/34148#discussion_r2124146001


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java:
##########
@@ -296,6 +326,70 @@ public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers()
     assertTrue(currentBackends.globalDataStreams().isEmpty());
   }
 
+  @Test
+  public void testOnNewWorkerMetadata_endpointTypeChanged() throws InterruptedException {
+    GetWorkBudgetDistributor getWorkBudgetDistributor = mock(GetWorkBudgetDistributor.class);
+    fanOutStreamingEngineWorkProvider =
+        newFanOutStreamingEngineWorkerHarness(
+            GetWorkBudget.builder().setItems(1).setBytes(1).build(),
+            getWorkBudgetDistributor,
+            noOpProcessWorkItemFn());
+
+    String workerToken = "workerToken1";
+    String workerToken2 = "workerToken2";
+
+    WorkerMetadataResponse firstWorkerMetadata =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(1)
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken(workerToken)
+                    .build())
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken(workerToken2)
+                    .build())
+            .setExternalEndpoint(AUTHENTICATING_SERVICE)
+            .setEndpointType(EndpointType.DIRECTPATH)
+            .putAllGlobalDataEndpoints(DEFAULT)
+            .build();
+
+    WorkerMetadataResponse secondWorkerMetadata =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(2)

Review Comment:
   The StreamObserver in GetWorkerMetadataTestStub behaves unexpectedly. 
When non-increasing metadata versions are injected (2->1 or 1->1), the second 
endpoint injection is dropped and consumeWorkerMetadata is not invoked.
   
   With increasing versions (1->2), consumeWorkerMetadata is invoked both 
times.
   
   I verified that this is not caused by the version check in 
consumeWorkerMetadata; the method itself never runs. I am still debugging 
this and will update the test case once I identify the issue.


