m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1837621434


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -305,77 +341,96 @@ public void onNext(ResponseT response) {
       } catch (IOException e) {
         // Ignore.
       }
-      lastResponseTimeMs.set(Instant.now().getMillis());
+      debugMetrics.recordResponse();
       onResponse(response);
     }
 
     @Override
     public void onError(Throwable t) {
-      onStreamFinished(t);
+      if (maybeTeardownStream()) {
+        return;
+      }
+
+      recordStreamStatus(Status.fromThrowable(t));
+
+      try {
+        long sleep = backoff.nextBackOffMillis();
+        debugMetrics.recordSleep(sleep);
+        sleeper.sleep(sleep);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      } catch (IOException e) {
+        // Ignore.
+      }
+
+      executeSafely(AbstractWindmillStream.this::startStream);
     }
 
     @Override
     public void onCompleted() {
-      onStreamFinished(null);
+      if (maybeTeardownStream()) {
+        return;
+      }
+      recordStreamStatus(OK_STATUS);
+      executeSafely(AbstractWindmillStream.this::startStream);
     }
 
-    private void onStreamFinished(@Nullable Throwable t) {
-      synchronized (this) {
-        if (isShutdown.get() || (clientClosed.get() && !hasPendingRequests())) {
-          streamRegistry.remove(AbstractWindmillStream.this);
-          finishLatch.countDown();
-          return;
-        }
-      }
-      if (t != null) {
-        Status status = null;
-        if (t instanceof StatusRuntimeException) {
-          status = ((StatusRuntimeException) t).getStatus();
-        }
-        String statusError = status == null ? "" : status.toString();
-        setLastError(statusError);
-        if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
+    private void recordStreamStatus(Status status) {
+      int currentRestartCount = debugMetrics.incrementAndGetRestarts();
+      if (status.isOk()) {
+        String restartReason =
+            "Stream completed successfully but did not complete requested operations, "
+                + "recreating";
+        logger.warn(restartReason);
+        debugMetrics.recordRestartReason(restartReason);
+      } else {
+        int currentErrorCount = debugMetrics.incrementAndGetErrors();
+        debugMetrics.recordRestartReason(status.toString());
+        Throwable t = status.getCause();
+        if (t instanceof StreamObserverCancelledException) {
+          logger.error(
+              "StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}",
+              getClass(),
+              backendWorkerToken,
+              t.getStackTrace(),
+              t);
+        } else if (currentRestartCount % logEveryNStreamFailures == 0) {
+          // Don't log every restart since it will get noisy, and many errors transient.
           long nowMillis = Instant.now().getMillis();
-          String responseDebug;
-          if (lastResponseTimeMs.get() == 0) {
-            responseDebug = "never received response";
-          } else {
-            responseDebug =
-                "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago";
-          }
-          LOG.debug(
-              "{} streaming Windmill RPC errors for {}, last was: {} with status {}."
-                  + " created {}ms ago, {}. This is normal with autoscaling.",
+          logger.debug(
+              "{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}"
+                  + " with status: {}. created {}ms ago; {}. This is normal with autoscaling.",
               AbstractWindmillStream.this.getClass(),
-              errorCount.get(),
+              currentRestartCount,
+              currentErrorCount,
               t,
-              statusError,
-              nowMillis - startTimeMs.get(),
-              responseDebug);
+              status,
+              nowMillis - debugMetrics.getStartTimeMs(),
+              debugMetrics
+                  .responseDebugString(nowMillis)
+                  .orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING));
         }
+
         // If the stream was stopped due to a resource exhausted error then we are throttled.
-        if (status != null && status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+        if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to