arunpandianp commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1801056528
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
private StreamObserver<RequestT> requestObserver() {
if (requestObserver == null) {
throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ "requestObserver cannot be null. Missing a call to start() to
initialize stream.");
Review Comment:
requestObserver is now created in the constructor, so we don't need the null
check here.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -278,24 +347,73 @@ public String backendWorkerToken() {
}
@Override
- public void shutdown() {
+ public final void shutdown() {
+ // Don't lock here as isShutdown checks are used in the stream to free
blocked
+ // threads or as exit conditions to loops.
if (isShutdown.compareAndSet(false, true)) {
requestObserver()
.onError(new WindmillStreamShutdownException("Explicit call to
shutdown stream."));
+ shutdownInternal();
+ shutdownTime.set(DateTime.now());
}
}
- private void setLastError(String error) {
- lastError.set(error);
- lastErrorTime.set(DateTime.now());
+ private void recordRestartReason(String error) {
+ lastRestartReason.set(error);
+ lastRestartTime.set(DateTime.now());
}
+ protected abstract void shutdownInternal();
+
public static class WindmillStreamShutdownException extends RuntimeException
{
public WindmillStreamShutdownException(String message) {
super(message);
}
}
+ /**
+ * Request observer that allows resetting its internal delegate using the
given {@link
+ * #requestObserverSupplier}.
+ */
+ @ThreadSafe
+ private static class ResettableRequestObserver<RequestT> implements
StreamObserver<RequestT> {
+
+ private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+ @GuardedBy("this")
+ private @Nullable StreamObserver<RequestT> delegateRequestObserver;
+
+ private ResettableRequestObserver(Supplier<StreamObserver<RequestT>>
requestObserverSupplier) {
+ this.requestObserverSupplier = requestObserverSupplier;
+ this.delegateRequestObserver = null;
Review Comment:
can we initialize delegateRequestObserver with
`requestObserverSupplier.get()` here and remove the null checks in delegate()?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
private StreamObserver<RequestT> requestObserver() {
if (requestObserver == null) {
throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ "requestObserver cannot be null. Missing a call to start() to
initialize stream.");
}
return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
+ if (isShutdown()) {
+ return;
+ }
+
if (streamClosed.get()) {
throw new IllegalStateException("Send called on a client closed
stream.");
Review Comment:
This is not new code, but should we change `IllegalStateException` to a more
specific StreamClosedException?
It doesn't have to be changed in this PR.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -74,69 +76,104 @@ public void onNext(T value) {
while (true) {
try {
synchronized (lock) {
+ // If we awaited previously and timed out, wait for the same phase.
Otherwise we're
+ // careful to observe the phase before observing isReady.
+ if (awaitPhase < 0) {
+ awaitPhase = isReadyNotifier.getPhase();
+ // If getPhase() returns a value less than 0, the phaser has been
terminated.
+ if (awaitPhase < 0) {
+ return;
+ }
+ }
+
// We only check isReady periodically to effectively allow for
increasing the outbound
// buffer periodically. This reduces the overhead of blocking while
still restricting
// memory because there is a limited # of streams, and we have a max
messages size of 2MB.
if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
outboundObserver.onNext(value);
return;
}
- // If we awaited previously and timed out, wait for the same phase.
Otherwise we're
- // careful to observe the phase before observing isReady.
- if (awaitPhase < 0) {
- awaitPhase = phaser.getPhase();
- }
+
if (outboundObserver.isReady()) {
messagesSinceReady = 0;
outboundObserver.onNext(value);
return;
}
}
+
// A callback has been registered to advance the phaser whenever the
observer
// transitions to is ready. Since we are waiting for a phase observed
before the
// outboundObserver.isReady() returned false, we expect it to advance
after the
// channel has become ready. This doesn't always seem to be the case
(despite
// documentation stating otherwise) so we poll periodically and
enforce an overall
// timeout related to the stream deadline.
- phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds,
TimeUnit.SECONDS);
+ int nextPhase =
+ isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds,
TimeUnit.SECONDS);
+ // If nextPhase is a value less than 0, the phaser has been terminated.
+ if (nextPhase < 0) {
+ return;
Review Comment:
how will the caller know if the send didn't go through in this case?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -273,10 +288,24 @@ public void
onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp
@Override
public void sendHealthCheck() {
if (hasPendingRequests()) {
- send(StreamingGetDataRequest.newBuilder().build());
+ send(HEALTH_CHECK_REQUEST);
}
}
+ @Override
+ protected void shutdownInternal() {
+ // Stream has been explicitly closed. Drain pending input streams and
request batches.
+ // Future calls to send RPCs will fail.
+ pending.values().forEach(AppendableInputStream::cancel);
Review Comment:
What prevents a different thread from populating the pending map/batches after
we clear them here?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -302,38 +331,57 @@ public void appendSpecificHtml(PrintWriter writer) {
}
private <ResponseT> ResponseT issueRequest(QueuedRequest request,
ParseFn<ResponseT> parseFn) {
- while (true) {
+ while (!isShutdown()) {
request.resetResponseStream();
try {
queueRequestAndWait(request);
return parseFn.parse(request.getResponseStream());
- } catch (CancellationException e) {
- // Retry issuing the request since the response stream was cancelled.
- continue;
+ } catch (AppendableInputStream.InvalidInputStreamStateException
+ | VerifyException
+ | CancellationException e) {
+ handleShutdown(request, e);
+ if (!(e instanceof CancellationException)) {
+ throw e;
+ }
} catch (IOException e) {
LOG.error("Parsing GetData response failed: ", e);
- continue;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ handleShutdown(request, e);
throw new RuntimeException(e);
} finally {
pending.remove(request.id());
}
}
+
+ // If we have exited the loop here, the stream has been shutdown. Cancel
the response stream.
+ request.getResponseStream().cancel();
Review Comment:
Do we need this? Only the `parseFn.parse` call above reads the
responseStream, so I'd expect nothing to be blocked on the response stream at
this point.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
private StreamObserver<RequestT> requestObserver() {
if (requestObserver == null) {
throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ "requestObserver cannot be null. Missing a call to start() to
initialize stream.");
}
return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
+ if (isShutdown()) {
+ return;
+ }
+
if (streamClosed.get()) {
throw new IllegalStateException("Send called on a client closed
stream.");
}
- requestObserver().onNext(request);
+ try {
+ lastSendTimeMs.set(Instant.now().getMillis());
+ requestObserver.onNext(request);
+ } catch (StreamObserverCancelledException e) {
+ if (isShutdown()) {
+ logger.debug("Stream was closed or shutdown during send.", e);
+ return;
+ }
+
+ requestObserver.onError(e);
+ }
+ }
+ }
+
+ @Override
+ public final void start() {
+ if (!isShutdown.get() && started.compareAndSet(false, true)) {
+ // start() should only be executed once during the lifetime of the
stream for idempotency and
+ // when shutdown() has not been called.
+ startStream();
}
}
/** Starts the underlying stream. */
- protected final void startStream() {
+ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
try {
synchronized (this) {
+ if (isShutdown.get()) {
+ break;
+ }
startTimeMs.set(Instant.now().getMillis());
lastResponseTimeMs.set(0);
streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the
stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ requestObserver.reset();
onNewStream();
if (clientClosed.get()) {
halfClose();
}
return;
}
} catch (Exception e) {
- LOG.error("Failed to create new stream, retrying: ", e);
+ logger.error("Failed to create new stream, retrying: ", e);
try {
long sleep = backoff.nextBackOffMillis();
sleepUntil.set(Instant.now().getMillis() + sleep);
- Thread.sleep(sleep);
- } catch (InterruptedException | IOException i) {
+ sleeper.sleep(sleep);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ logger.info(
+ "Interrupted during stream creation backoff. The stream will not
be created.");
+ break;
+ } catch (IOException ioe) {
// Keep trying to create the stream.
}
}
}
+
+ // We were never able to start the stream, remove it from the stream
registry. Otherwise, it is
+ // removed when closed.
+ streamRegistry.remove(this);
}
- protected final Executor executor() {
- return executor;
+ /**
+ * Execute the runnable using the {@link #executor} handling the executor
being in a shutdown
+ * state.
+ */
+ protected final void executeSafely(Runnable runnable) {
+ try {
+ executor.execute(runnable);
+ } catch (RejectedExecutionException e) {
+ logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+ } catch (IllegalStateException e) {
Review Comment:
`executor.execute` won't propagate the `IllegalStateException` from a closed
stream to here.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -361,10 +409,10 @@ private void queueRequestAndWait(QueuedRequest request)
throws InterruptedExcept
}
// Notify all waiters with requests in this batch as well as the sender
// of the next batch (if one exists).
- batch.countDown();
+ batch.notifySent();
Review Comment:
I think this is an existing bug. What happens if/when
`sendBatch(batch.requests());` throws? Other threads waiting on the sender
thread won't get notified, and they'll eventually time out with a poison pill.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -278,24 +347,73 @@ public String backendWorkerToken() {
}
@Override
- public void shutdown() {
+ public final void shutdown() {
+ // Don't lock here as isShutdown checks are used in the stream to free
blocked
+ // threads or as exit conditions to loops.
if (isShutdown.compareAndSet(false, true)) {
requestObserver()
.onError(new WindmillStreamShutdownException("Explicit call to
shutdown stream."));
+ shutdownInternal();
+ shutdownTime.set(DateTime.now());
Review Comment:
can we move `shutdownTime.set(DateTime.now());` to line 354 before calling
`requestObserver().onError`?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -302,38 +331,57 @@ public void appendSpecificHtml(PrintWriter writer) {
}
private <ResponseT> ResponseT issueRequest(QueuedRequest request,
ParseFn<ResponseT> parseFn) {
- while (true) {
+ while (!isShutdown()) {
request.resetResponseStream();
try {
queueRequestAndWait(request);
return parseFn.parse(request.getResponseStream());
- } catch (CancellationException e) {
- // Retry issuing the request since the response stream was cancelled.
- continue;
+ } catch (AppendableInputStream.InvalidInputStreamStateException
+ | VerifyException
Review Comment:
when is the VerifyException thrown?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -278,24 +347,73 @@ public String backendWorkerToken() {
}
@Override
- public void shutdown() {
+ public final void shutdown() {
Review Comment:
Can we synchronize shutdown and start on `this`? Right now, there is nothing
preventing start and shutdown from interleaving, and they look racy (hard for
me to say one way or another).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]