parveensania commented on code in PR #34148:
URL: https://github.com/apache/beam/pull/34148#discussion_r2124095393
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -281,11 +293,14 @@ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
     // Since this is run on a single threaded executor, multiple versions of the metadata maybe
     // queued up while a previous version of the windmillEndpoints were being consumed. Only consume
-    // the endpoints if they are the most current version.
+    // the endpoints if they are the most current version, or if the endpoint type is different
+    // from currently active endpoints.
     synchronized (metadataLock) {
-      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion
+          && newWindmillEndpoints.endpointType() == activeEndpointType) {
Review Comment:
We can't add a version == pending check here. The caller sets the pending
version to the new endpoints' version before this function runs, so for the
latest endpoints the pending version will always equal the new endpoints'
version, irrespective of the endpoint type.
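
To make the point concrete, here is a minimal sketch of the two conditions. It is a simplified, hypothetical model and not the actual harness code: the class VersionGateSketch, the helper recordPending, and the EndpointType values are assumptions made for illustration, and only the field names are borrowed from the diff.

```java
// Simplified, hypothetical model of the version gate discussed above.
// It is NOT the Beam harness; names mirror the diff only for readability.
final class VersionGateSketch {
  enum EndpointType { DIRECT, CLOUDPATH } // assumed values, for illustration only

  private long pendingMetadataVersion = Long.MIN_VALUE;
  private final EndpointType activeEndpointType;

  VersionGateSketch(EndpointType activeEndpointType) {
    this.activeEndpointType = activeEndpointType;
  }

  // Stand-in for the caller: records the newest version before the consumer runs.
  void recordPending(long version) {
    pendingMetadataVersion = Math.max(pendingMetadataVersion, version);
  }

  // The condition from the diff: skip only older versions of the active type.
  boolean skipWithLessThan(long version, EndpointType type) {
    return version < pendingMetadataVersion && type == activeEndpointType;
  }

  // The variant under discussion: compare for equality with the pending version.
  boolean matchesPending(long version) {
    return version == pendingMetadataVersion;
  }

  public static void main(String[] args) {
    VersionGateSketch gate = new VersionGateSketch(EndpointType.CLOUDPATH);
    // Two versions are queued before either one is consumed.
    gate.recordPending(1);
    gate.recordPending(2);

    // Older version, same type as active: skipped.
    System.out.println(gate.skipWithLessThan(1, EndpointType.CLOUDPATH)); // true
    // Older version, different type: still consumed.
    System.out.println(gate.skipWithLessThan(1, EndpointType.DIRECT)); // false
    // Latest version: equality with pending holds whatever its type is.
    System.out.println(gate.matchesPending(2)); // true
  }
}
```

In this model the equality test carries no information for the latest endpoints, because the pending version was taken from them, so it cannot be used to tell endpoint types apart.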