scwhittle commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1690287323
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -151,41 +154,41 @@ private static long debugDuration(long nowMs, long
startMs) {
*/
protected abstract void startThrottleTimer();
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
- }
-
- return requestObserver;
- }
-
/** Send a request to the server. */
protected final void send(RequestT request) {
lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
+ // Check if we should send after we acquire the lock.
+ if (isShutdown()) {
+ LOG.warn("Send called on a shutdown stream.");
+ return;
+ }
+
if (streamClosed.get()) {
throw new IllegalStateException("Send called on a client closed
stream.");
}
- requestObserver().onNext(request);
+ requestObserver.onNext(request);
}
}
/** Starts the underlying stream. */
protected final void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
- while (true) {
+ while (!isShutdown.get()) {
Review Comment:
I'd just remove this check since you do it first thing below
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -303,71 +368,78 @@ public void onNext(ResponseT response) {
@Override
public void onError(Throwable t) {
- onStreamFinished(t);
+ if (isStreamDone()) {
+ return;
+ }
+
+ Status status = Status.fromThrowable(t);
+ setLastError(status.toString());
+ // Don't log every error since it will get noisy, and many errors
transient.
+ if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
+ long nowMillis = Instant.now().getMillis();
+ String responseDebug;
+ if (lastResponseTimeMs.get() == 0) {
+ responseDebug = "never received response";
+ } else {
+ responseDebug = "received response " + (nowMillis -
lastResponseTimeMs.get()) + "ms ago";
+ }
+ LOG.debug(
+ "{} streaming Windmill RPC errors for {}, last was: {} with status
{}."
+ + " created {}ms ago, {}. This is normal with autoscaling.",
+ AbstractWindmillStream.this.getClass(),
+ errorCount.get(),
+ t,
+ status,
+ nowMillis - startTimeMs.get(),
+ responseDebug);
+ }
+
+ // If the stream was stopped due to a resource exhausted error then we
are throttled.
+ if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+ startThrottleTimer();
+ }
+
+ try {
+ long sleep = backoff.nextBackOffMillis();
+ sleepUntil.set(Instant.now().getMillis() + sleep);
+ sleeper.sleep(sleep);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ // Ignore.
+ }
+
+ tryRestartStream();
}
@Override
public void onCompleted() {
- onStreamFinished(null);
+ if (isStreamDone()) {
+ return;
+ }
+ errorCount.incrementAndGet();
+ String error =
+ "Stream completed successfully but did not complete requested
operations, "
+ + "recreating";
+ LOG.warn(error);
+ setLastError(error);
+ tryRestartStream();
}
- private void onStreamFinished(@Nullable Throwable t) {
- synchronized (this) {
- if (clientClosed.get() && !hasPendingRequests()) {
- streamRegistry.remove(AbstractWindmillStream.this);
- finishLatch.countDown();
- return;
- }
+ private void tryRestartStream() {
+ if (!isShutdown()) {
+ executeSafely(AbstractWindmillStream.this::startStream);
}
- if (t != null) {
- Status status = null;
- if (t instanceof StatusRuntimeException) {
- status = ((StatusRuntimeException) t).getStatus();
- }
- String statusError = status == null ? "" : status.toString();
- setLastError(statusError);
- if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
- long nowMillis = Instant.now().getMillis();
- String responseDebug;
- if (lastResponseTimeMs.get() == 0) {
- responseDebug = "never received response";
- } else {
- responseDebug =
- "received response " + (nowMillis - lastResponseTimeMs.get())
+ "ms ago";
- }
- LOG.debug(
- "{} streaming Windmill RPC errors for {}, last was: {} with
status {}."
- + " created {}ms ago, {}. This is normal with autoscaling.",
- AbstractWindmillStream.this.getClass(),
- errorCount.get(),
- t,
- statusError,
- nowMillis - startTimeMs.get(),
- responseDebug);
- }
- // If the stream was stopped due to a resource exhausted error then we
are throttled.
- if (status != null && status.getCode() ==
Status.Code.RESOURCE_EXHAUSTED) {
- startThrottleTimer();
- }
+ }
- try {
- long sleep = backoff.nextBackOffMillis();
- sleepUntil.set(Instant.now().getMillis() + sleep);
- Thread.sleep(sleep);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (IOException e) {
- // Ignore.
- }
- } else {
- errorCount.incrementAndGet();
- String error =
- "Stream completed successfully but did not complete requested
operations, "
- + "recreating";
- LOG.warn(error);
- setLastError(error);
+ private synchronized boolean isStreamDone() {
+ if (isShutdown() || (clientClosed.get() && !hasPendingRequests())) {
+ streamRegistry.remove(AbstractWindmillStream.this);
Review Comment:
I think this kind of side effect is confusing in method that just sounds
like an accessor
how about maybeTeardownStream()?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -295,34 +367,13 @@ private StreamingDataflowWorker(
.setStatusPages(workerStatusPages)
.setStateCache(stateCache)
.setComputationStateCache(this.computationStateCache)
-
.setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
- .setGetDataStatusProvider(getDataClient::printHtml)
+ .setCurrentActiveCommitBytes(currentActiveCommitBytes)
Review Comment:
could set these above directly and get rid of currentActiveCommitBytes and
getDataStatusProvider variables
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -176,7 +180,7 @@ private StreamingDataflowWorker(
DataflowWorkerHarnessOptions options,
HotKeyLogger hotKeyLogger,
Supplier<Instant> clock,
- StreamingWorkerStatusReporter workerStatusReporter,
+ Function<Supplier<Long>, StreamingWorkerStatusReporter>
streamingWorkerStatusReporterFactory,
Review Comment:
should this be a functional interface so you can document? it's unclear what
the long supplier is
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -151,41 +154,41 @@ private static long debugDuration(long nowMs, long
startMs) {
*/
protected abstract void startThrottleTimer();
- private StreamObserver<RequestT> requestObserver() {
- if (requestObserver == null) {
- throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
- }
-
- return requestObserver;
- }
-
/** Send a request to the server. */
protected final void send(RequestT request) {
lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
+ // Check if we should send after we acquire the lock.
+ if (isShutdown()) {
+ LOG.warn("Send called on a shutdown stream.");
Review Comment:
if this is possible, don't log as customers don't like warning logs and open
issues about them
if this should not be possible, perhaps better to throw an exception so that
we notice and fix it.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -303,71 +368,78 @@ public void onNext(ResponseT response) {
@Override
public void onError(Throwable t) {
- onStreamFinished(t);
+ if (isStreamDone()) {
+ return;
+ }
+
+ Status status = Status.fromThrowable(t);
+ setLastError(status.toString());
+ // Don't log every error since it will get noisy, and many errors
transient.
+ if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
+ long nowMillis = Instant.now().getMillis();
+ String responseDebug;
+ if (lastResponseTimeMs.get() == 0) {
+ responseDebug = "never received response";
+ } else {
+ responseDebug = "received response " + (nowMillis -
lastResponseTimeMs.get()) + "ms ago";
+ }
+ LOG.debug(
+ "{} streaming Windmill RPC errors for {}, last was: {} with status
{}."
+ + " created {}ms ago, {}. This is normal with autoscaling.",
+ AbstractWindmillStream.this.getClass(),
+ errorCount.get(),
+ t,
+ status,
+ nowMillis - startTimeMs.get(),
+ responseDebug);
+ }
+
+ // If the stream was stopped due to a resource exhausted error then we
are throttled.
+ if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+ startThrottleTimer();
+ }
+
+ try {
+ long sleep = backoff.nextBackOffMillis();
+ sleepUntil.set(Instant.now().getMillis() + sleep);
+ sleeper.sleep(sleep);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ // Ignore.
+ }
+
+ tryRestartStream();
}
@Override
public void onCompleted() {
- onStreamFinished(null);
+ if (isStreamDone()) {
+ return;
+ }
+ errorCount.incrementAndGet();
+ String error =
+ "Stream completed successfully but did not complete requested
operations, "
+ + "recreating";
+ LOG.warn(error);
+ setLastError(error);
+ tryRestartStream();
}
- private void onStreamFinished(@Nullable Throwable t) {
- synchronized (this) {
- if (clientClosed.get() && !hasPendingRequests()) {
- streamRegistry.remove(AbstractWindmillStream.this);
- finishLatch.countDown();
- return;
- }
+ private void tryRestartStream() {
+ if (!isShutdown()) {
Review Comment:
this looks racy, we check shutdown above in isStreamDone with
synchronization, but then if it is shutdown before here, we end up with an
error but won't restart the stream or remove it from the registry.
I would remove this one
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +333,83 @@ private void setLastError(String error) {
lastErrorTime.set(DateTime.now());
}
+ protected abstract void shutdownInternal();
+
public static class WindmillStreamShutdownException extends RuntimeException
{
public WindmillStreamShutdownException(String message) {
super(message);
}
}
+ /**
+ * Request observer that allows resetting its internal delegate using the
given {@link
+ * #requestObserverSupplier}.
+ */
+ @ThreadSafe
+ private static class ResettableRequestObserver<RequestT> implements
StreamObserver<RequestT> {
+ private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+ @GuardedBy("delegateRequestObserver")
+ private final AtomicReference<StreamObserver<RequestT>>
delegateRequestObserver;
Review Comment:
All of the reads appear to be guarded, seems like you coudl either not have
the atomicref or change delegate() to just return atomic ref without locking.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -166,7 +170,6 @@ private static Watermarks createWatermarks(
}
private void sendRequestExtension(GetWorkBudget adjustment) {
- inFlightBudget.getAndUpdate(budget -> budget.apply(adjustment));
Review Comment:
what is inflight budget tracking if we're not incrementing it here?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -69,6 +70,7 @@ private GrpcCommitWorkStream(
AtomicLong idGenerator,
int streamingRpcBatchLimit) {
super(
+ LOG,
Review Comment:
can you change type of pending to ConcurrentHashMap above, no reason to use
Map if we're relying on concurrency and not exposing it directly.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -163,7 +165,7 @@ protected void onResponse(StreamingCommitResponse response)
{
continue;
}
PendingRequest done = pending.remove(requestId);
- if (done == null) {
+ if (done == null && !isShutdown()) {
Review Comment:
this is going to get nullptr exception below, instead move the isShutdown
check to whether or not to log inside this if
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -175,36 +178,38 @@ private void sendRequestExtension(GetWorkBudget
adjustment) {
.setMaxBytes(adjustment.bytes()))
.build();
- executor()
- .execute(
- () -> {
- try {
- send(extension);
- } catch (IllegalStateException e) {
- // Stream was closed.
- }
- });
+ executeSafely(
+ () -> {
+ try {
+ send(extension);
+ } catch (IllegalStateException e) {
Review Comment:
should we handle illegalstateexception internally? or rely on executeSafely
to catch it?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +333,83 @@ private void setLastError(String error) {
lastErrorTime.set(DateTime.now());
}
+ protected abstract void shutdownInternal();
+
public static class WindmillStreamShutdownException extends RuntimeException
{
public WindmillStreamShutdownException(String message) {
super(message);
}
}
+ /**
+ * Request observer that allows resetting its internal delegate using the
given {@link
+ * #requestObserverSupplier}.
+ */
+ @ThreadSafe
+ private static class ResettableRequestObserver<RequestT> implements
StreamObserver<RequestT> {
+ private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+ @GuardedBy("delegateRequestObserver")
+ private final AtomicReference<StreamObserver<RequestT>>
delegateRequestObserver;
+
+ @GuardedBy("delegateRequestObserver")
+ /* Indicates if onCompleted() has been called for the current
delegateRequestObserver instance.
+ Reset to false when reset() is called. */
+ private volatile boolean isClosed;
Review Comment:
ditto seems like this could be just a normal guarded boolean not volatile
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -175,36 +178,38 @@ private void sendRequestExtension(GetWorkBudget
adjustment) {
.setMaxBytes(adjustment.bytes()))
.build();
- executor()
- .execute(
- () -> {
- try {
- send(extension);
- } catch (IllegalStateException e) {
- // Stream was closed.
- }
- });
+ executeSafely(
+ () -> {
+ try {
+ send(extension);
+ } catch (IllegalStateException e) {
+ // Stream was closed.
+ }
+ });
}
@Override
protected synchronized void onNewStream() {
workItemAssemblers.clear();
- // Add the current in-flight budget to the next adjustment. Only positive
values are allowed
- // here
- // with negatives defaulting to 0, since GetWorkBudgets cannot be created
with negative values.
- GetWorkBudget budgetAdjustment =
nextBudgetAdjustment.get().apply(inFlightBudget.get());
- inFlightBudget.set(budgetAdjustment);
- send(
- StreamingGetWorkRequest.newBuilder()
- .setRequest(
- request
- .toBuilder()
- .setMaxBytes(budgetAdjustment.bytes())
- .setMaxItems(budgetAdjustment.items()))
- .build());
+ if (!isShutdown()) {
+ // Add the current in-flight budget to the next adjustment. Only
positive values are allowed
+ // here with negatives defaulting to 0, since GetWorkBudgets cannot be
created with negative
+ // values. We just sent the budget, reset it.
+ GetWorkBudget currentBudgetAdjustment =
+ nextBudgetAdjustment.getAndUpdate(ignored ->
GetWorkBudget.noBudget());
+ GetWorkBudget budgetAdjustment =
currentBudgetAdjustment.apply(inFlightBudget.get());
- // We just sent the budget, reset it.
- nextBudgetAdjustment.set(GetWorkBudget.noBudget());
+ inFlightBudget.updateAndGet(budget ->
budget.apply(currentBudgetAdjustment));
Review Comment:
this doesn't make sense to me, currentBudgetAdjustment is
(previous nextBudgetAdjustment + inflightBudget) so this is 2*inflightBudget
+ nextBudgetAdjustment
Also it would be better to use the result of this instead of a separate
inflightBudget.get()
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -117,7 +119,7 @@ public void appendSpecificHtml(PrintWriter writer) {
}
@Override
- protected synchronized void onNewStream() {
+ protected void onNewStream() {
Review Comment:
why remove the synchronized? it is beneath synchronized block in superclass
anyway so I think it executes equivalently but it is clearer since we want
send/pendign to be consistent to match the sends below.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +333,83 @@ private void setLastError(String error) {
lastErrorTime.set(DateTime.now());
}
+ protected abstract void shutdownInternal();
+
public static class WindmillStreamShutdownException extends RuntimeException
{
public WindmillStreamShutdownException(String message) {
super(message);
}
}
+ /**
+ * Request observer that allows resetting its internal delegate using the
given {@link
+ * #requestObserverSupplier}.
+ */
+ @ThreadSafe
+ private static class ResettableRequestObserver<RequestT> implements
StreamObserver<RequestT> {
+ private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+ @GuardedBy("delegateRequestObserver")
+ private final AtomicReference<StreamObserver<RequestT>>
delegateRequestObserver;
+
+ @GuardedBy("delegateRequestObserver")
+ /* Indicates if onCompleted() has been called for the current
delegateRequestObserver instance.
+ Reset to false when reset() is called. */
+ private volatile boolean isClosed;
+
+ private ResettableRequestObserver(Supplier<StreamObserver<RequestT>>
requestObserverSupplier) {
+ this.requestObserverSupplier = requestObserverSupplier;
+ this.delegateRequestObserver = new AtomicReference<>();
+ this.isClosed = false;
+ }
+
+ private StreamObserver<RequestT> delegate() {
+ synchronized (delegateRequestObserver) {
Review Comment:
can you just simplify on this and use synchronized methods?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -167,67 +170,103 @@ private StreamObserver<RequestT> requestObserver() {
}
/** Send a request to the server. */
- protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
- synchronized (this) {
- if (streamClosed.get()) {
- throw new IllegalStateException("Send called on a client closed
stream.");
+ protected final synchronized void send(RequestT request) {
+ if (isShutdown()) {
+ logger.debug(
+ "Send called on a shutdown stream={} to worker{}.", getClass(),
backendWorkerToken);
+ return;
+ }
+
+ if (requestObserver.isClosed()) {
+ throw new IllegalStateException("Send called on a client closed
stream.");
+ }
+
+ try {
+ lastSendTimeMs.set(Instant.now().getMillis());
+ requestObserver.onNext(request);
+ } catch (StreamObserverCancelledException e) {
+ if (isShutdown()) {
+ logger.debug("Stream was closed or shutdown during send.", e);
+ return;
}
- requestObserver().onNext(request);
+ logger.error(
+ "StreamObserver was unexpectedly cancelled for stream={}, worker={}.
stacktrace={}",
+ getClass(),
+ backendWorkerToken,
+ e.getStackTrace(),
+ e);
+ throw e;
}
}
/** Starts the underlying stream. */
protected final void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
- while (true) {
+ while (!isShutdown.get()) {
try {
synchronized (this) {
+ if (isShutdown.get()) {
+ break;
+ }
startTimeMs.set(Instant.now().getMillis());
lastResponseTimeMs.set(0);
- streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the
stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ requestObserver.reset();
onNewStream();
- if (clientClosed.get()) {
+ if (clientClosed.get() && !isShutdown()) {
Review Comment:
whatever isShutdown() is protecting against it seems racy because it could
become true right after you observe it.
seems like we could attempt to close the response observer that shutdown
called onError on.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -117,8 +121,8 @@ private GrpcDirectGetWorkStream(
this.heartbeatSender = Suppliers.memoize(heartbeatSender::get);
this.workCommitter = Suppliers.memoize(workCommitter::get);
this.getDataClient = Suppliers.memoize(getDataClient::get);
- this.inFlightBudget = new AtomicReference<>(GetWorkBudget.noBudget());
this.nextBudgetAdjustment = new
AtomicReference<>(GetWorkBudget.noBudget());
+ this.inFlightBudget = new AtomicReference<>(GetWorkBudget.noBudget());
Review Comment:
revert? better to avoid no-op changes
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -290,12 +333,83 @@ private void setLastError(String error) {
lastErrorTime.set(DateTime.now());
}
+ protected abstract void shutdownInternal();
+
public static class WindmillStreamShutdownException extends RuntimeException
{
public WindmillStreamShutdownException(String message) {
super(message);
}
}
+ /**
+ * Request observer that allows resetting its internal delegate using the
given {@link
+ * #requestObserverSupplier}.
+ */
+ @ThreadSafe
+ private static class ResettableRequestObserver<RequestT> implements
StreamObserver<RequestT> {
+ private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+ @GuardedBy("delegateRequestObserver")
+ private final AtomicReference<StreamObserver<RequestT>>
delegateRequestObserver;
+
+ @GuardedBy("delegateRequestObserver")
+ /* Indicates if onCompleted() has been called for the current
delegateRequestObserver instance.
+ Reset to false when reset() is called. */
+ private volatile boolean isClosed;
+
+ private ResettableRequestObserver(Supplier<StreamObserver<RequestT>>
requestObserverSupplier) {
+ this.requestObserverSupplier = requestObserverSupplier;
+ this.delegateRequestObserver = new AtomicReference<>();
+ this.isClosed = false;
+ }
+
+ private StreamObserver<RequestT> delegate() {
+ synchronized (delegateRequestObserver) {
+ if (delegateRequestObserver.get() == null) {
+ throw new NullPointerException(
+ "requestObserver cannot be null. Missing a call to startStream()
to initialize.");
+ }
+
+ return delegateRequestObserver.get();
+ }
+ }
+
+ private void reset() {
+ synchronized (delegateRequestObserver) {
+ delegateRequestObserver.set(requestObserverSupplier.get());
+ isClosed = false;
+ }
+ }
+
+ @Override
+ public void onNext(RequestT requestT) {
+ delegate().onNext(requestT);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ delegate().onError(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ synchronized (delegateRequestObserver) {
+ if (!isClosed) {
+ // onCompleted() can only be called once for each StreamObserver
instance, or else an
+ // IllegalStateException is thrown.
+ delegate().onCompleted();
+ isClosed = true;
+ }
+ }
+ }
+
+ public boolean isClosed() {
Review Comment:
exposing this for non-status pages seems risky since the synchronized state
can change between when this is checked and another method is called.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -182,6 +184,12 @@ protected void onResponse(StreamingCommitResponse
response) {
}
}
+ @Override
+ protected void shutdownInternal() {
+ pending.values().forEach(pendingRequest ->
pendingRequest.onDone.accept(CommitStatus.ABORTED));
Review Comment:
I'm worried if pending has somethign inserted between iterating and clear
can you instead use an iterator where you remove as you go so everything
removed is guaranteed to be aborted?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]