m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1801515609
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -244,21 +308,26 @@ public final void appendSummaryHtml(PrintWriter writer) {
       writer.format(", %dms backoff remaining", sleepLeft);
     }
     writer.format(
-        ", current stream is %dms old, last send %dms, last response %dms, closed: %s",
+        ", current stream is %dms old, last send %dms, last response %dms, closed: %s, "
+            + "isShutdown: %s, shutdown time: %s",
         debugDuration(nowMs, startTimeMs.get()),
         debugDuration(nowMs, lastSendTimeMs.get()),
         debugDuration(nowMs, lastResponseTimeMs.get()),
-        streamClosed.get());
+        streamClosed.get(),
+        isShutdown.get(),
+        shutdownTime.get());
   }

-  // Don't require synchronization on stream, see the appendSummaryHtml comment.
+  /**
+   * @implNote Don't require synchronization on stream, see the {@link
+   * #appendSummaryHtml(PrintWriter)} comment.
+   */
   protected abstract void appendSpecificHtml(PrintWriter writer);

   @Override
   public final synchronized void halfClose() {
-    // Synchronization of close and onCompleted necessary for correct retry logic in onNewStream.

Review Comment:
   done

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java:
##########
@@ -47,16 +53,21 @@ public interface WindmillStream {
   Instant startTime();

   /**
-   * Shutdown the stream. There should be no further interactions with the stream once this has been
-   * called.
+   * Shuts down the stream. No further interactions should be made with the stream, and the stream
+   * will no longer try to connect internally. Any pending retries or in-flight requests will be
+   * cancelled and all responses dropped and considered invalid.
    */
   void shutdown();

   /** Handle representing a stream of GetWork responses. */
   @ThreadSafe
   interface GetWorkStream extends WindmillStream {
     /** Adjusts the {@link GetWorkBudget} for the stream. */
-    void adjustBudget(long itemsDelta, long bytesDelta);
+    void adjustBudget(long newItems, long newBytes);

Review Comment:
   rebased onto https://github.com/apache/beam/pull/32775

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org