m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1801515609


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -244,21 +308,26 @@ public final void appendSummaryHtml(PrintWriter writer) {
       writer.format(", %dms backoff remaining", sleepLeft);
     }
     writer.format(
-        ", current stream is %dms old, last send %dms, last response %dms, 
closed: %s",
+        ", current stream is %dms old, last send %dms, last response %dms, 
closed: %s, "
+            + "isShutdown: %s, shutdown time: %s",
         debugDuration(nowMs, startTimeMs.get()),
         debugDuration(nowMs, lastSendTimeMs.get()),
         debugDuration(nowMs, lastResponseTimeMs.get()),
-        streamClosed.get());
+        streamClosed.get(),
+        isShutdown.get(),
+        shutdownTime.get());
   }
 
-  // Don't require synchronization on stream, see the appendSummaryHtml 
comment.
+  /**
+   * @implNote Don't require synchronization on stream, see the {@link
+   *     #appendSummaryHtml(PrintWriter)} comment.
+   */
   protected abstract void appendSpecificHtml(PrintWriter writer);
 
   @Override
   public final synchronized void halfClose() {
-    // Synchronization of close and onCompleted necessary for correct retry 
logic in onNewStream.

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java:
##########
@@ -47,16 +53,21 @@ public interface WindmillStream {
   Instant startTime();
 
   /**
-   * Shutdown the stream. There should be no further interactions with the 
stream once this has been
-   * called.
+   * Shuts down the stream. No further interactions should be made with the 
stream, and the stream
+   * will no longer try to connect internally. Any pending retries or 
in-flight requests will be
+   * cancelled and all responses dropped and considered invalid.
    */
   void shutdown();
 
   /** Handle representing a stream of GetWork responses. */
   @ThreadSafe
   interface GetWorkStream extends WindmillStream {
     /** Adjusts the {@link GetWorkBudget} for the stream. */
-    void adjustBudget(long itemsDelta, long bytesDelta);
+    void adjustBudget(long newItems, long newBytes);

Review Comment:
   rebased onto https://github.com/apache/beam/pull/32775



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to