arunpandianp commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1794868915
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -220,15 +203,16 @@ static FanOutStreamingEngineWorkerHarness forTesting(
@Override
public synchronized void start() {
    Preconditions.checkState(!started, "StreamingEngineClient cannot start twice.");
- // Starts the stream, this value is memoized.
- getWorkerMetadataStream.get();
- startWorkerMetadataConsumer();
- getWorkBudgetRefresher.start();
+ getWorkerMetadataStream =
+ streamFactory.createGetWorkerMetadataStream(
+ dispatcherClient.getWindmillMetadataServiceStubBlocking(),
+ getWorkerMetadataThrottleTimer,
+ this::consumeWorkerMetadata);
started = true;
}
public ImmutableSet<HostAndPort> currentWindmillEndpoints() {
- return connections.get().windmillConnections().keySet().stream()
+ return connections.get().windmillStreams().keySet().stream()
.map(Endpoint::directEndpoint)
.filter(Optional::isPresent)
.map(Optional::get)
Review Comment:
Will filtering out IPv6 below prevent direct path endpoints from showing up in channelz?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +240,114 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                    dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
  }

-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
  @VisibleForTesting
  @Override
  public synchronized void shutdown() {
    Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+    workerMetadataConsumer.shutdownNow();
    channelCachingStubFactory.shutdown();
  }

-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single threaded executor, multiple versions of the metadata maybe
+    // queued up while a previous version of the windmillEndpoints were being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
+    long previousMetadataVersion = activeMetadataVersion;
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+        newWindmillEndpoints,
+        previousMetadataVersion,
+        activeMetadataVersion);
+    closeStaleStreams(
+        newWindmillEndpoints.windmillEndpoints(), connections.get().windmillStreams());
+    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
    StreamingEngineConnectionState newConnectionsState =
        StreamingEngineConnectionState.builder()
-            .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
            .setGlobalDataStreams(
                createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
            .build();
-
-    LOG.info(
-        "Setting new connections: {}. Previous connections: {}.",
-        newConnectionsState,
-        connections.get());
    connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
+    activeMetadataVersion = newWindmillEndpoints.version();
+  }
+
+  /** Close the streams that are no longer valid asynchronously. */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void closeStaleStreams(
+      Collection<Endpoint> newWindmillConnections,
+      ImmutableMap<Endpoint, WindmillStreamSender> currentStreams) {
+    currentStreams.entrySet().stream()
+        .filter(
+            connectionAndStream -> !newWindmillConnections.contains(connectionAndStream.getKey()))
+        .forEach(
+            entry ->
+                CompletableFuture.runAsync(
+                    () -> {
+                      LOG.debug("Closing streams to {}", entry);
+                      try {
+                        entry.getValue().closeAllStreams();
+                        entry
+                            .getKey()
+                            .directEndpoint()
+                            .ifPresent(channelCachingStubFactory::remove);
+                        LOG.debug("Successfully closed streams to {}", entry);
+                      } catch (Exception e) {
+                        LOG.error("Error closing streams to {}", entry);
+                      }
+                    },
+                    windmillStreamManager));
+  }
+
+  private synchronized CompletableFuture<ImmutableMap<Endpoint, WindmillStreamSender>>
+      createAndStartNewStreams(Collection<Endpoint> newWindmillConnections) {
Review Comment:
```suggestion
      createAndStartNewStreams(ImmutableSet<Endpoint> newWindmillConnections) {
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +240,114 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                    dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
  }

-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
  @VisibleForTesting
  @Override
  public synchronized void shutdown() {
    Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+    workerMetadataConsumer.shutdownNow();
    channelCachingStubFactory.shutdown();
  }

-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single threaded executor, multiple versions of the metadata maybe
+    // queued up while a previous version of the windmillEndpoints were being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
+    long previousMetadataVersion = activeMetadataVersion;
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+        newWindmillEndpoints,
+        previousMetadataVersion,
+        activeMetadataVersion);
Review Comment:
`previousMetadataVersion` and `activeMetadataVersion` are the same here, so this log line prints the same value twice.
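
Something like this would make the log useful (a sketch, assuming the intent was to log the incoming version as "current"):
```java
// activeMetadataVersion has not been updated yet at this point, so log the
// version about to be consumed instead.
LOG.debug(
    "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
    newWindmillEndpoints,
    activeMetadataVersion,
    newWindmillEndpoints.version());
```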
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,47 @@ public void sendHealthCheck() {
protected void onResponse(StreamingCommitResponse response) {
commitWorkThrottleTimer.stop();
- RuntimeException finalException = null;
+ @Nullable RuntimeException failure = null;
for (int i = 0; i < response.getRequestIdCount(); ++i) {
long requestId = response.getRequestId(i);
if (requestId == HEARTBEAT_REQUEST_ID) {
continue;
}
- PendingRequest done = pending.remove(requestId);
- if (done == null) {
- LOG.error("Got unknown commit request ID: {}", requestId);
+ PendingRequest pendingRequest = pending.remove(requestId);
+ if (pendingRequest == null) {
+ if (!isShutdown()) {
+          // Skip responses when the stream is shutdown since they are now invalid.
+          LOG.error("Got unknown commit request ID: {}", requestId);
+        }
      } else {
        try {
-          done.onDone.accept(
+          pendingRequest.completeWithStatus(
              (i < response.getStatusCount()) ? response.getStatus(i) : CommitStatus.OK);
        } catch (RuntimeException e) {
          // Catch possible exceptions to ensure that an exception for one commit does not prevent
-          // other commits from being processed.
+          // other commits from being processed. Aggregate all the failures to throw after
+          // processing the response if they exist.
LOG.warn("Exception while processing commit response.", e);
- finalException = e;
+ if (failure == null) {
+ failure = e;
+ } else {
+ failure.addSuppressed(e);
Review Comment:
There could be thousands of requests in the queue; do we want to add all of them as suppressed here? Maybe keeping only the final exception or a bounded subset is enough?
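
For example, a sketch that bounds the suppressed list (`MAX_SUPPRESSED` is a hypothetical constant, not in the PR):
```java
} catch (RuntimeException e) {
  LOG.warn("Exception while processing commit response.", e);
  if (failure == null) {
    failure = e;
  } else if (failure.getSuppressed().length < MAX_SUPPRESSED) {
    // Cap the suppressed exceptions so a large failing batch does not bloat
    // the eventually-thrown exception.
    failure.addSuppressed(e);
  }
}
```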
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -381,29 +380,20 @@ private ImmutableMap<String, Supplier<GetDataStream>> createNewGlobalDataStreams
  private Supplier<GetDataStream> existingOrNewGetDataStreamFor(
      Entry<String, Endpoint> keyedEndpoint,
      ImmutableMap<String, Supplier<GetDataStream>> currentGlobalDataStreams) {
-    return Preconditions.checkNotNull(
+    return checkNotNull(
        currentGlobalDataStreams.getOrDefault(
            keyedEndpoint.getKey(),
            () ->
                streamFactory.createGetDataStream(
-                    newOrExistingStubFor(keyedEndpoint.getValue()), new ThrottleTimer())));
-  }
-
-  private CloudWindmillServiceV1Alpha1Stub newOrExistingStubFor(Endpoint endpoint) {
-    return Optional.ofNullable(connections.get().windmillConnections().get(endpoint))
-        .map(WindmillConnection::stub)
-        .orElseGet(() -> createWindmillStub(endpoint));
+                    createWindmillStub(keyedEndpoint.getValue()), new ThrottleTimer())));
  }

-  private WindmillStreamSender createAndStartWindmillStreamSenderFor(
-      WindmillConnection connection) {
-    // Initially create each stream with no budget. The budget will be eventually assigned by the
-    // GetWorkBudgetDistributor.
+  private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint connection) {
    WindmillStreamSender windmillStreamSender =
Review Comment:
Why do we need a dispatcher fallback in `createWindmillStub`?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +240,114 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                    dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
  }

-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
  @VisibleForTesting
  @Override
  public synchronized void shutdown() {
    Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+    workerMetadataConsumer.shutdownNow();
    channelCachingStubFactory.shutdown();
  }

-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single threaded executor, multiple versions of the metadata maybe
+    // queued up while a previous version of the windmillEndpoints were being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
+    long previousMetadataVersion = activeMetadataVersion;
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+        newWindmillEndpoints,
+        previousMetadataVersion,
+        activeMetadataVersion);
+    closeStaleStreams(
+        newWindmillEndpoints.windmillEndpoints(), connections.get().windmillStreams());
+    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
    StreamingEngineConnectionState newConnectionsState =
        StreamingEngineConnectionState.builder()
-            .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
            .setGlobalDataStreams(
                createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
            .build();
-
-    LOG.info(
-        "Setting new connections: {}. Previous connections: {}.",
-        newConnectionsState,
-        connections.get());
    connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
+    activeMetadataVersion = newWindmillEndpoints.version();
+  }
+
+  /** Close the streams that are no longer valid asynchronously. */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void closeStaleStreams(
+      Collection<Endpoint> newWindmillConnections,
+      ImmutableMap<Endpoint, WindmillStreamSender> currentStreams) {
+    currentStreams.entrySet().stream()
+        .filter(
+            connectionAndStream -> !newWindmillConnections.contains(connectionAndStream.getKey()))
+        .forEach(
+            entry ->
+                CompletableFuture.runAsync(
+                    () -> {
+                      LOG.debug("Closing streams to {}", entry);
+                      try {
+                        entry.getValue().closeAllStreams();
+                        entry
+                            .getKey()
+                            .directEndpoint()
+                            .ifPresent(channelCachingStubFactory::remove);
+                        LOG.debug("Successfully closed streams to {}", entry);
+                      } catch (Exception e) {
+                        LOG.error("Error closing streams to {}", entry);
+                      }
+                    },
+                    windmillStreamManager));
+  }
+
+  private synchronized CompletableFuture<ImmutableMap<Endpoint, WindmillStreamSender>>
+      createAndStartNewStreams(Collection<Endpoint> newWindmillConnections) {
+    ImmutableMap<Endpoint, WindmillStreamSender> currentStreams =
+        connections.get().windmillStreams();
+    CompletionStage<List<Pair<Endpoint, WindmillStreamSender>>> connectionAndSenderFuture =
+        MoreFutures.allAsList(
+            newWindmillConnections.stream()
+                .map(
+                    connection ->
+                        MoreFutures.supplyAsync(
+                            () ->
+                                Pair.of(
+                                    connection,
+                                    Optional.ofNullable(currentStreams.get(connection))
+                                        .orElseGet(
+                                            () ->
+                                                createAndStartWindmillStreamSender(connection))),
Review Comment:
```suggestion
MoreFutures.allAsList(
newWindmillEndpoints.stream()
.map(
endpoint ->
MoreFutures.supplyAsync(
() ->
Pair.of(
endpoint,
Optional.ofNullable(currentStreams.get(endpoint))
.orElseGet(
() ->
createAndStartWindmillStreamSender(endpoint))),
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -381,29 +380,20 @@ private ImmutableMap<String, Supplier<GetDataStream>> createNewGlobalDataStreams
  private Supplier<GetDataStream> existingOrNewGetDataStreamFor(
      Entry<String, Endpoint> keyedEndpoint,
      ImmutableMap<String, Supplier<GetDataStream>> currentGlobalDataStreams) {
-    return Preconditions.checkNotNull(
+    return checkNotNull(
        currentGlobalDataStreams.getOrDefault(
            keyedEndpoint.getKey(),
            () ->
                streamFactory.createGetDataStream(
-                    newOrExistingStubFor(keyedEndpoint.getValue()), new ThrottleTimer())));
-  }
-
-  private CloudWindmillServiceV1Alpha1Stub newOrExistingStubFor(Endpoint endpoint) {
-    return Optional.ofNullable(connections.get().windmillConnections().get(endpoint))
-        .map(WindmillConnection::stub)
-        .orElseGet(() -> createWindmillStub(endpoint));
+                    createWindmillStub(keyedEndpoint.getValue()), new ThrottleTimer())));
  }

-  private WindmillStreamSender createAndStartWindmillStreamSenderFor(
-      WindmillConnection connection) {
-    // Initially create each stream with no budget. The budget will be eventually assigned by the
-    // GetWorkBudgetDistributor.
+  private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint connection) {
Review Comment:
```suggestion
  private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint endpoint) {
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +242,132 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                    dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
  }

-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
  @VisibleForTesting
  @Override
  public synchronized void shutdown() {
    Preconditions.checkState(started, "StreamingEngineClient never started.");
    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    workerMetadataConsumer.shutdownNow();
    channelCachingStubFactory.shutdown();
  }

-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  @SuppressWarnings("methodref.receiver.bound")
+  private Supplier<GetWorkerMetadataStream> createGetWorkerMetadataStream(
+      @UnderInitialization FanOutStreamingEngineWorkerHarness this) {
+    // Checker Framework complains about reference to "this" in the constructor since the instance
+    // is "UnderInitialization" here, which we pass as a lambda to GetWorkerMetadataStream for
+    // processing new worker metadata. Supplier.get() is only called in start(), after we have
+    // constructed the FanOutStreamingEngineWorkerHarness.
+    return () ->
+        checkNotNull(streamFactory)
+            .createGetWorkerMetadataStream(
+                checkNotNull(dispatcherClient).getWindmillMetadataServiceStubBlocking(),
+                checkNotNull(getWorkerMetadataThrottleTimer),
+                this::consumeWorkerMetadata);
+  }
+
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
+    // Since this is run on a single threaded executor, multiple versions of the metadata maybe
+    // queued up while a previous version of the windmillEndpoints were being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
Review Comment:
Is the idea to track the latest received version? If so, the check inside `consumeWindmillWorkerEndpoints` can still be based on `activeMetadataVersion`; the check before queuing based on `pendingMetadataVersion` should be enough. Also, we need to log/record `pendingMetadataVersion` somewhere; currently it is not recorded.
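
Roughly (a sketch using the PR's field names; the elided body is unchanged):
```java
private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
  // Stale versions may still be queued behind us on the single-threaded
  // executor; skip anything at or below the version already consumed.
  if (newWindmillEndpoints.version() <= activeMetadataVersion) {
    return;
  }
  // ... consume the endpoints, then update activeMetadataVersion ...
}
```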
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +240,114 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                    dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
  }

-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
  @VisibleForTesting
  @Override
  public synchronized void shutdown() {
    Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+    workerMetadataConsumer.shutdownNow();
    channelCachingStubFactory.shutdown();
  }

-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single threaded executor, multiple versions of the metadata maybe
+    // queued up while a previous version of the windmillEndpoints were being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
+    long previousMetadataVersion = activeMetadataVersion;
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+        newWindmillEndpoints,
+        previousMetadataVersion,
+        activeMetadataVersion);
+    closeStaleStreams(
+        newWindmillEndpoints.windmillEndpoints(), connections.get().windmillStreams());
+    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
    StreamingEngineConnectionState newConnectionsState =
        StreamingEngineConnectionState.builder()
-            .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
            .setGlobalDataStreams(
                createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
Review Comment:
Should we create the globalDataStreams in parallel? I'm also not sure the code closes the stale globalDataStreams.
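
If we do want parallelism here, a rough sketch mirroring `createAndStartNewStreams` (the per-entry `createGlobalDataStream` helper is hypothetical):
```java
// Fan out global data stream creation on windmillStreamManager instead of
// building the map serially.
CompletionStage<List<Pair<String, Supplier<GetDataStream>>>> globalDataStreamsFuture =
    MoreFutures.allAsList(
        newWindmillEndpoints.globalDataEndpoints().entrySet().stream()
            .map(
                entry ->
                    MoreFutures.supplyAsync(
                        () -> Pair.of(entry.getKey(), createGlobalDataStream(entry)),
                        windmillStreamManager))
            .collect(Collectors.toList()));
```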
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +240,114 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                    dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
  }

-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
  @VisibleForTesting
  @Override
  public synchronized void shutdown() {
    Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+    workerMetadataConsumer.shutdownNow();
    channelCachingStubFactory.shutdown();
  }

-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single threaded executor, multiple versions of the metadata maybe
+    // queued up while a previous version of the windmillEndpoints were being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
+    long previousMetadataVersion = activeMetadataVersion;
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+        newWindmillEndpoints,
+        previousMetadataVersion,
+        activeMetadataVersion);
+    closeStaleStreams(
+        newWindmillEndpoints.windmillEndpoints(), connections.get().windmillStreams());
+    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
Review Comment:
We can defer the `join()` until after creating the globalDataStreams.
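
i.e. something like (a sketch; names from the PR):
```java
// Start the stream creation, do the global data stream work while it runs,
// and only block at the end.
CompletableFuture<ImmutableMap<Endpoint, WindmillStreamSender>> newStreamsFuture =
    createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints());
ImmutableMap<String, Supplier<GetDataStream>> globalDataStreams =
    createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints());
ImmutableMap<Endpoint, WindmillStreamSender> newStreams = newStreamsFuture.join();
```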
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,47 @@ public void sendHealthCheck() {
protected void onResponse(StreamingCommitResponse response) {
commitWorkThrottleTimer.stop();
- RuntimeException finalException = null;
+ @Nullable RuntimeException failure = null;
Review Comment:
Create a new `Exception` here and attach all failures as suppressed? We want to avoid marking the failure from one request as suppressed under another.
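
For example (a sketch; the aggregate message is illustrative):
```java
} catch (RuntimeException e) {
  LOG.warn("Exception while processing commit response.", e);
  // Use a dedicated aggregate exception so no single request's failure
  // becomes the "primary" one that the others are suppressed under.
  if (failure == null) {
    failure = new RuntimeException("Exception(s) while processing commit response.");
  }
  failure.addSuppressed(e);
}
```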
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -187,13 +211,14 @@ protected void startThrottleTimer() {
commitWorkThrottleTimer.start();
}
- private void flushInternal(Map<Long, PendingRequest> requests) {
+  private void flushInternal(Map<Long, PendingRequest> requests) throws InterruptedException {
Review Comment:
do we need `throws InterruptedException` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]