m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1801515609
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -244,21 +308,26 @@ public final void appendSummaryHtml(PrintWriter writer) {
writer.format(", %dms backoff remaining", sleepLeft);
}
writer.format(
- ", current stream is %dms old, last send %dms, last response %dms,
closed: %s",
+ ", current stream is %dms old, last send %dms, last response %dms,
closed: %s, "
+ + "isShutdown: %s, shutdown time: %s",
debugDuration(nowMs, startTimeMs.get()),
debugDuration(nowMs, lastSendTimeMs.get()),
debugDuration(nowMs, lastResponseTimeMs.get()),
- streamClosed.get());
+ streamClosed.get(),
+ isShutdown.get(),
+ shutdownTime.get());
}
- // Don't require synchronization on stream, see the appendSummaryHtml
comment.
+ /**
+ * @implNote Don't require synchronization on stream, see the {@link
+ * #appendSummaryHtml(PrintWriter)} comment.
+ */
protected abstract void appendSpecificHtml(PrintWriter writer);
@Override
public final synchronized void halfClose() {
- // Synchronization of close and onCompleted necessary for correct retry
logic in onNewStream.
Review Comment:
done
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java:
##########
@@ -47,16 +53,21 @@ public interface WindmillStream {
Instant startTime();
/**
- * Shutdown the stream. There should be no further interactions with the
stream once this has been
- * called.
+ * Shuts down the stream. No further interactions should be made with the
stream, and the stream
+ * will no longer try to connect internally. Any pending retries or
in-flight requests will be
+ * cancelled and all responses dropped and considered invalid.
*/
void shutdown();
/** Handle representing a stream of GetWork responses. */
@ThreadSafe
interface GetWorkStream extends WindmillStream {
/** Adjusts the {@link GetWorkBudget} for the stream. */
- void adjustBudget(long itemsDelta, long bytesDelta);
+ void adjustBudget(long newItems, long newBytes);
Review Comment:
rebased onto https://github.com/apache/beam/pull/32775
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]