m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1824903706
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -49,46 +44,64 @@
* stream if it is broken. Subclasses are responsible for retrying requests
that have been lost on a
* broken stream.
*
- * <p>Subclasses should override onResponse to handle responses from the
server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as
sending headers or
- * retrying requests.
+ * <p>Subclasses should override {@link #onResponse(ResponseT)} to handle
responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new
stream is created,
+ * such as sending headers or retrying requests.
*
- * <p>send and startStream should not be called from onResponse; use
executor() instead.
+ * <p>{@link #send(RequestT)} and {@link #startStream()} should not be called
from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
*
* <p>Synchronization on this is used to synchronize the gRpc stream state and
internal data
* structures. Since grpc channel operations may block, synchronization on
this stream may also
* block. This is generally not a problem since streams are used in a
single-threaded manner.
* However, some accessors used for status page and other debugging need to
take care not to require
* synchronizing on this.
+ *
+ * <p>{@link #start()} and {@link #shutdown()} are called once in the lifetime
of the stream. Once
+ {@link #shutdown()} is called, a stream is considered invalid and cannot be
restarted/reused.
*/
public abstract class AbstractWindmillStream<RequestT, ResponseT> implements
WindmillStream {
- public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
// Default gRPC streams to 2MB chunks, which has shown to be a large enough
chunk size to reduce
// per-chunk overhead, and small enough that we can still perform granular
flow-control.
protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractWindmillStream.class);
- protected final AtomicBoolean clientClosed;
- private final AtomicBoolean isShutdown;
- private final AtomicLong lastSendTimeMs;
- private final Executor executor;
+ // Indicates that the logical stream has been half-closed and is waiting for
clean server
+ // shutdown.
+ private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+ protected final Sleeper sleeper;
+
+ /**
+ * Used to guard {@link #start()} and {@link #shutdown()} behavior.
+ *
+ * @implNote Do not hold when performing IO. If also locking on {@code this}
in the same context,
+ * should acquire shutdownLock first to prevent deadlocks.
+ */
+ protected final Object shutdownLock = new Object();
+
+ private final Logger logger;
+ private final ExecutorService executor;
private final BackOff backoff;
- private final AtomicLong startTimeMs;
- private final AtomicLong lastResponseTimeMs;
- private final AtomicInteger errorCount;
- private final AtomicReference<String> lastError;
- private final AtomicReference<DateTime> lastErrorTime;
- private final AtomicLong sleepUntil;
private final CountDownLatch finishLatch;
private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
private final int logEveryNStreamFailures;
- private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
- // Indicates if the current stream in requestObserver is closed by calling
close() method
- private final AtomicBoolean streamClosed;
private final String backendWorkerToken;
- private @Nullable StreamObserver<RequestT> requestObserver;
+ private final ResettableStreamObserver<RequestT> requestObserver;
+ private final StreamDebugMetrics debugMetrics;
+ protected volatile boolean clientClosed;
+
+ /**
+ * Indicates if the current {@link ResettableStreamObserver} was closed by
calling {@link
+ * #halfClose()}. Separate from {@link #clientClosed} as this is specific to
the requestObserver
+ * and is initially false on retry.
+ */
+ @GuardedBy("this")
+ private boolean streamClosed;
+
+ private volatile boolean isShutdown;
+ private volatile boolean started;
Review Comment:
done
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -49,46 +44,64 @@
* stream if it is broken. Subclasses are responsible for retrying requests
that have been lost on a
* broken stream.
*
- * <p>Subclasses should override onResponse to handle responses from the
server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as
sending headers or
- * retrying requests.
+ * <p>Subclasses should override {@link #onResponse(ResponseT)} to handle
responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new
stream is created,
+ * such as sending headers or retrying requests.
*
- * <p>send and startStream should not be called from onResponse; use
executor() instead.
+ * <p>{@link #send(RequestT)} and {@link #startStream()} should not be called
from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
*
* <p>Synchronization on this is used to synchronize the gRpc stream state and
internal data
* structures. Since grpc channel operations may block, synchronization on
this stream may also
* block. This is generally not a problem since streams are used in a
single-threaded manner.
* However, some accessors used for status page and other debugging need to
take care not to require
* synchronizing on this.
+ *
+ * <p>{@link #start()} and {@link #shutdown()} are called once in the lifetime
of the stream. Once
+ {@link #shutdown()} is called, a stream is considered invalid and cannot be
restarted/reused.
*/
public abstract class AbstractWindmillStream<RequestT, ResponseT> implements
WindmillStream {
- public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
// Default gRPC streams to 2MB chunks, which has shown to be a large enough
chunk size to reduce
// per-chunk overhead, and small enough that we can still perform granular
flow-control.
protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractWindmillStream.class);
- protected final AtomicBoolean clientClosed;
- private final AtomicBoolean isShutdown;
- private final AtomicLong lastSendTimeMs;
- private final Executor executor;
+ // Indicates that the logical stream has been half-closed and is waiting for
clean server
+ // shutdown.
+ private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+ protected final Sleeper sleeper;
+
+ /**
+ * Used to guard {@link #start()} and {@link #shutdown()} behavior.
+ *
+ * @implNote Do not hold when performing IO. If also locking on {@code this}
in the same context,
+ * should acquire shutdownLock first to prevent deadlocks.
+ */
+ protected final Object shutdownLock = new Object();
+
+ private final Logger logger;
+ private final ExecutorService executor;
private final BackOff backoff;
- private final AtomicLong startTimeMs;
- private final AtomicLong lastResponseTimeMs;
- private final AtomicInteger errorCount;
- private final AtomicReference<String> lastError;
- private final AtomicReference<DateTime> lastErrorTime;
- private final AtomicLong sleepUntil;
private final CountDownLatch finishLatch;
private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
private final int logEveryNStreamFailures;
- private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
- // Indicates if the current stream in requestObserver is closed by calling
close() method
- private final AtomicBoolean streamClosed;
private final String backendWorkerToken;
- private @Nullable StreamObserver<RequestT> requestObserver;
+ private final ResettableStreamObserver<RequestT> requestObserver;
+ private final StreamDebugMetrics debugMetrics;
+ protected volatile boolean clientClosed;
+
+ /**
+ * Indicates if the current {@link ResettableStreamObserver} was closed by
calling {@link
+ * #halfClose()}. Separate from {@link #clientClosed} as this is specific to
the requestObserver
+ * and is initially false on retry.
+ */
+ @GuardedBy("this")
+ private boolean streamClosed;
+
+ private volatile boolean isShutdown;
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]