m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1806042439
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
private StreamObserver<RequestT> requestObserver() {
if (requestObserver == null) {
throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ "requestObserver cannot be null. Missing a call to start() to
initialize stream.");
}
return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
+ if (isShutdown()) {
+ return;
+ }
+
if (streamClosed.get()) {
throw new IllegalStateException("Send called on a client closed
stream.");
}
- requestObserver().onNext(request);
+ try {
+ lastSendTimeMs.set(Instant.now().getMillis());
+ requestObserver.onNext(request);
+ } catch (StreamObserverCancelledException e) {
+ if (isShutdown()) {
+ logger.debug("Stream was closed or shutdown during send.", e);
+ return;
+ }
+
+ requestObserver.onError(e);
+ }
+ }
+ }
+
+ @Override
+ public final void start() {
+ if (!isShutdown.get() && started.compareAndSet(false, true)) {
+ // start() should only be executed once during the lifetime of the
stream for idempotency and
+ // when shutdown() has not been called.
+ startStream();
}
}
/** Starts the underlying stream. */
- protected final void startStream() {
+ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
try {
synchronized (this) {
+ if (isShutdown.get()) {
+ break;
+ }
startTimeMs.set(Instant.now().getMillis());
lastResponseTimeMs.set(0);
streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the
stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ requestObserver.reset();
onNewStream();
if (clientClosed.get()) {
halfClose();
}
return;
Review Comment:
The issue, though, is that we want to show that once we enter this loop we are
attempting to create the stream, since `onNewStream()` for all of the streams
performs a blocking call that sends a header. If there are issues here
(there sometimes are when the stream initially starts up), we want the stream
to still be in the registry so that the errors/stream issues during startup
show up in debug capture.
If we do not add it to the `streamRegistry` here and instead add it afterwards
in the happy path, the stream would be silently spinning until the header is
successfully sent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]