scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830671651
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long
startMs) {
protected abstract void startThrottleTimer();
/** Reflects that {@link #shutdown()} was explicitly called. */
- protected boolean isShutdown() {
- return isShutdown.get();
+ protected synchronized boolean hasReceivedShutdownSignal() {
+ return isShutdown;
}
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
- }
-
- return requestObserver;
+ /** Send a request to the server. */
+ protected final synchronized void send(RequestT request)
+ throws StreamClosedException, WindmillStreamShutdownException {
+ debugMetrics.recordSend();
+ requestObserver.onNext(request);
}
- /** Send a request to the server. */
- protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
+ @Override
+ public final void start() {
+ boolean shouldStartStream = false;
synchronized (this) {
- if (streamClosed.get()) {
- throw new IllegalStateException("Send called on a client closed
stream.");
+ if (!isShutdown && !started) {
+ started = true;
+ shouldStartStream = true;
}
+ }
- requestObserver().onNext(request);
+ if (shouldStartStream) {
+ startStream();
}
}
/** Starts the underlying stream. */
- protected final void startStream() {
+ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
try {
synchronized (this) {
- startTimeMs.set(Instant.now().getMillis());
- lastResponseTimeMs.set(0);
- streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the
stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ debugMetrics.recordStart();
+ requestObserver.reset();
onNewStream();
- if (clientClosed.get()) {
+ if (clientClosed) {
halfClose();
}
return;
}
+ } catch (WindmillStreamShutdownException e) {
+ logger.debug("Stream was shutdown while creating new stream.", e);
Review Comment:
// shutdown() is responsible for cleaning up pending requests.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -49,46 +44,52 @@
* stream if it is broken. Subclasses are responsible for retrying requests
that have been lost on a
* broken stream.
*
- * <p>Subclasses should override onResponse to handle responses from the
server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as
sending headers or
- * retrying requests.
+ * <p>Subclasses should override {@link #onResponse(ResponseT)} to handle
responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new
stream is created,
+ * such as sending headers or retrying requests.
*
- * <p>send and startStream should not be called from onResponse; use
executor() instead.
+ * <p>{@link #send(RequestT)} and {@link #startStream()} should not be called
from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
*
* <p>Synchronization on this is used to synchronize the gRpc stream state and
internal data
* structures. Since grpc channel operations may block, synchronization on
this stream may also
* block. This is generally not a problem since streams are used in a
single-threaded manner.
* However, some accessors used for status page and other debugging need to
take care not to require
* synchronizing on this.
+ *
+ * <p>{@link #start()} and {@link #shutdown()} are called once in the lifetime
of the stream. Once
+ * {@link #shutdown()}, a stream in considered invalid and cannot be
restarted/reused.
*/
public abstract class AbstractWindmillStream<RequestT, ResponseT> implements
WindmillStream {
- public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
// Default gRPC streams to 2MB chunks, which has shown to be a large enough
chunk size to reduce
// per-chunk overhead, and small enough that we can still perform granular
flow-control.
protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractWindmillStream.class);
- protected final AtomicBoolean clientClosed;
- private final AtomicBoolean isShutdown;
- private final AtomicLong lastSendTimeMs;
- private final Executor executor;
+ // Indicates that the logical stream has been half-closed and is waiting for
clean server
+ // shutdown.
+ private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+ private static final String NEVER_RECEIVED_RESPONSE_LOG_STRING = "never
received response";
+ protected final Sleeper sleeper;
+
+ private final Logger logger;
+ private final ExecutorService executor;
private final BackOff backoff;
- private final AtomicLong startTimeMs;
- private final AtomicLong lastResponseTimeMs;
- private final AtomicInteger errorCount;
- private final AtomicReference<String> lastError;
- private final AtomicReference<DateTime> lastErrorTime;
- private final AtomicLong sleepUntil;
private final CountDownLatch finishLatch;
private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
private final int logEveryNStreamFailures;
- private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
- // Indicates if the current stream in requestObserver is closed by calling
close() method
- private final AtomicBoolean streamClosed;
private final String backendWorkerToken;
- private @Nullable StreamObserver<RequestT> requestObserver;
+ private final ResettableThrowingStreamObserver<RequestT> requestObserver;
+ private final StreamDebugMetrics debugMetrics;
+ protected volatile boolean clientClosed;
Review Comment:
Can this be non-volatile and @GuardedBy now? I think it's simpler if we just
hold the synchronized block for sendHeartbeats etc. instead of dealing with
volatile and its interleaving with other state under synchronized blocks.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long
startMs) {
protected abstract void startThrottleTimer();
/** Reflects that {@link #shutdown()} was explicitly called. */
- protected boolean isShutdown() {
- return isShutdown.get();
+ protected synchronized boolean hasReceivedShutdownSignal() {
Review Comment:
Maybe remove this method now that there is a single synchronization object? I'm
guessing most callers will already hold the lock (or perhaps should be changed
to hold it for longer than just checking the bool?).
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -139,4 +166,29 @@ public void onCompleted() {
outboundObserver.onCompleted();
}
}
+
+ @Override
+ public void terminate(Throwable terminationException) {
+ // Free the blocked threads in onNext().
+ isReadyNotifier.forceTermination();
Review Comment:
i think we might as well do this cancellation in the case onError is called
as well
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -139,4 +166,29 @@ public void onCompleted() {
outboundObserver.onCompleted();
}
}
+
+ @Override
+ public void terminate(Throwable terminationException) {
+ // Free the blocked threads in onNext().
+ isReadyNotifier.forceTermination();
+ try {
+ onError(terminationException);
+ } catch (RuntimeException e) {
+ // If onError or onComplete was previously called, this will throw.
Review Comment:
Can we keep track of whether onError or onCompleted was called with a boolean
instead of triggering a RuntimeException? We had issues before where gRPC would
get stuck instead of throwing an exception if we misuse the API.
Then for onError/onCompleted you can verify it is not set and set it. And
in terminate you can call onError only if it isn't set and remove this catch.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -67,61 +69,86 @@ public DirectStreamObserver(
}
@Override
- public void onNext(T value) {
+ public void onNext(T value) throws StreamObserverCancelledException {
int awaitPhase = -1;
long totalSecondsWaited = 0;
long waitSeconds = 1;
while (true) {
try {
synchronized (lock) {
+ int currentPhase = isReadyNotifier.getPhase();
+ // Phaser is terminated so don't use the outboundObserver. Since
onError and onCompleted
+ // are synchronized after terminating the phaser if we observe that
the phaser is not
+ // terminated the onNext calls below are guaranteed to not be called
on a closed observer.
+ if (currentPhase < 0) return;
+
+ // If we awaited previously and timed out, wait for the same phase.
Otherwise we're
+ // careful to observe the phase before observing isReady.
+ if (awaitPhase < 0) {
+ awaitPhase = isReadyNotifier.getPhase();
+ // If getPhase() returns a value less than 0, the phaser has been
terminated.
+ if (awaitPhase < 0) {
+ return;
+ }
+ }
+
// We only check isReady periodically to effectively allow for
increasing the outbound
// buffer periodically. This reduces the overhead of blocking while
still restricting
// memory because there is a limited # of streams, and we have a max
messages size of 2MB.
if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
outboundObserver.onNext(value);
return;
}
- // If we awaited previously and timed out, wait for the same phase.
Otherwise we're
- // careful to observe the phase before observing isReady.
- if (awaitPhase < 0) {
- awaitPhase = phaser.getPhase();
- }
+
if (outboundObserver.isReady()) {
messagesSinceReady = 0;
outboundObserver.onNext(value);
return;
}
}
+
// A callback has been registered to advance the phaser whenever the
observer
// transitions to is ready. Since we are waiting for a phase observed
before the
// outboundObserver.isReady() returned false, we expect it to advance
after the
// channel has become ready. This doesn't always seem to be the case
(despite
// documentation stating otherwise) so we poll periodically and
enforce an overall
// timeout related to the stream deadline.
- phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds,
TimeUnit.SECONDS);
+ int nextPhase =
+ isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds,
TimeUnit.SECONDS);
+ // If nextPhase is a value less than 0, the phaser has been terminated.
+ if (nextPhase < 0) {
+ return;
Review Comment:
throw new StreamObserverCancelledException(e);
It seems better to surface that we didn't send the message, just in case, to
avoid a race where we were blocked on send, terminate cancels the phaser, but
onCompleted happens to run before terminate's onError. Though see below: we can
improve onCompleted to check for that race.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -121,32 +130,49 @@ public static GrpcGetDataStream create(
int streamingRpcBatchLimit,
boolean sendKeyedGetDataRequests,
Consumer<List<Windmill.ComputationHeartbeatResponse>>
processHeartbeatResponses) {
- GrpcGetDataStream getDataStream =
- new GrpcGetDataStream(
- backendWorkerToken,
- startGetDataRpcFn,
- backoff,
- streamObserverFactory,
- streamRegistry,
- logEveryNStreamFailures,
- getDataThrottleTimer,
- jobHeader,
- idGenerator,
- streamingRpcBatchLimit,
- sendKeyedGetDataRequests,
- processHeartbeatResponses);
- getDataStream.startStream();
- return getDataStream;
+ return new GrpcGetDataStream(
+ backendWorkerToken,
+ startGetDataRpcFn,
+ backoff,
+ streamObserverFactory,
+ streamRegistry,
+ logEveryNStreamFailures,
+ getDataThrottleTimer,
+ jobHeader,
+ idGenerator,
+ streamingRpcBatchLimit,
+ sendKeyedGetDataRequests,
+ processHeartbeatResponses);
+ }
+
+ private static WindmillStreamShutdownException
shutdownExceptionFor(QueuedBatch batch) {
+ return new WindmillStreamShutdownException(
+ "Stream was closed when attempting to send " + batch.requestsCount() +
" requests.");
+ }
+
+ private static WindmillStreamShutdownException
shutdownExceptionFor(QueuedRequest request) {
+ return new WindmillStreamShutdownException(
+ "Cannot send request=[" + request + "] on closed stream.");
+ }
+
+ private void sendIgnoringClosed(StreamingGetDataRequest getDataRequest)
+ throws WindmillStreamShutdownException {
+ try {
+ send(getDataRequest);
+ } catch (StreamClosedException e) {
+ // Stream was closed on send, will be retried on stream restart.
+ }
}
@Override
- protected synchronized void onNewStream() {
+ protected synchronized void onNewStream()
+ throws StreamClosedException, WindmillStreamShutdownException {
send(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
- if (clientClosed.get()) {
+ if (clientClosed && !hasReceivedShutdownSignal()) {
Review Comment:
remove? above send will fail if shutdown
though I think we can also guarantee at higher level that onNewStream isn't
called if shutdown
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -314,24 +417,35 @@ private Batcher() {
@Override
public boolean commitWorkItem(
String computation, WorkItemCommitRequest commitRequest,
Consumer<CommitStatus> onDone) {
- if (!canAccept(commitRequest.getSerializedSize() +
computation.length())) {
+ if (!canAccept(commitRequest.getSerializedSize() + computation.length())
+ || hasReceivedShutdownSignal()) {
Review Comment:
can we remove the shutdown check here? seems easier to just handle it later.
Otherwise if we return false here, caller thinks batcher is full and new
batchers might be created and repeat.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -301,39 +343,59 @@ public void appendSpecificHtml(PrintWriter writer) {
writer.append("]");
}
- private <ResponseT> ResponseT issueRequest(QueuedRequest request,
ParseFn<ResponseT> parseFn) {
- while (true) {
+ private <ResponseT> ResponseT issueRequest(QueuedRequest request,
ParseFn<ResponseT> parseFn)
+ throws WindmillStreamShutdownException {
+ while (!hasReceivedShutdownSignal()) {
request.resetResponseStream();
try {
queueRequestAndWait(request);
return parseFn.parse(request.getResponseStream());
- } catch (CancellationException e) {
- // Retry issuing the request since the response stream was cancelled.
- continue;
+ } catch (AppendableInputStream.InvalidInputStreamStateException |
CancellationException e) {
+ handleShutdown(request, e);
+ if (!(e instanceof CancellationException)) {
+ throw e;
+ }
} catch (IOException e) {
LOG.error("Parsing GetData response failed: ", e);
- continue;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ handleShutdown(request, e);
throw new RuntimeException(e);
} finally {
pending.remove(request.id());
}
}
+
+ throw new WindmillStreamShutdownException(
Review Comment:
use shutdownExceptionFor(request)?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long
startMs) {
protected abstract void startThrottleTimer();
/** Reflects that {@link #shutdown()} was explicitly called. */
- protected boolean isShutdown() {
- return isShutdown.get();
+ protected synchronized boolean hasReceivedShutdownSignal() {
+ return isShutdown;
}
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
- }
-
- return requestObserver;
+ /** Send a request to the server. */
+ protected final synchronized void send(RequestT request)
+ throws StreamClosedException, WindmillStreamShutdownException {
+ debugMetrics.recordSend();
+ requestObserver.onNext(request);
}
- /** Send a request to the server. */
- protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
+ @Override
+ public final void start() {
+ boolean shouldStartStream = false;
synchronized (this) {
- if (streamClosed.get()) {
- throw new IllegalStateException("Send called on a client closed
stream.");
+ if (!isShutdown && !started) {
+ started = true;
+ shouldStartStream = true;
}
+ }
- requestObserver().onNext(request);
+ if (shouldStartStream) {
+ startStream();
}
}
/** Starts the underlying stream. */
- protected final void startStream() {
+ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
try {
synchronized (this) {
- startTimeMs.set(Instant.now().getMillis());
- lastResponseTimeMs.set(0);
- streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the
stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ debugMetrics.recordStart();
+ requestObserver.reset();
onNewStream();
- if (clientClosed.get()) {
+ if (clientClosed) {
halfClose();
}
return;
}
+ } catch (WindmillStreamShutdownException e) {
+ logger.debug("Stream was shutdown while creating new stream.", e);
} catch (Exception e) {
- LOG.error("Failed to create new stream, retrying: ", e);
+ logger.error("Failed to create new stream, retrying: ", e);
try {
long sleep = backoff.nextBackOffMillis();
- sleepUntil.set(Instant.now().getMillis() + sleep);
- Thread.sleep(sleep);
- } catch (InterruptedException | IOException i) {
+ debugMetrics.recordSleep(sleep);
+ sleeper.sleep(sleep);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ logger.info(
+ "Interrupted during {} creation backoff. The stream will not be
created.",
+ getClass());
+ break;
+ } catch (IOException ioe) {
// Keep trying to create the stream.
}
}
}
+
+ // We were never able to start the stream, remove it from the stream
registry. Otherwise, it is
+ // removed when closed.
+ streamRegistry.remove(this);
}
- protected final Executor executor() {
- return executor;
+ /**
+ * Execute the runnable using the {@link #executor} handling the executor
being in a shutdown
+ * state.
+ */
+ protected final void executeSafely(Runnable runnable) {
+ try {
+ executor.execute(runnable);
+ } catch (RejectedExecutionException e) {
+ logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+ }
}
- public final synchronized void maybeSendHealthCheck(Instant
lastSendThreshold) {
- if (lastSendTimeMs.get() < lastSendThreshold.getMillis() &&
!clientClosed.get()) {
+ public final void maybeSendHealthCheck(Instant lastSendThreshold) {
+ if (!clientClosed && debugMetrics.getLastSendTimeMs() <
lastSendThreshold.getMillis()) {
try {
sendHealthCheck();
- } catch (RuntimeException e) {
- LOG.debug("Received exception sending health check.", e);
+ } catch (Exception e) {
+ logger.debug("Received exception sending health check.", e);
}
}
}
- protected abstract void sendHealthCheck();
+ protected abstract void sendHealthCheck()
+ throws WindmillStreamShutdownException, StreamClosedException;
- // Care is taken that synchronization on this is unnecessary for all status
page information.
- // Blocking sends are made beneath this stream object's lock which could
block status page
- // rendering.
+ /**
+ * @implNote Care is taken that synchronization on this is unnecessary for
all status page
+ * information. Blocking sends are made beneath this stream object's
lock which could block
+ * status page rendering.
+ */
public final void appendSummaryHtml(PrintWriter writer) {
appendSpecificHtml(writer);
- if (errorCount.get() > 0) {
- writer.format(
- ", %d errors, last error [ %s ] at [%s]",
- errorCount.get(), lastError.get(), lastErrorTime.get());
- }
- if (clientClosed.get()) {
+ StreamDebugMetrics.Snapshot summaryMetrics =
debugMetrics.getSummaryMetrics();
+ summaryMetrics
+ .restartMetrics()
+ .ifPresent(
+ metrics ->
+ writer.format(
+ ", %d restarts, last restart reason [ %s ] at [%s], %d
errors",
+ metrics.restartCount(),
+ metrics.lastRestartReason(),
+ metrics.lastRestartTime(),
+ metrics.errorCount()));
+
+ if (clientClosed) {
Review Comment:
will need copy in debug metrics if removing volatile
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long
startMs) {
protected abstract void startThrottleTimer();
/** Reflects that {@link #shutdown()} was explicitly called. */
- protected boolean isShutdown() {
- return isShutdown.get();
+ protected synchronized boolean hasReceivedShutdownSignal() {
+ return isShutdown;
}
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
- }
-
- return requestObserver;
+ /** Send a request to the server. */
+ protected final synchronized void send(RequestT request)
+ throws StreamClosedException, WindmillStreamShutdownException {
+ debugMetrics.recordSend();
+ requestObserver.onNext(request);
}
- /** Send a request to the server. */
- protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
+ @Override
+ public final void start() {
+ boolean shouldStartStream = false;
synchronized (this) {
- if (streamClosed.get()) {
- throw new IllegalStateException("Send called on a client closed
stream.");
+ if (!isShutdown && !started) {
+ started = true;
+ shouldStartStream = true;
}
+ }
- requestObserver().onNext(request);
+ if (shouldStartStream) {
+ startStream();
}
}
/** Starts the underlying stream. */
- protected final void startStream() {
+ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
try {
synchronized (this) {
- startTimeMs.set(Instant.now().getMillis());
- lastResponseTimeMs.set(0);
- streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the
stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ debugMetrics.recordStart();
+ requestObserver.reset();
onNewStream();
- if (clientClosed.get()) {
+ if (clientClosed) {
halfClose();
}
return;
}
+ } catch (WindmillStreamShutdownException e) {
+ logger.debug("Stream was shutdown while creating new stream.", e);
} catch (Exception e) {
- LOG.error("Failed to create new stream, retrying: ", e);
+ logger.error("Failed to create new stream, retrying: ", e);
try {
long sleep = backoff.nextBackOffMillis();
- sleepUntil.set(Instant.now().getMillis() + sleep);
- Thread.sleep(sleep);
- } catch (InterruptedException | IOException i) {
+ debugMetrics.recordSleep(sleep);
+ sleeper.sleep(sleep);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ logger.info(
+ "Interrupted during {} creation backoff. The stream will not be
created.",
Review Comment:
When does this interruption happen? Is it expected at all?
I'm worried about not starting the stream, since any pending work is just
going to never finish. Should we shut down the stream at least so that senders
get errors? Or, if we don't think an interrupt should occur, removing this
special case could be simpler too.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long
startMs) {
protected abstract void startThrottleTimer();
/** Reflects that {@link #shutdown()} was explicitly called. */
- protected boolean isShutdown() {
- return isShutdown.get();
+ protected synchronized boolean hasReceivedShutdownSignal() {
+ return isShutdown;
}
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
- }
-
- return requestObserver;
+ /** Send a request to the server. */
+ protected final synchronized void send(RequestT request)
+ throws StreamClosedException, WindmillStreamShutdownException {
+ debugMetrics.recordSend();
+ requestObserver.onNext(request);
}
- /** Send a request to the server. */
- protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
+ @Override
+ public final void start() {
+ boolean shouldStartStream = false;
synchronized (this) {
- if (streamClosed.get()) {
- throw new IllegalStateException("Send called on a client closed
stream.");
+ if (!isShutdown && !started) {
+ started = true;
+ shouldStartStream = true;
}
+ }
- requestObserver().onNext(request);
+ if (shouldStartStream) {
+ startStream();
}
}
/** Starts the underlying stream. */
- protected final void startStream() {
+ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
try {
synchronized (this) {
- startTimeMs.set(Instant.now().getMillis());
- lastResponseTimeMs.set(0);
- streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the
stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ debugMetrics.recordStart();
+ requestObserver.reset();
onNewStream();
- if (clientClosed.get()) {
+ if (clientClosed) {
halfClose();
}
return;
}
+ } catch (WindmillStreamShutdownException e) {
+ logger.debug("Stream was shutdown while creating new stream.", e);
} catch (Exception e) {
- LOG.error("Failed to create new stream, retrying: ", e);
+ logger.error("Failed to create new stream, retrying: ", e);
try {
long sleep = backoff.nextBackOffMillis();
- sleepUntil.set(Instant.now().getMillis() + sleep);
- Thread.sleep(sleep);
- } catch (InterruptedException | IOException i) {
+ debugMetrics.recordSleep(sleep);
+ sleeper.sleep(sleep);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ logger.info(
+ "Interrupted during {} creation backoff. The stream will not be
created.",
+ getClass());
+ break;
+ } catch (IOException ioe) {
// Keep trying to create the stream.
}
}
}
+
+ // We were never able to start the stream, remove it from the stream
registry. Otherwise, it is
+ // removed when closed.
+ streamRegistry.remove(this);
}
- protected final Executor executor() {
- return executor;
+ /**
+ * Execute the runnable using the {@link #executor} handling the executor
being in a shutdown
+ * state.
+ */
+ protected final void executeSafely(Runnable runnable) {
+ try {
+ executor.execute(runnable);
+ } catch (RejectedExecutionException e) {
+ logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+ }
}
- public final synchronized void maybeSendHealthCheck(Instant
lastSendThreshold) {
- if (lastSendTimeMs.get() < lastSendThreshold.getMillis() &&
!clientClosed.get()) {
+ public final void maybeSendHealthCheck(Instant lastSendThreshold) {
Review Comment:
leave synchronized? (assuming we change clientClosed above)
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+/** Thrown when operations are requested on a {@link WindmillStream} has been
shutdown/closed. */
+public final class WindmillStreamShutdownException extends Exception {
Review Comment:
It's not particularly clear from the comments what the difference between
StreamClosedException and WindmillStreamShutdownException is.
I was wondering too if we need different types of exceptions to distinguish
between shutdown and the stream being closed when handling them. If not, we
could just have one class with different messages.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +163,44 @@ public void sendHealthCheck() {
protected void onResponse(StreamingCommitResponse response) {
commitWorkThrottleTimer.stop();
- RuntimeException finalException = null;
+ CommitCompletionException failures = new CommitCompletionException();
for (int i = 0; i < response.getRequestIdCount(); ++i) {
long requestId = response.getRequestId(i);
if (requestId == HEARTBEAT_REQUEST_ID) {
continue;
}
- PendingRequest done = pending.remove(requestId);
- if (done == null) {
- LOG.error("Got unknown commit request ID: {}", requestId);
+ PendingRequest pendingRequest = pending.remove(requestId);
+ CommitStatus commitStatus =
+ i < response.getStatusCount() ? response.getStatus(i) :
CommitStatus.OK;
+ if (pendingRequest == null) {
+ if (!hasReceivedShutdownSignal()) {
+ // Skip responses when the stream is shutdown since they are now
invalid.
Review Comment:
nit: "skip responses" is unclear, maybe something like
// Missing responses are expected after shutdown because it removes them.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -139,4 +166,29 @@ public void onCompleted() {
outboundObserver.onCompleted();
Review Comment:
should we check if phaser has been terminated beneath the lock here and if
so call onError? that seems like it would enforce that if send didn't send
something due to phaser cancellation that we wouldn't complete successfully.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +163,44 @@ public void sendHealthCheck() {
protected void onResponse(StreamingCommitResponse response) {
commitWorkThrottleTimer.stop();
- RuntimeException finalException = null;
+ CommitCompletionException failures = new CommitCompletionException();
Review Comment:
nit: how about a builder for this, since we won't want to add exceptions in
other places?
Then the final line of the method can be
builder.throwIfNonEmpty();
and internally it builds the exception and throws if needed.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -314,24 +417,35 @@ private Batcher() {
@Override
public boolean commitWorkItem(
String computation, WorkItemCommitRequest commitRequest,
Consumer<CommitStatus> onDone) {
- if (!canAccept(commitRequest.getSerializedSize() +
computation.length())) {
+ if (!canAccept(commitRequest.getSerializedSize() + computation.length())
+ || hasReceivedShutdownSignal()) {
return false;
}
- PendingRequest request = new PendingRequest(computation, commitRequest,
onDone);
+
+ PendingRequest request = PendingRequest.create(computation,
commitRequest, onDone);
add(idGenerator.incrementAndGet(), request);
return true;
}
/** Flushes any pending work items to the wire. */
@Override
public void flush() {
- flushInternal(queue);
- queuedBytes = 0;
- queue.clear();
+ try {
+ if (!hasReceivedShutdownSignal()) {
Review Comment:
can we remove? it is handled in stream methods already and doing it here
leaves a gap between shutdown check and flush anyway.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]