arunpandianp commented on code in PR #38249:
URL: https://github.com/apache/beam/pull/38249#discussion_r3114262604
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -288,11 +296,17 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
}
}
- LOG.debug(
- "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+ WindmillEndpoints.Type previousType;
+ synchronized (metadataLock) {
Review Comment:
Can this be merged with the synchronized block at line 293?
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java:
##########
@@ -348,6 +348,48 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
}
+ @Test
+ public void testOnNewWorkerMetadata_consumesEndpointsOnConnectivityTypeChange()
Review Comment:
Can you add a test case like
`testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers` for cloudpath ->
directpath -> cloudpath and for directpath -> cloudpath -> directpath?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -271,7 +275,11 @@ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
synchronized (metadataLock) {
// Only process versions greater than what we currently have to prevent double processing of
// metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
- if (windmillEndpoints.version() > pendingMetadataVersion) {
+ // The endpoints are also consumed if the version is the same but the type of endpoints
+ // sent by the server has changed.
+ if (windmillEndpoints.version() > pendingMetadataVersion
+ || (windmillEndpoints.version() == pendingMetadataVersion
+ && windmillEndpoints.type() != activeMetadataType)) {
Review Comment:
I think we need to check against a `pendingMetadataType`, like
`pendingMetadataVersion`.
Consider the following case:
1. active is cloudpath
1. we receive a directpath config and enter here since the types don't match
1. we receive a cloudpath config before `activeMetadataType` is updated
to directpath; the check against `activeMetadataType` then prevents the
cloudpath config from taking effect.
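
A minimal sketch of the scenario above, assuming a simplified stand-in for the harness. The class `PendingTypeGateSketch` and both method names are hypothetical and exist only for illustration; the field names mirror the ones in this PR plus the proposed `pendingMetadataType`. `activeMetadataType` is never updated here, which models the window before the asynchronous consumer has applied the new endpoints.

```java
// Hypothetical, simplified model of the version/type check.
// This is NOT the actual FanOutStreamingEngineWorkerHarness code.
final class PendingTypeGateSketch {
  enum Type {
    CLOUDPATH,
    DIRECTPATH
  }

  private final Object metadataLock = new Object();
  private long pendingMetadataVersion;
  private Type pendingMetadataType;
  // In the real harness this would change only once the endpoints are applied.
  // It stays fixed here to model "the consumer has not run yet".
  private final Type activeMetadataType;

  PendingTypeGateSketch(long version, Type type) {
    this.pendingMetadataVersion = version;
    this.pendingMetadataType = type;
    this.activeMetadataType = type;
  }

  /** Check as written in the diff: same-version updates are compared to the active type. */
  boolean acceptComparingActive(long version, Type type) {
    synchronized (metadataLock) {
      if (version > pendingMetadataVersion
          || (version == pendingMetadataVersion && type != activeMetadataType)) {
        pendingMetadataVersion = version;
        return true;
      }
      return false;
    }
  }

  /** Suggested check: same-version updates are compared to the last accepted (pending) type. */
  boolean acceptComparingPending(long version, Type type) {
    synchronized (metadataLock) {
      if (version > pendingMetadataVersion
          || (version == pendingMetadataVersion && type != pendingMetadataType)) {
        pendingMetadataVersion = version;
        pendingMetadataType = type;
        return true;
      }
      return false;
    }
  }
}
```

With the active-type comparison, a directpath config followed by a cloudpath config at the same version drops the second update, because the active type is still cloudpath. Comparing against the pending type accepts both, and still rejects an exact repeat of the last accepted config.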
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java:
##########
@@ -348,6 +348,48 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
}
+ @Test
+ public void testOnNewWorkerMetadata_consumesEndpointsOnConnectivityTypeChange()
+ throws InterruptedException {
+ String workerToken = "workerToken1";
+
+ WorkerMetadataResponse firstWorkerMetadata =
+ WorkerMetadataResponse.newBuilder()
+ .setMetadataVersion(1)
+ .setEndpointType(Windmill.WorkerMetadataResponse.EndpointType.CLOUDPATH)
+ .addWorkEndpoints(
+ WorkerMetadataResponse.Endpoint.newBuilder()
+ .setBackendWorkerToken(workerToken)
+ .build())
+ .putAllGlobalDataEndpoints(DEFAULT)
+ .build();
+ WorkerMetadataResponse secondWorkerMetadata =
+ WorkerMetadataResponse.newBuilder()
+ .setMetadataVersion(1)
+ .setEndpointType(Windmill.WorkerMetadataResponse.EndpointType.DIRECTPATH)
+ .addWorkEndpoints(
+ WorkerMetadataResponse.Endpoint.newBuilder()
+ .setBackendWorkerToken(workerToken)
+ .build())
+ .putAllGlobalDataEndpoints(DEFAULT)
+ .build();
+
+ TestGetWorkBudgetDistributor getWorkBudgetDistributor = spy(new TestGetWorkBudgetDistributor());
+ fanOutStreamingEngineWorkProvider =
+ newFanOutStreamingEngineWorkerHarness(
+ GetWorkBudget.builder().setItems(1).setBytes(1).build(),
+ getWorkBudgetDistributor,
+ noOpProcessWorkItemFn());
+
+ fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata);
Review Comment:
Can you also check that `fanOutStreamingEngineWorkProvider.currentBackends()` has the
expected values?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -39,13 +39,34 @@
*/
@AutoValue
public abstract class WindmillEndpoints {
+ public enum Type {
+ UNKNOWN,
+ CLOUDPATH,
+ BORG,
Review Comment:
remove?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -106,6 +106,9 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker
@GuardedBy("this")
private long activeMetadataVersion;
+ @GuardedBy("metadataLock")
Review Comment:
This can be changed to be guarded by `this` after adding a `pendingMetadataType`
guarded by `metadataLock`.