parveensania commented on code in PR #34148:
URL: https://github.com/apache/beam/pull/34148#discussion_r2124095393


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -281,11 +293,14 @@ private void consumeWorkerMetadata(WindmillEndpoints 
windmillEndpoints) {
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints 
newWindmillEndpoints) {
     // Since this is run on a single threaded executor, multiple versions of 
the metadata maybe
     // queued up while a previous version of the windmillEndpoints were being 
consumed. Only consume
-    // the endpoints if they are the most current version.
+    // the endpoints if they are the most current version, or if the endpoint 
type is different
+    // from currently active endpoints.
     synchronized (metadataLock) {
-      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion
+          && newWindmillEndpoints.endpointType() == activeEndpointType) {

Review Comment:
   We can't add the check version==pending here. As in the caller function 
pending version gets set to new endpoints version. So in this function for the 
most latest endpoints, pending version will always be equal to new endpoints 
version(irrespective of the enpoint type)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to