scwhittle commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1776964691
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +232,132 @@ private GetDataStream getGlobalDataStream(String
globalDataKey) {
dispatcherClient.getWindmillServiceStub(), new
ThrottleTimer()));
}
- @SuppressWarnings("FutureReturnValueIgnored")
- private void startWorkerMetadataConsumer() {
- newWorkerMetadataConsumer.submit(
- () -> {
- while (true) {
- Optional.ofNullable(newWindmillEndpoints.poll())
- .ifPresent(this::consumeWindmillWorkerEndpoints);
- }
- });
- }
-
@VisibleForTesting
@Override
public synchronized void shutdown() {
Preconditions.checkState(started, "StreamingEngineClient never started.");
getWorkerMetadataStream.get().halfClose();
- getWorkBudgetRefresher.stop();
- newWorkerMetadataPublisher.shutdownNow();
- newWorkerMetadataConsumer.shutdownNow();
+ workerMetadataConsumer.shutdownNow();
channelCachingStubFactory.shutdown();
}
- /**
- * {@link java.util.function.Consumer<WindmillEndpoints>} used to update
{@link #connections} on
- * new backend worker metadata.
- */
+ @SuppressWarnings("methodref.receiver.bound")
+ private Supplier<GetWorkerMetadataStream> createGetWorkerMetadataStream(
+ @UnderInitialization FanOutStreamingEngineWorkerHarness this) {
+ // Checker Framework complains about reference to "this" in the
constructor since the instance
+ // is "UnderInitialization" here, which we pass as a lambda to
GetWorkerMetadataStream for
+ // processing new worker metadata. Supplier.get() is only called in
start(), after we have
+ // constructed the FanOutStreamingEngineWorkerHarness.
+ return () ->
+ checkNotNull(streamFactory)
+ .createGetWorkerMetadataStream(
+
checkNotNull(dispatcherClient).getWindmillMetadataServiceStubBlocking(),
+ checkNotNull(getWorkerMetadataThrottleTimer),
+ this::consumeWorkerMetadata);
+ }
+
+ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+ synchronized (metadataLock) {
+ // Only process versions greater than what we currently have to prevent
double processing of
+ // metadata.
+ if (windmillEndpoints.version() > metadataVersion) {
+ metadataVersion = windmillEndpoints.version();
Review Comment:
maybe it would be good to have pendingMetadataVersion (updated here) and
activeMetadataVersion (which you update after consuming). That could help
debugging since it would show if we're stuck on some old version.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -402,14 +455,15 @@ public static StreamingDataflowWorker
fromOptions(DataflowWorkerHarnessOptions o
options,
new HotKeyLogger(),
clock,
- workerStatusReporter,
+ workerStatusReporterFactory,
Review Comment:
Should we just pass in a prepopulated builder instead of a factory?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -155,9 +155,11 @@ void closeAllStreams() {
@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {
- getWorkBudget.set(getWorkBudget.get().apply(itemsDelta, bytesDelta));
+ GetWorkBudget adjustment =
+
GetWorkBudget.builder().setItems(itemsDelta).setBytes(bytesDelta).build();
+ getWorkBudget.getAndSet(adjustment);
Review Comment:
use set instead of getAndSet
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -123,6 +124,8 @@ private static Optional<HostAndPort>
tryParseDirectEndpointIntoIpV6Address(
directEndpointAddress.getHostAddress(), (int)
endpointProto.getPort()));
}
+ public abstract long version();
Review Comment:
// The metadata version increases with every modification.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -249,13 +275,12 @@ private void issueBatchedRequest(Map<Long,
PendingRequest> requests) {
}
private void issueMultiChunkRequest(final long id, PendingRequest
pendingRequest) {
- checkNotNull(pendingRequest.computation);
- final ByteString serializedCommit = pendingRequest.request.toByteString();
-
+ checkNotNull(pendingRequest.computationId());
+ final ByteString serializedCommit = pendingRequest.serializedCommit();
synchronized (this) {
pending.put(id, pendingRequest);
for (int i = 0;
- i < serializedCommit.size();
+ i < serializedCommit.size() && !isShutdown();
Review Comment:
I'd remove the shutdown check since it's not consistent with other paths and
probably won't matter if the send just won't do anything anyway.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +232,132 @@ private GetDataStream getGlobalDataStream(String
globalDataKey) {
dispatcherClient.getWindmillServiceStub(), new
ThrottleTimer()));
}
- @SuppressWarnings("FutureReturnValueIgnored")
- private void startWorkerMetadataConsumer() {
- newWorkerMetadataConsumer.submit(
- () -> {
- while (true) {
- Optional.ofNullable(newWindmillEndpoints.poll())
- .ifPresent(this::consumeWindmillWorkerEndpoints);
- }
- });
- }
-
@VisibleForTesting
@Override
public synchronized void shutdown() {
Preconditions.checkState(started, "StreamingEngineClient never started.");
getWorkerMetadataStream.get().halfClose();
- getWorkBudgetRefresher.stop();
- newWorkerMetadataPublisher.shutdownNow();
- newWorkerMetadataConsumer.shutdownNow();
+ workerMetadataConsumer.shutdownNow();
channelCachingStubFactory.shutdown();
}
- /**
- * {@link java.util.function.Consumer<WindmillEndpoints>} used to update
{@link #connections} on
- * new backend worker metadata.
- */
+ @SuppressWarnings("methodref.receiver.bound")
+ private Supplier<GetWorkerMetadataStream> createGetWorkerMetadataStream(
+ @UnderInitialization FanOutStreamingEngineWorkerHarness this) {
+ // Checker Framework complains about reference to "this" in the
constructor since the instance
+ // is "UnderInitialization" here, which we pass as a lambda to
GetWorkerMetadataStream for
+ // processing new worker metadata. Supplier.get() is only called in
start(), after we have
+ // constructed the FanOutStreamingEngineWorkerHarness.
+ return () ->
+ checkNotNull(streamFactory)
+ .createGetWorkerMetadataStream(
+
checkNotNull(dispatcherClient).getWindmillMetadataServiceStubBlocking(),
+ checkNotNull(getWorkerMetadataThrottleTimer),
+ this::consumeWorkerMetadata);
+ }
+
+ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+ synchronized (metadataLock) {
+ // Only process versions greater than what we currently have to prevent
double processing of
+ // metadata.
Review Comment:
Add a comment: the consumer is single-threaded, so we maintain ordering.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -155,9 +155,11 @@ void closeAllStreams() {
@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {
Review Comment:
Should this be named setBudget as well?
Also update the variable names so they are not called "delta"/"adjustment"
if this is just a total budget request.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -278,24 +334,73 @@ public String backendWorkerToken() {
}
@Override
- public void shutdown() {
+ public final void shutdown() {
+ // Don't lock here as isShutdown checks are used in the stream to free
blocked
+ // threads or as exit conditions to loops.
if (isShutdown.compareAndSet(false, true)) {
requestObserver()
Review Comment:
I don't think requestObserver is thread-safe though? Do we need to
synchronize access to it?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -243,37 +223,129 @@ private StreamingDataflowWorker(
WorkerStatusPages workerStatusPages =
WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
StreamingWorkerStatusPages.Builder statusPagesBuilder =
StreamingWorkerStatusPages.builder();
- int stuckCommitDurationMillis;
- GetDataClient getDataClient;
- HeartbeatSender heartbeatSender;
- if (windmillServiceEnabled) {
- WindmillStreamPool<GetDataStream> getDataStreamPool =
- WindmillStreamPool.create(
- Math.max(1, options.getWindmillGetDataStreamCount()),
- GET_DATA_STREAM_TIMEOUT,
- windmillServer::getDataStream);
- getDataClient = new StreamPoolGetDataClient(getDataMetricTracker,
getDataStreamPool);
- heartbeatSender =
- new StreamPoolHeartbeatSender(
- options.getUseSeparateWindmillHeartbeatStreams()
- ? WindmillStreamPool.create(
- 1, GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream)
- : getDataStreamPool);
- stuckCommitDurationMillis =
- options.getStuckCommitDurationMillis() > 0 ?
options.getStuckCommitDurationMillis() : 0;
+ ChannelCachingStubFactory stubFactory = createStubFactory(options);
+ int stuckCommitDurationMillis =
+ options.isEnableStreamingEngine() ?
Math.max(options.getStuckCommitDurationMillis(), 0) : 0;
+ this.numCommitThreads =
+ options.isEnableStreamingEngine()
+ ? Math.max(options.getWindmillServiceCommitThreads(), 1)
+ : 1;
+
+ if (isDirectPathPipeline(options)) {
+ FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+ FanOutStreamingEngineWorkerHarness.create(
+ createJobHeader(options, clientId),
+ GetWorkBudget.builder()
+ .setItems(chooseMaxBundlesOutstanding(options))
+ .setBytes(MAX_GET_WORK_FETCH_BYTES)
+ .build(),
+ windmillStreamFactory,
+ (workItem, watermarks, processingContext,
getWorkStreamLatencies) ->
+ computationStateCache
+ .get(processingContext.computationId())
+ .ifPresent(
+ computationState -> {
+ memoryMonitor.waitForResources("GetWork");
+ streamingWorkScheduler.scheduleWork(
+ computationState,
+ workItem,
+ watermarks,
+ processingContext,
+ getWorkStreamLatencies);
+ }),
+ stubFactory,
+ GetWorkBudgetDistributors.distributeEvenly(),
+ Preconditions.checkNotNull(dispatcherClient),
+ commitWorkStream ->
+ StreamingEngineWorkCommitter.builder()
+
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+ .setOnCommitComplete(this::onCompleteCommit)
+
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+ .setCommitWorkStreamFactory(
+ () -> CloseableStream.create(commitWorkStream, () ->
{}))
+ .build(),
+ getDataMetricTracker);
statusPagesBuilder
+ .setGetDataStatusProvider(getDataMetricTracker::printHtml)
+
.setCurrentActiveCommitBytes(fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes)
.setDebugCapture(
new DebugCapture.Manager(options,
workerStatusPages.getDebugCapturePages()))
.setChannelzServlet(
new ChannelzServlet(
- CHANNELZ_PATH, options,
windmillServer::getWindmillServiceEndpoints))
+ CHANNELZ_PATH,
+ options,
+
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints))
.setWindmillStreamFactory(windmillStreamFactory);
+ this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
} else {
- getDataClient = new ApplianceGetDataClient(windmillServer,
getDataMetricTracker);
- heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
- stuckCommitDurationMillis = 0;
+ Windmill.GetWorkRequest request = createGetWorkRequest(clientId,
options);
+ GetDataClient getDataClient;
+ HeartbeatSender heartbeatSender;
+ WorkCommitter workCommitter;
+ GetWorkSender getWorkSender;
+ if (options.isEnableStreamingEngine()) {
+ WindmillStreamPool<GetDataStream> getDataStreamPool =
+ WindmillStreamPool.create(
+ Math.max(1, options.getWindmillGetDataStreamCount()),
+ GET_DATA_STREAM_TIMEOUT,
+ windmillServer::getDataStream);
+ getDataClient = new StreamPoolGetDataClient(getDataMetricTracker,
getDataStreamPool);
+ heartbeatSender =
+ new StreamPoolHeartbeatSender(
+ options.getUseSeparateWindmillHeartbeatStreams()
+ ? WindmillStreamPool.create(
+ 1, GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream)
+ : getDataStreamPool);
+ statusPagesBuilder
Review Comment:
Can the status page setup be moved after this block so it can be shared by
the SE paths? It seems like the same stuff is duplicated for the direct and
non-direct paths.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -185,55 +214,77 @@ protected final void startStream() {
while (true) {
try {
synchronized (this) {
+ if (isShutdown.get()) {
+ break;
+ }
startTimeMs.set(Instant.now().getMillis());
lastResponseTimeMs.set(0);
streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the
stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ requestObserver.reset();
onNewStream();
if (clientClosed.get()) {
halfClose();
}
return;
}
} catch (Exception e) {
- LOG.error("Failed to create new stream, retrying: ", e);
+ logger.error("Failed to create new stream, retrying: ", e);
try {
long sleep = backoff.nextBackOffMillis();
sleepUntil.set(Instant.now().getMillis() + sleep);
- Thread.sleep(sleep);
- } catch (InterruptedException | IOException i) {
+ sleeper.sleep(sleep);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ logger.info(
+ "Interrupted during stream creation backoff. The stream will not
be created.");
+ break;
+ } catch (IOException ioe) {
// Keep trying to create the stream.
}
}
}
+
+ // We were never able to start the stream, remove it from the stream
registry.
Review Comment:
// Otherwise it is removed when closed.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -278,24 +329,72 @@ public String backendWorkerToken() {
}
@Override
- public void shutdown() {
+ public final void shutdown() {
+ // Don't lock here as isShutdown checks are used in the stream to free
blocked
+ // threads or as exit conditions to loops.
if (isShutdown.compareAndSet(false, true)) {
requestObserver()
.onError(new WindmillStreamShutdownException("Explicit call to
shutdown stream."));
+ shutdownInternal();
}
}
- private void setLastError(String error) {
- lastError.set(error);
- lastErrorTime.set(DateTime.now());
+ private void recordRestartReason(String error) {
+ lastRestartReason.set(error);
+ lastRestartTime.set(DateTime.now());
}
+ protected abstract void shutdownInternal();
+
public static class WindmillStreamShutdownException extends RuntimeException
{
public WindmillStreamShutdownException(String message) {
super(message);
}
}
+ /**
+ * Request observer that allows resetting its internal delegate using the
given {@link
+ * #requestObserverSupplier}.
+ */
+ @ThreadSafe
+ private static class ResettableRequestObserver<RequestT> implements
StreamObserver<RequestT> {
+
+ private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+ @GuardedBy("this")
+ private volatile @Nullable StreamObserver<RequestT>
delegateRequestObserver;
+
+ private ResettableRequestObserver(Supplier<StreamObserver<RequestT>>
requestObserverSupplier) {
+ this.requestObserverSupplier = requestObserverSupplier;
+ this.delegateRequestObserver = null;
+ }
+
+ private synchronized StreamObserver<RequestT> delegate() {
+ return Preconditions.checkNotNull(
+ delegateRequestObserver,
+ "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ }
+
+ private synchronized void reset() {
+ delegateRequestObserver = requestObserverSupplier.get();
+ }
+
+ @Override
+ public void onNext(RequestT requestT) {
+ delegate().onNext(requestT);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ delegate().onError(throwable);
+ }
+
+ @Override
+ public synchronized void onCompleted() {
Review Comment:
still synchronized
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]