scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1829093210
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
protected abstract void startThrottleTimer();
/** Reflects that {@link #shutdown()} was explicitly called. */
- protected boolean isShutdown() {
- return isShutdown.get();
- }
-
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ protected boolean hasReceivedShutdownSignal() {
+ synchronized (shutdownLock) {
+ return isShutdown;
}
-
- return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
- if (streamClosed.get()) {
+ if (hasReceivedShutdownSignal()) {
+ return;
+ }
+
+ if (streamClosed) {
+ // TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
throw new IllegalStateException("Send called on a client closed stream.");
}
- requestObserver().onNext(request);
+ try {
+ verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");
Review Comment:
I would remove this since it seems likely to be expensive.
Instead you could verify the behavior with a test (rough sketch below):
- set up a requestObserver that blocks until notified
- one thread calls send() and starts blocking
- the main test thread calls shutdown() and verifies that the method returns
- the main test thread unblocks the requestObserver
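
A rough JUnit sketch of that test; `TestWindmillStream` (a hypothetical test-only subclass that exposes send() and uses the supplied observer factory) and the request payload are stand-ins, not helpers from this PR:

```java
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.junit.Test;

public class AbstractWindmillStreamShutdownTest {

  @Test
  public void shutdownReturnsWhileSendIsBlockedInObserver() throws Exception {
    CountDownLatch sendEntered = new CountDownLatch(1);
    CountDownLatch unblockSend = new CountDownLatch(1);
    StreamObserver<String> blockingObserver =
        new StreamObserver<String>() {
          @Override
          public void onNext(String value) {
            // Block inside send() until the test releases us.
            sendEntered.countDown();
            try {
              unblockSend.await();
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }

          @Override
          public void onError(Throwable t) {}

          @Override
          public void onCompleted() {}
        };

    // Hypothetical test helper wiring the blocking observer into the stream.
    TestWindmillStream stream = TestWindmillStream.create(() -> blockingObserver);
    stream.start();

    Thread sender = new Thread(() -> stream.send("payload"));
    sender.start();
    assertTrue("send() never reached the observer", sendEntered.await(10, TimeUnit.SECONDS));

    // The assertion under test: shutdown() must not block behind the in-flight send().
    stream.shutdown();

    unblockSend.countDown();
    sender.join(TimeUnit.SECONDS.toMillis(10));
    assertFalse("sender thread did not finish", sender.isAlive());
  }
}
```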
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
protected abstract void startThrottleTimer();
/** Reflects that {@link #shutdown()} was explicitly called. */
- protected boolean isShutdown() {
- return isShutdown.get();
- }
-
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ protected boolean hasReceivedShutdownSignal() {
+ synchronized (shutdownLock) {
+ return isShutdown;
}
-
- return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
- if (streamClosed.get()) {
+ if (hasReceivedShutdownSignal()) {
+ return;
+ }
+
+ if (streamClosed) {
+ // TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
throw new IllegalStateException("Send called on a client closed stream.");
}
- requestObserver().onNext(request);
+ try {
+ verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");
+ debugMetrics.recordSend();
+ requestObserver.onNext(request);
+ } catch (StreamObserverCancelledException e) {
+ if (hasReceivedShutdownSignal()) {
+ logger.debug("Stream was shutdown during send.", e);
+ return;
+ }
+
+ requestObserver.onError(e);
+ }
+ }
+ }
+
+ @Override
+ public final void start() {
+ boolean shouldStartStream = false;
+ synchronized (shutdownLock) {
+ if (!isShutdown && !started) {
+ started = true;
+ shouldStartStream = true;
+ }
+ }
+
+ if (shouldStartStream) {
+ startStream();
}
}
/** Starts the underlying stream. */
- protected final void startStream() {
+ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
try {
synchronized (this) {
- startTimeMs.set(Instant.now().getMillis());
- lastResponseTimeMs.set(0);
- streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ if (hasReceivedShutdownSignal()) {
Review Comment:
this seems racy since shutdown could happen right after the check; the requestObserver poisoning already handles it below.
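
For illustration, a fragment-level sketch of relying on the poisoning instead (not the PR's code; the backoff handling is abbreviated):

```java
// Fragment sketch: drop the pre-check and rely on the poisoned requestObserver
// throwing when shutdown() has already run.
while (true) {
  try {
    synchronized (this) {
      debugMetrics.recordStart();
      streamClosed = false;
      // Throws WindmillStreamShutdownException once shutdown() has poisoned the observer.
      requestObserver.reset();
      onNewStream();
      if (clientClosed) {
        halfClose();
      }
      return;
    }
  } catch (WindmillStreamShutdownException e) {
    logger.debug("Stream was shutdown while creating new stream.", e);
    break; // fall through to streamRegistry.remove(this) below
  } catch (Exception e) {
    // existing backoff / retry handling unchanged
  }
}
```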
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
protected abstract void startThrottleTimer();
/** Reflects that {@link #shutdown()} was explicitly called. */
- protected boolean isShutdown() {
- return isShutdown.get();
- }
-
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ protected boolean hasReceivedShutdownSignal() {
+ synchronized (shutdownLock) {
+ return isShutdown;
}
-
- return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
- if (streamClosed.get()) {
+ if (hasReceivedShutdownSignal()) {
+ return;
+ }
+
+ if (streamClosed) {
Review Comment:
can streamClosed be moved into the ResettableRequestObserver? Then you wouldn't have to keep it in sync here whenever we reset(), and it could just live beneath that synchronization.
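
A minimal sketch of that idea, assuming the observer owns the closed bit; the poisoning/shutdown handling from the PR is elided here for brevity:

```java
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;

/** Sketch: the observer owns the closed bit, so reset()/onCompleted()/onNext() share one lock. */
@ThreadSafe
final class ResettableStreamObserver<T> implements StreamObserver<T> {
  private final Supplier<StreamObserver<T>> streamObserverFactory;

  @GuardedBy("this")
  private @Nullable StreamObserver<T> delegate;

  @GuardedBy("this")
  private boolean streamClosed;

  ResettableStreamObserver(Supplier<StreamObserver<T>> streamObserverFactory) {
    this.streamObserverFactory = streamObserverFactory;
  }

  /** Opens a fresh delegate and clears the closed bit under the same lock. */
  synchronized void reset() {
    delegate = streamObserverFactory.get();
    streamClosed = false;
  }

  private synchronized StreamObserver<T> requireDelegate() {
    if (delegate == null) {
      throw new IllegalStateException("reset() must be called before using the observer.");
    }
    return delegate;
  }

  @Override
  public synchronized void onNext(T value) {
    if (streamClosed) {
      throw new IllegalStateException("Send called on a client closed stream.");
    }
    requireDelegate().onNext(value);
  }

  @Override
  public synchronized void onCompleted() {
    // Half-closing marks the stream closed for subsequent sends.
    streamClosed = true;
    requireDelegate().onCompleted();
  }

  @Override
  public synchronized void onError(Throwable t) {
    requireDelegate().onError(t);
  }
}
```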
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
protected abstract void startThrottleTimer();
/** Reflects that {@link #shutdown()} was explicitly called. */
- protected boolean isShutdown() {
- return isShutdown.get();
- }
-
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ protected boolean hasReceivedShutdownSignal() {
+ synchronized (shutdownLock) {
+ return isShutdown;
}
-
- return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
- if (streamClosed.get()) {
+ if (hasReceivedShutdownSignal()) {
+ return;
+ }
+
+ if (streamClosed) {
+ // TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
throw new IllegalStateException("Send called on a client closed stream.");
}
- requestObserver().onNext(request);
+ try {
+ verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");
+ debugMetrics.recordSend();
+ requestObserver.onNext(request);
+ } catch (StreamObserverCancelledException e) {
+ if (hasReceivedShutdownSignal()) {
+ logger.debug("Stream was shutdown during send.", e);
+ return;
+ }
+
+ requestObserver.onError(e);
+ }
+ }
+ }
+
+ @Override
+ public final void start() {
+ boolean shouldStartStream = false;
+ synchronized (shutdownLock) {
+ if (!isShutdown && !started) {
+ started = true;
+ shouldStartStream = true;
+ }
+ }
+
+ if (shouldStartStream) {
+ startStream();
}
}
/** Starts the underlying stream. */
- protected final void startStream() {
+ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
try {
synchronized (this) {
- startTimeMs.set(Instant.now().getMillis());
- lastResponseTimeMs.set(0);
- streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ if (hasReceivedShutdownSignal()) {
+ break;
+ }
+ debugMetrics.recordStart();
+ streamClosed = false;
+ requestObserver.reset();
onNewStream();
- if (clientClosed.get()) {
+ if (clientClosed) {
halfClose();
}
return;
}
+ } catch (WindmillStreamShutdownException e) {
+ logger.debug("Stream was shutdown waiting to start.", e);
} catch (Exception e) {
- LOG.error("Failed to create new stream, retrying: ", e);
+ logger.error("Failed to create new stream, retrying: ", e);
try {
long sleep = backoff.nextBackOffMillis();
- sleepUntil.set(Instant.now().getMillis() + sleep);
- Thread.sleep(sleep);
- } catch (InterruptedException | IOException i) {
+ debugMetrics.recordSleep(sleep);
+ sleeper.sleep(sleep);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ logger.info(
+ "Interrupted during {} creation backoff. The stream will not be
created.",
+ getClass());
+ break;
+ } catch (IOException ioe) {
// Keep trying to create the stream.
}
}
}
+
+ // We were never able to start the stream, remove it from the stream registry. Otherwise, it is
+ // removed when closed.
+ streamRegistry.remove(this);
}
- protected final Executor executor() {
- return executor;
+ /**
+ * Execute the runnable using the {@link #executor} handling the executor being in a shutdown
+ * state.
+ */
+ protected final void executeSafely(Runnable runnable) {
+ try {
+ executor.execute(runnable);
+ } catch (RejectedExecutionException e) {
+ logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+ }
}
- public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
- if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && !clientClosed.get()) {
+ public final void maybeSendHealthCheck(Instant lastSendThreshold) {
+ if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) {
try {
sendHealthCheck();
} catch (RuntimeException e) {
- LOG.debug("Received exception sending health check.", e);
+ logger.debug("Received exception sending health check.", e);
}
}
}
protected abstract void sendHealthCheck();
- // Care is taken that synchronization on this is unnecessary for all status page information.
- // Blocking sends are made beneath this stream object's lock which could block status page
- // rendering.
+ /**
+ * @implNote Care is taken that synchronization on this is unnecessary for all status page
+ * information. Blocking sends are made beneath this stream object's lock which could block
+ * status page rendering.
+ */
public final void appendSummaryHtml(PrintWriter writer) {
appendSpecificHtml(writer);
- if (errorCount.get() > 0) {
- writer.format(
- ", %d errors, last error [ %s ] at [%s]",
- errorCount.get(), lastError.get(), lastErrorTime.get());
- }
- if (clientClosed.get()) {
+ StreamDebugMetrics.Snapshot summaryMetrics = debugMetrics.getSummaryMetrics();
+ summaryMetrics
+ .restartMetrics()
+ .ifPresent(
+ metrics ->
+ writer.format(
+ ", %d restarts, last restart reason [ %s ] at [%s], %d
errors",
+ metrics.restartCount(),
+ metrics.lastRestartReason(),
+ metrics.lastRestartTime(),
+ metrics.errorCount()));
+
+ if (clientClosed) {
writer.write(", client closed");
}
- long nowMs = Instant.now().getMillis();
- long sleepLeft = sleepUntil.get() - nowMs;
- if (sleepLeft > 0) {
- writer.format(", %dms backoff remaining", sleepLeft);
+
+ if (summaryMetrics.sleepLeft() > 0) {
+ writer.format(", %dms backoff remaining", summaryMetrics.sleepLeft());
}
+
writer.format(
- ", current stream is %dms old, last send %dms, last response %dms,
closed: %s",
- debugDuration(nowMs, startTimeMs.get()),
- debugDuration(nowMs, lastSendTimeMs.get()),
- debugDuration(nowMs, lastResponseTimeMs.get()),
- streamClosed.get());
+ ", current stream is %dms old, last send %dms, last response %dms,
closed: %s, "
+ + "isShutdown: %s, shutdown time: %s",
+ summaryMetrics.streamAge(),
+ summaryMetrics.timeSinceLastSend(),
+ summaryMetrics.timeSinceLastResponse(),
+ streamClosed,
+ hasReceivedShutdownSignal(),
+ summaryMetrics.shutdownTime().orElse(null));
}
- // Don't require synchronization on stream, see the appendSummaryHtml comment.
+ /**
+ * @implNote Don't require synchronization on stream, see the {@link
+ * #appendSummaryHtml(PrintWriter)} comment.
+ */
protected abstract void appendSpecificHtml(PrintWriter writer);
@Override
public final synchronized void halfClose() {
// Synchronization of close and onCompleted necessary for correct retry logic in onNewStream.
- clientClosed.set(true);
- requestObserver().onCompleted();
- streamClosed.set(true);
+ clientClosed = true;
+ requestObserver.onCompleted();
Review Comment:
catch the windmill shutdown exception?
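
For example (sketch only, assuming halfClose() otherwise keeps its current shape):

```java
@Override
public final synchronized void halfClose() {
  // Synchronization of close and onCompleted necessary for correct retry logic in onNewStream.
  clientClosed = true;
  try {
    requestObserver.onCompleted();
  } catch (WindmillStreamShutdownException e) {
    // The stream was already shut down; there is nothing left to half-close.
    logger.debug("Stream was shutdown during halfClose.", e);
  }
  streamClosed = true;
}
```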
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -121,32 +129,43 @@ public static GrpcGetDataStream create(
int streamingRpcBatchLimit,
boolean sendKeyedGetDataRequests,
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) {
- GrpcGetDataStream getDataStream =
- new GrpcGetDataStream(
- backendWorkerToken,
- startGetDataRpcFn,
- backoff,
- streamObserverFactory,
- streamRegistry,
- logEveryNStreamFailures,
- getDataThrottleTimer,
- jobHeader,
- idGenerator,
- streamingRpcBatchLimit,
- sendKeyedGetDataRequests,
- processHeartbeatResponses);
- getDataStream.startStream();
- return getDataStream;
+ return new GrpcGetDataStream(
+ backendWorkerToken,
+ startGetDataRpcFn,
+ backoff,
+ streamObserverFactory,
+ streamRegistry,
+ logEveryNStreamFailures,
+ getDataThrottleTimer,
+ jobHeader,
+ idGenerator,
+ streamingRpcBatchLimit,
+ sendKeyedGetDataRequests,
+ processHeartbeatResponses);
+ }
+
+ private static WindmillStreamShutdownException shutdownException(QueuedBatch batch) {
+ return new WindmillStreamShutdownException(
+ "Stream was closed when attempting to send " + batch.requestsCount() + " requests.");
+ }
+
+ private static WindmillStreamShutdownException shutdownException(QueuedRequest request) {
+ return new WindmillStreamShutdownException(
+ "Cannot send request=[" + request + "] on closed stream.");
}
@Override
protected synchronized void onNewStream() {
+ if (hasReceivedShutdownSignal()) {
Review Comment:
similar comment: this check for shutdown seems racy here. It seems better to just handle the exception from the poisoning, since that covers all cases.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -342,62 +391,88 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept
batch.addRequest(request);
}
if (responsibleForSend) {
- if (waitForSendLatch == null) {
+ if (prevBatch == null) {
// If there was not a previous batch wait a little while to improve
// batching.
- Thread.sleep(1);
+ sleeper.sleep(1);
} else {
- waitForSendLatch.await();
+ prevBatch.waitForSendOrFailNotification();
}
// Finalize the batch so that no additional requests will be added. Leave the batch in the
// queue so that a subsequent batch will wait for its completion.
- synchronized (batches) {
- verify(batch == batches.peekFirst());
+ synchronized (shutdownLock) {
+ if (hasReceivedShutdownSignal()) {
+ throw shutdownException(batch);
+ }
+
+ verify(batch == batches.peekFirst(), "GetDataStream request batch
removed before send().");
batch.markFinalized();
}
- sendBatch(batch.requests());
- synchronized (batches) {
- verify(batch == batches.pollFirst());
+ trySendBatch(batch);
+ } else {
+ // Wait for this batch to be sent before parsing the response.
+ batch.waitForSendOrFailNotification();
+ }
+ }
+
+ void trySendBatch(QueuedBatch batch) {
+ try {
+ sendBatch(batch);
+ synchronized (shutdownLock) {
+ if (hasReceivedShutdownSignal()) {
+ throw shutdownException(batch);
+ }
+
+ verify(
+ batch == batches.pollFirst(),
+ "Sent GetDataStream request batch removed before send() was
complete.");
}
// Notify all waiters with requests in this batch as well as the sender
// of the next batch (if one exists).
- batch.countDown();
- } else {
- // Wait for this batch to be sent before parsing the response.
- batch.await();
+ batch.notifySent();
+ } catch (Exception e) {
+ // Free waiters if the send() failed.
+ batch.notifyFailed();
+ // Propagate the exception to the calling thread.
+ throw e;
}
}
- @SuppressWarnings("NullableProblems")
- private void sendBatch(List<QueuedRequest> requests) {
- StreamingGetDataRequest batchedRequest = flushToBatch(requests);
+ private void sendBatch(QueuedBatch batch) {
+ if (batch.isEmpty()) {
+ return;
+ }
+
+ // Synchronization of pending inserts is necessary with send to ensure duplicates are not
+ // sent on stream reconnect.
synchronized (this) {
- // Synchronization of pending inserts is necessary with send to ensure duplicates are not
- // sent on stream reconnect.
- for (QueuedRequest request : requests) {
+ synchronized (shutdownLock) {
+ // shutdown() clears pending, once the stream is shutdown, prevent values from being added
Review Comment:
since you drop the lock here before putting the requests into pending, shutdown() could run right after you release it and clear pending, and then you would still add entries afterwards.
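
A sketch of keeping the inserts under the lock; `pending`, `flushToBatch`, `request.id()`, and `request.getResponseStream()` are approximations of the existing helpers, so treat this as illustrative only:

```java
// Sketch only: keep the shutdown check and the inserts into `pending` in one
// shutdownLock block so shutdown() cannot clear the map in between.
private void sendBatch(QueuedBatch batch) {
  if (batch.isEmpty()) {
    return;
  }

  StreamingGetDataRequest batchedRequest = flushToBatch(batch.requests());
  synchronized (this) {
    synchronized (shutdownLock) {
      if (hasReceivedShutdownSignal()) {
        throw shutdownException(batch);
      }
      for (QueuedRequest request : batch.requests()) {
        // Still holding shutdownLock, so shutdown() cannot have cleared pending in between.
        pending.put(request.id(), request.getResponseStream());
      }
    }
    // The blocking send stays outside shutdownLock, beneath the stream lock only.
    send(batchedRequest);
  }
}
```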
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * Request observer that allows resetting its internal delegate using the given {@link
+ * #streamObserverFactory}.
+ *
+ * @implNote {@link StreamObserver}s generated by {@link #streamObserverFactory} are expected to be
+ * {@link ThreadSafe}.
+ */
+@ThreadSafe
+@Internal
+final class ResettableStreamObserver<T> implements StreamObserver<T> {
+ private final Supplier<StreamObserver<T>> streamObserverFactory;
+
+ @GuardedBy("this")
+ private @Nullable StreamObserver<T> delegateStreamObserver;
+
+ @GuardedBy("this")
+ private boolean isPoisoned;
+
+ ResettableStreamObserver(Supplier<StreamObserver<T>> streamObserverFactory) {
+ this.streamObserverFactory = streamObserverFactory;
+ this.delegateStreamObserver = null;
+ this.isPoisoned = false;
+ }
+
+ private synchronized StreamObserver<T> delegate() {
+ if (isPoisoned) {
+ throw new WindmillStreamShutdownException("Explicit call to shutdown
stream.");
+ }
+
+ return Preconditions.checkNotNull(
+ delegateStreamObserver,
+ "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ }
+
+ /** Creates a new delegate to use for future {@link StreamObserver} methods. */
+ synchronized void reset() {
+ if (isPoisoned) {
+ throw new WindmillStreamShutdownException("Explicit call to shutdown
stream.");
+ }
+
+ delegateStreamObserver = streamObserverFactory.get();
+ }
+
+ /**
+ * Indicates that the request observer should no longer be used. Attempts to perform operations on
+ * the request observer will throw an {@link WindmillStreamShutdownException}.
+ */
+ synchronized void poison() {
+ if (!isPoisoned) {
+ isPoisoned = true;
+ if (delegateStreamObserver != null) {
+ delegateStreamObserver.onError(
+ new WindmillStreamShutdownException("Explicit call to shutdown
stream."));
+ }
Review Comment:
set delegateStreamObserver to null? might as well let it be gc'd
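
i.e. something like (sketch, not the PR's code):

```java
synchronized void poison() {
  if (!isPoisoned) {
    isPoisoned = true;
    if (delegateStreamObserver != null) {
      delegateStreamObserver.onError(
          new WindmillStreamShutdownException("Explicit call to shutdown stream."));
      // Drop the reference so the underlying observer can be garbage collected.
      delegateStreamObserver = null;
    }
  }
}
```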
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
protected abstract void startThrottleTimer();
/** Reflects that {@link #shutdown()} was explicitly called. */
- protected boolean isShutdown() {
- return isShutdown.get();
- }
-
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ protected boolean hasReceivedShutdownSignal() {
+ synchronized (shutdownLock) {
+ return isShutdown;
}
-
- return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
- if (streamClosed.get()) {
+ if (hasReceivedShutdownSignal()) {
+ return;
+ }
+
+ if (streamClosed) {
+ // TODO(m-trieu): throw a more specific exception here (i.e StreamClosedException)
throw new IllegalStateException("Send called on a client closed stream.");
}
- requestObserver().onNext(request);
+ try {
+ verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");
+ debugMetrics.recordSend();
+ requestObserver.onNext(request);
+ } catch (StreamObserverCancelledException e) {
+ if (hasReceivedShutdownSignal()) {
+ logger.debug("Stream was shutdown during send.", e);
+ return;
+ }
+
+ requestObserver.onError(e);
+ }
+ }
+ }
+
+ @Override
+ public final void start() {
+ boolean shouldStartStream = false;
+ synchronized (shutdownLock) {
+ if (!isShutdown && !started) {
+ started = true;
+ shouldStartStream = true;
+ }
+ }
+
+ if (shouldStartStream) {
+ startStream();
}
}
/** Starts the underlying stream. */
- protected final void startStream() {
+ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
try {
synchronized (this) {
- startTimeMs.set(Instant.now().getMillis());
- lastResponseTimeMs.set(0);
- streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ if (hasReceivedShutdownSignal()) {
+ break;
+ }
+ debugMetrics.recordStart();
+ streamClosed = false;
+ requestObserver.reset();
onNewStream();
- if (clientClosed.get()) {
+ if (clientClosed) {
halfClose();
}
return;
}
+ } catch (WindmillStreamShutdownException e) {
+ logger.debug("Stream was shutdown waiting to start.", e);
Review Comment:
"waiting to start" is a little confusing, maybe just "while creating new
stream" ?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -201,7 +198,7 @@ private void maybeSendRequestExtension(GetWorkBudget extension) {
@Override
protected synchronized void onNewStream() {
workItemAssemblers.clear();
- if (!isShutdown()) {
+ if (!hasReceivedShutdownSignal()) {
Review Comment:
this is racy since shutdown could occur right after this check. Can we instead ensure that we handle the stream close exception on the send below, or handle it at the call site of onNewStream?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -121,32 +129,43 @@ public static GrpcGetDataStream create(
int streamingRpcBatchLimit,
boolean sendKeyedGetDataRequests,
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) {
- GrpcGetDataStream getDataStream =
- new GrpcGetDataStream(
- backendWorkerToken,
- startGetDataRpcFn,
- backoff,
- streamObserverFactory,
- streamRegistry,
- logEveryNStreamFailures,
- getDataThrottleTimer,
- jobHeader,
- idGenerator,
- streamingRpcBatchLimit,
- sendKeyedGetDataRequests,
- processHeartbeatResponses);
- getDataStream.startStream();
- return getDataStream;
+ return new GrpcGetDataStream(
+ backendWorkerToken,
+ startGetDataRpcFn,
+ backoff,
+ streamObserverFactory,
+ streamRegistry,
+ logEveryNStreamFailures,
+ getDataThrottleTimer,
+ jobHeader,
+ idGenerator,
+ streamingRpcBatchLimit,
+ sendKeyedGetDataRequests,
+ processHeartbeatResponses);
+ }
+
+ private static WindmillStreamShutdownException shutdownException(QueuedBatch batch) {
Review Comment:
nit: how about shutdownExceptionForBatch and shutdownExceptionForRequest, or overload shutdownExceptionFor
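
e.g. a sketch of the overload, reusing the messages from the PR's helpers:

```java
private static WindmillStreamShutdownException shutdownExceptionFor(QueuedBatch batch) {
  return new WindmillStreamShutdownException(
      "Stream was closed when attempting to send " + batch.requestsCount() + " requests.");
}

private static WindmillStreamShutdownException shutdownExceptionFor(QueuedRequest request) {
  return new WindmillStreamShutdownException(
      "Cannot send request=[" + request + "] on closed stream.");
}
```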
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long startMs) {
protected abstract void startThrottleTimer();
/** Reflects that {@link #shutdown()} was explicitly called. */
- protected boolean isShutdown() {
- return isShutdown.get();
- }
-
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ protected boolean hasReceivedShutdownSignal() {
+ synchronized (shutdownLock) {
+ return isShutdown;
}
-
- return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
- if (streamClosed.get()) {
+ if (hasReceivedShutdownSignal()) {
Review Comment:
this seems racy. Since you are dropping the shutdown mutex, shutdown could happen right afterwards. What about removing the check and just relying on the poisoning causing an exception in onNext?
It doesn't appear that the shutdown cancellation is being caught or handled yet. Perhaps that should be a non-RuntimeException thrown by the ResettableRequestObserver methods so that we handle it here?
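
A sketch of what handling it here could look like, assuming WindmillStreamShutdownException becomes a checked exception thrown by the ResettableRequestObserver methods (which would mean that class no longer directly implements the gRPC StreamObserver interface); illustrative only:

```java
/** Send a request to the server. */
protected final void send(RequestT request) {
  synchronized (this) {
    if (streamClosed) {
      throw new IllegalStateException("Send called on a client closed stream.");
    }

    try {
      debugMetrics.recordSend();
      // If shutdown() has poisoned the observer, this throws instead of racing a flag check.
      requestObserver.onNext(request);
    } catch (WindmillStreamShutdownException e) {
      // Shutdown happened before or during the send; drop the request quietly.
      logger.debug("Stream was shutdown during send.", e);
    } catch (StreamObserverCancelledException e) {
      requestObserver.onError(e);
    }
  }
}
```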
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -49,22 +49,29 @@
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetDataResponse;
import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedBatch;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedRequest;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@ThreadSafe
final class GrpcGetDataStream
extends AbstractWindmillStream<StreamingGetDataRequest, StreamingGetDataResponse>
implements GetDataStream {
private static final Logger LOG = LoggerFactory.getLogger(GrpcGetDataStream.class);
+ private static final StreamingGetDataRequest HEALTH_CHECK_REQUEST =
+ StreamingGetDataRequest.newBuilder().build();
+ /** @implNote {@link QueuedBatch} objects in the queue are guarded by {@link #shutdownLock} */
private final Deque<QueuedBatch> batches;
Review Comment:
can you do GuardedBy("AbstractWindmillStream.this.shutdownLock")? See
ClassName.this.fieldName from https://errorprone.info/bugpattern/GuardedBy
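
i.e. roughly (fragment sketch of the annotation, not the PR's code):

```java
// Name the inherited lock explicitly so error-prone's @GuardedBy analysis can check accesses.
@GuardedBy("AbstractWindmillStream.this.shutdownLock")
private final Deque<QueuedBatch> batches = new ArrayDeque<>();
```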
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]