arunpandianp commented on code in PR #38249:
URL: https://github.com/apache/beam/pull/38249#discussion_r3114262604


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -288,11 +296,17 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
       }
     }
 
-    LOG.debug(
-        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+    WindmillEndpoints.Type previousType;
+    synchronized (metadataLock) {

Review Comment:
   Merge it with the synchronized block at line 293?
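   A minimal sketch of what the merge could look like. It assumes the block at line 293 also reads or updates metadata state under `metadataLock`. The class, fields, and method below are hypothetical and only mirror the names in the diff. Reading the previous state and publishing the new state in one critical section means the lock is taken once, and the "previous" values that get logged form a consistent snapshot.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: names mirror the PR, but this is not the real harness.
class MergedLockSketch {
  enum Type { UNKNOWN, CLOUDPATH, DIRECTPATH }

  private final Object metadataLock = new Object();
  // Both fields are assumed to be guarded by metadataLock in this sketch.
  private long activeMetadataVersion = 0;
  private Type activeMetadataType = Type.UNKNOWN;
  final List<String> log = Collections.synchronizedList(new ArrayList<>());

  void consume(long newVersion, Type newType) {
    long previousVersion;
    Type previousType;
    // One critical section instead of two: the previous version and type are
    // read and the new values are published together, so no other thread can
    // change the state between the read and the write.
    synchronized (metadataLock) {
      previousVersion = activeMetadataVersion;
      previousType = activeMetadataType;
      activeMetadataVersion = newVersion;
      activeMetadataType = newType;
    }
    // Logging (the LOG.debug call in the PR) happens outside the lock.
    log.add(previousType + "@" + previousVersion + " -> " + newType + "@" + newVersion);
  }

  public static void main(String[] args) {
    MergedLockSketch sketch = new MergedLockSketch();
    sketch.consume(1, Type.CLOUDPATH);
    sketch.consume(1, Type.DIRECTPATH);
    System.out.println(sketch.log); // [UNKNOWN@0 -> CLOUDPATH@1, CLOUDPATH@1 -> DIRECTPATH@1]
  }
}
```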



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java:
##########
@@ -348,6 +348,48 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce
     TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
   }
 
+  @Test
+  public void testOnNewWorkerMetadata_consumesEndpointsOnConnectivityTypeChange()

Review Comment:
   Can you add test cases like `testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers` for the transitions cloudpath -> directpath -> cloudpath and directpath -> cloudpath -> directpath?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -271,7 +275,11 @@ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
     synchronized (metadataLock) {
       // Only process versions greater than what we currently have to prevent double processing of
       // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
-      if (windmillEndpoints.version() > pendingMetadataVersion) {
+      // The endpoints are also consumed if the version is the same but the type of endpoints
+      // sent by the server has changed.
+      if (windmillEndpoints.version() > pendingMetadataVersion
+          || (windmillEndpoints.version() == pendingMetadataVersion
+              && windmillEndpoints.type() != activeMetadataType)) {

Review Comment:
   I think we need to check against a `pendingMetadataType`, analogous to `pendingMetadataVersion`.
   
   Consider the following case:
   1. The active type is cloudpath.
   2. We receive a directpath config and enter here, since the types don't match.
   3. We receive a cloudpath config before `activeMetadataType` has been updated to directpath. The check against `activeMetadataType` now prevents the cloudpath config from taking effect.
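   The interleaving above can be reproduced with a small single-threaded model. This is a hypothetical sketch, not the harness code: it assumes all three configs carry the same metadata version and that `activeMetadataType` is only updated later, when the asynchronous consumer applies the endpoints. The check from the PR (against the active type) rejects the final cloudpath config; the suggested check (against a pending type) accepts it.

```java
// Hypothetical model of the review scenario; not the Beam implementation.
class PendingTypeRaceSketch {
  enum Type { CLOUDPATH, DIRECTPATH }

  // pendingVersion/pendingType change as soon as an update is accepted.
  // activeType only changes when the consumer applies the endpoints, which
  // never happens in this model, mimicking an update still in flight.
  long pendingVersion;
  Type pendingType;
  Type activeType;

  PendingTypeRaceSketch(long version, Type type) {
    pendingVersion = version;
    pendingType = type;
    activeType = type;
  }

  /** Check as written in the PR: compares against the active type. */
  boolean acceptUsingActiveType(long version, Type type) {
    boolean accept =
        version > pendingVersion || (version == pendingVersion && type != activeType);
    if (accept) {
      pendingVersion = version;
      pendingType = type;
    }
    return accept;
  }

  /** Suggested check: compares against the most recently accepted type. */
  boolean acceptUsingPendingType(long version, Type type) {
    boolean accept =
        version > pendingVersion || (version == pendingVersion && type != pendingType);
    if (accept) {
      pendingVersion = version;
      pendingType = type;
    }
    return accept;
  }

  public static void main(String[] args) {
    // Step 1: active is CLOUDPATH. Step 2: DIRECTPATH is accepted but not yet
    // applied. Step 3: CLOUDPATH arrives while activeType is still CLOUDPATH.
    PendingTypeRaceSketch current = new PendingTypeRaceSketch(1, Type.CLOUDPATH);
    current.acceptUsingActiveType(1, Type.DIRECTPATH);
    boolean currentTakesCloudpath = current.acceptUsingActiveType(1, Type.CLOUDPATH);

    PendingTypeRaceSketch suggested = new PendingTypeRaceSketch(1, Type.CLOUDPATH);
    suggested.acceptUsingPendingType(1, Type.DIRECTPATH);
    boolean suggestedTakesCloudpath = suggested.acceptUsingPendingType(1, Type.CLOUDPATH);

    System.out.println(currentTakesCloudpath + " " + suggestedTakesCloudpath); // false true
  }
}
```

   With the pending-type check, a re-send of the same version and type is still rejected, so the original goal of avoiding double processing is preserved.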



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java:
##########
@@ -348,6 +348,48 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce
     TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
   }
 
+  @Test
+  public void testOnNewWorkerMetadata_consumesEndpointsOnConnectivityTypeChange()
+      throws InterruptedException {
+    String workerToken = "workerToken1";
+
+    WorkerMetadataResponse firstWorkerMetadata =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(1)
+            .setEndpointType(Windmill.WorkerMetadataResponse.EndpointType.CLOUDPATH)
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken(workerToken)
+                    .build())
+            .putAllGlobalDataEndpoints(DEFAULT)
+            .build();
+    WorkerMetadataResponse secondWorkerMetadata =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(1)
+            .setEndpointType(Windmill.WorkerMetadataResponse.EndpointType.DIRECTPATH)
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken(workerToken)
+                    .build())
+            .putAllGlobalDataEndpoints(DEFAULT)
+            .build();
+
+    TestGetWorkBudgetDistributor getWorkBudgetDistributor = spy(new TestGetWorkBudgetDistributor());
+    fanOutStreamingEngineWorkProvider =
+        newFanOutStreamingEngineWorkerHarness(
+            GetWorkBudget.builder().setItems(1).setBytes(1).build(),
+            getWorkBudgetDistributor,
+            noOpProcessWorkItemFn());
+
+    fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata);

Review Comment:
   Check whether `fanOutStreamingEngineWorkProvider.currentBackends()` has the expected values?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -39,13 +39,34 @@
  */
 @AutoValue
 public abstract class WindmillEndpoints {
+  public enum Type {
+    UNKNOWN,
+    CLOUDPATH,
+    BORG,

Review Comment:
   Remove `BORG`?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -106,6 +106,9 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker
   @GuardedBy("this")
   private long activeMetadataVersion;
 
+  @GuardedBy("metadataLock")

Review Comment:
   This can be changed to be guarded by `this` after adding a `pendingMetadataType` guarded by `metadataLock`.
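   One way to read this suggestion, sketched under assumptions: the gate state (`pendingMetadataVersion`, `pendingMetadataType`) lives under `metadataLock`, while the applied state (`activeMetadataVersion`, `activeMetadataType`) is only touched by the consumer, under `this`. The class, the method names, and the single-threaded executor standing in for `workerMetadataConsumer` are all hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the suggested lock split; not the real harness members.
class LockSplitSketch {
  enum Type { UNKNOWN, CLOUDPATH, DIRECTPATH }

  private final Object metadataLock = new Object();
  // Stands in for the single-threaded workerMetadataConsumer.
  private final ExecutorService consumer = Executors.newSingleThreadExecutor();

  // Guarded by metadataLock: the latest accepted (possibly not yet applied) config.
  private long pendingMetadataVersion = -1;
  private Type pendingMetadataType = Type.UNKNOWN;

  // Guarded by this: the config the worker is currently using.
  private long activeMetadataVersion = -1;
  private Type activeMetadataType = Type.UNKNOWN;

  /** Called from the metadata stream; the gate reads and writes only pending state. */
  void onNewMetadata(long version, Type type) {
    synchronized (metadataLock) {
      if (version > pendingMetadataVersion
          || (version == pendingMetadataVersion && type != pendingMetadataType)) {
        pendingMetadataVersion = version;
        pendingMetadataType = type;
        // Submitting while holding metadataLock keeps the consumer's order
        // identical to the order in which the gate accepted updates.
        consumer.execute(() -> consume(version, type));
      }
    }
  }

  /** Runs on the consumer thread; touches only active state. */
  private synchronized void consume(long version, Type type) {
    activeMetadataVersion = version;
    activeMetadataType = type;
  }

  synchronized Type activeType() {
    return activeMetadataType;
  }

  /** Waits for all submitted updates to be applied. */
  void drain() throws InterruptedException {
    consumer.shutdown();
    consumer.awaitTermination(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    LockSplitSketch harness = new LockSplitSketch();
    harness.onNewMetadata(1, Type.CLOUDPATH);
    harness.onNewMetadata(1, Type.DIRECTPATH);
    harness.onNewMetadata(1, Type.CLOUDPATH);
    harness.drain();
    System.out.println(harness.activeType()); // CLOUDPATH
  }
}
```

   Because the gate never reads the active fields, they no longer need `metadataLock`, and the cloudpath -> directpath -> cloudpath sequence ends on the last config the server sent.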



-- 
This is an automated message from the Apache Git Service.