m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1837621434
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -305,77 +341,96 @@ public void onNext(ResponseT response) {
} catch (IOException e) {
// Ignore.
}
- lastResponseTimeMs.set(Instant.now().getMillis());
+ debugMetrics.recordResponse();
onResponse(response);
}
@Override
public void onError(Throwable t) {
- onStreamFinished(t);
+ if (maybeTeardownStream()) {
+ return;
+ }
+
+ recordStreamStatus(Status.fromThrowable(t));
+
+ try {
+ long sleep = backoff.nextBackOffMillis();
+ debugMetrics.recordSleep(sleep);
+ sleeper.sleep(sleep);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (IOException e) {
+ // Ignore.
+ }
+
+ executeSafely(AbstractWindmillStream.this::startStream);
}
@Override
public void onCompleted() {
- onStreamFinished(null);
+ if (maybeTeardownStream()) {
+ return;
+ }
+ recordStreamStatus(OK_STATUS);
+ executeSafely(AbstractWindmillStream.this::startStream);
}
- private void onStreamFinished(@Nullable Throwable t) {
- synchronized (this) {
- if (isShutdown.get() || (clientClosed.get() && !hasPendingRequests())) {
- streamRegistry.remove(AbstractWindmillStream.this);
- finishLatch.countDown();
- return;
- }
- }
- if (t != null) {
- Status status = null;
- if (t instanceof StatusRuntimeException) {
- status = ((StatusRuntimeException) t).getStatus();
- }
- String statusError = status == null ? "" : status.toString();
- setLastError(statusError);
- if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
+ private void recordStreamStatus(Status status) {
+ int currentRestartCount = debugMetrics.incrementAndGetRestarts();
+ if (status.isOk()) {
+ String restartReason =
+ "Stream completed successfully but did not complete requested operations, "
+ + "recreating";
+ logger.warn(restartReason);
+ debugMetrics.recordRestartReason(restartReason);
+ } else {
+ int currentErrorCount = debugMetrics.incrementAndGetErrors();
+ debugMetrics.recordRestartReason(status.toString());
+ Throwable t = status.getCause();
+ if (t instanceof StreamObserverCancelledException) {
+ logger.error(
+ "StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}",
+ getClass(),
+ backendWorkerToken,
+ t.getStackTrace(),
+ t);
+ } else if (currentRestartCount % logEveryNStreamFailures == 0) {
+ // Don't log every restart since it will get noisy, and many errors transient.
long nowMillis = Instant.now().getMillis();
- String responseDebug;
- if (lastResponseTimeMs.get() == 0) {
- responseDebug = "never received response";
- } else {
- responseDebug =
- "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago";
- }
- LOG.debug(
- "{} streaming Windmill RPC errors for {}, last was: {} with status {}."
- + " created {}ms ago, {}. This is normal with autoscaling.",
+ logger.debug(
+ "{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}"
+ + " with status: {}. created {}ms ago; {}. This is normal with autoscaling.",
AbstractWindmillStream.this.getClass(),
- errorCount.get(),
+ currentRestartCount,
+ currentErrorCount,
t,
- statusError,
- nowMillis - startTimeMs.get(),
- responseDebug);
+ status,
+ nowMillis - debugMetrics.getStartTimeMs(),
+ debugMetrics
+ .responseDebugString(nowMillis)
+ .orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING));
}
+
// If the stream was stopped due to a resource exhausted error then we are throttled.
- if (status != null && status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+ if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]