arunpandianp commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1800611909
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -131,46 +132,48 @@ private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkB return request.toBuilder().setMaxItems(budget.items()).setMaxBytes(budget.bytes()).build(); } - @SuppressWarnings("ReturnValueIgnored") - void startStreams() { - getWorkStream.get(); - getDataStream.get(); - commitWorkStream.get(); - workCommitter.get().start(); - // *stream.get() is all memoized in a threadsafe manner. - started.set(true); - } - - void closeAllStreams() { - // Supplier<Stream>.get() starts the stream which is an expensive operation as it initiates the - // streaming RPCs by possibly making calls over the network. Do not close the streams unless - // they have already been started. - if (started.get()) { - getWorkStream.get().shutdown(); - getDataStream.get().shutdown(); - workCommitter.get().stop(); - commitWorkStream.get().shutdown(); + synchronized void start() { + if (!started.get()) { + // Start these 3 streams in parallel since they each may perform blocking IO. + CompletableFuture.allOf( + CompletableFuture.runAsync(getWorkStream::start, streamStarter), + CompletableFuture.runAsync(getDataStream::start, streamStarter), + CompletableFuture.runAsync(commitWorkStream::start, streamStarter)) + .join(); + workCommitter.start(); + started.set(true); } } @Override - public void adjustBudget(long itemsDelta, long bytesDelta) { - getWorkBudget.set(getWorkBudget.get().apply(itemsDelta, bytesDelta)); + public synchronized void close() { if (started.get()) { - getWorkStream.get().adjustBudget(itemsDelta, bytesDelta); + getWorkStream.shutdown(); + getDataStream.shutdown(); + workCommitter.stop(); + commitWorkStream.shutdown(); } } @Override - public GetWorkBudget remainingBudget() { - return started.get() ? 
getWorkStream.get().remainingBudget() : getWorkBudget.get(); + public void setBudget(long items, long bytes) { + GetWorkBudget adjustment = GetWorkBudget.builder().setItems(items).setBytes(bytes).build(); + getWorkBudget.set(adjustment); + if (started.get()) { + getWorkStream.adjustBudget(adjustment); Review Comment: ```suggestion getWorkStream.setBudget(newBudget); ``` ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -160,80 +179,125 @@ protected boolean isShutdown() { private StreamObserver<RequestT> requestObserver() { if (requestObserver == null) { throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + "requestObserver cannot be null. Missing a call to start() to initialize stream."); } return requestObserver; } /** Send a request to the server. */ protected final void send(RequestT request) { - lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { + if (isShutdown()) { + return; + } + if (streamClosed.get()) { throw new IllegalStateException("Send called on a client closed stream."); } - requestObserver().onNext(request); + try { + lastSendTimeMs.set(Instant.now().getMillis()); + requestObserver.onNext(request); + } catch (StreamObserverCancelledException e) { + if (isShutdown()) { + logger.debug("Stream was closed or shutdown during send.", e); Review Comment: ```suggestion logger.debug("Stream was shutdown during send.", e); ``` ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java: ########## @@ -216,12 +207,18 @@ static FanOutStreamingEngineWorkerHarness forTesting( return fanOutStreamingEngineWorkProvider; } - @SuppressWarnings("ReturnValueIgnored") + @SuppressWarnings("FutureReturnValueIgnored") Review Comment: assign the future to a variable named 
`unusedFuture` and remove the suppression? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -160,80 +179,125 @@ protected boolean isShutdown() { private StreamObserver<RequestT> requestObserver() { if (requestObserver == null) { throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + "requestObserver cannot be null. Missing a call to start() to initialize stream."); } return requestObserver; } /** Send a request to the server. */ protected final void send(RequestT request) { - lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { + if (isShutdown()) { + return; + } + if (streamClosed.get()) { throw new IllegalStateException("Send called on a client closed stream."); } - requestObserver().onNext(request); + try { + lastSendTimeMs.set(Instant.now().getMillis()); + requestObserver.onNext(request); + } catch (StreamObserverCancelledException e) { + if (isShutdown()) { + logger.debug("Stream was closed or shutdown during send.", e); + return; + } + + requestObserver.onError(e); + } + } + } + + @Override + public final void start() { + if (!isShutdown.get() && started.compareAndSet(false, true)) { + // start() should only be executed once during the lifetime of the stream for idempotency and + // when shutdown() has not been called. + startStream(); } } /** Starts the underlying stream. */ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { + if (isShutdown.get()) { + break; + } startTimeMs.set(Instant.now().getMillis()); lastResponseTimeMs.set(0); streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. 
- requestObserver = requestObserverSupplier.get(); + requestObserver.reset(); onNewStream(); if (clientClosed.get()) { halfClose(); } return; } } catch (Exception e) { - LOG.error("Failed to create new stream, retrying: ", e); + logger.error("Failed to create new stream, retrying: ", e); try { long sleep = backoff.nextBackOffMillis(); sleepUntil.set(Instant.now().getMillis() + sleep); - Thread.sleep(sleep); - } catch (InterruptedException | IOException i) { + sleeper.sleep(sleep); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + logger.info( + "Interrupted during stream creation backoff. The stream will not be created."); Review Comment: log the stream name? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -187,13 +206,14 @@ protected void startThrottleTimer() { commitWorkThrottleTimer.start(); } - private void flushInternal(Map<Long, PendingRequest> requests) { + private void flushInternal(Map<Long, PendingRequest> requests) throws InterruptedException { Review Comment: it doesn't look like any of the method calls inside `flushInternal` are throwing InterruptedException. Can we remove the throws from here? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -317,29 +350,71 @@ public boolean commitWorkItem( if (!canAccept(commitRequest.getSerializedSize() + computation.length())) { return false; } - PendingRequest request = new PendingRequest(computation, commitRequest, onDone); + + PendingRequest request = PendingRequest.create(computation, commitRequest, onDone); add(idGenerator.incrementAndGet(), request); return true; } /** Flushes any pending work items to the wire. 
*/ @Override public void flush() { - flushInternal(queue); - queuedBytes = 0; - queue.clear(); + try { + if (!isShutdown()) { + flushInternal(queue); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + queuedBytes = 0; + queue.clear(); + } } void add(long id, PendingRequest request) { - assert (canAccept(request.getBytes())); + Preconditions.checkState(canAccept(request.getBytes())); Review Comment: double checking, since `canAccept` depends on `isShutdown`, is canAccept still guaranteed to be true here? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java: ########## @@ -142,16 +142,7 @@ private FanOutStreamingEngineWorkerHarness( connections.get().windmillStreams().values(), totalGetWorkBudget); lastBudgetRefresh.set(Instant.now()); }); - this.getWorkerMetadataStream = - Suppliers.memoize( - () -> - streamFactory.createGetWorkerMetadataStream( - dispatcherClient.getWindmillMetadataServiceStubBlocking(), - getWorkerMetadataThrottleTimer, - endpoints -> - // Run this on a separate thread than the grpc stream thread. - newWorkerMetadataPublisher.submit( - () -> newWindmillEndpoints.add(endpoints)))); + this.getWorkerMetadataStream = null; Review Comment: does anything prevent us from creating the stream here and starting it in start? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -160,80 +179,125 @@ protected boolean isShutdown() { private StreamObserver<RequestT> requestObserver() { if (requestObserver == null) { throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + "requestObserver cannot be null. Missing a call to start() to initialize stream."); } return requestObserver; } /** Send a request to the server. 
*/ protected final void send(RequestT request) { - lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { + if (isShutdown()) { + return; + } + if (streamClosed.get()) { throw new IllegalStateException("Send called on a client closed stream."); } - requestObserver().onNext(request); + try { + lastSendTimeMs.set(Instant.now().getMillis()); + requestObserver.onNext(request); + } catch (StreamObserverCancelledException e) { + if (isShutdown()) { + logger.debug("Stream was closed or shutdown during send.", e); + return; + } + + requestObserver.onError(e); + } + } + } + + @Override + public final void start() { + if (!isShutdown.get() && started.compareAndSet(false, true)) { + // start() should only be executed once during the lifetime of the stream for idempotency and + // when shutdown() has not been called. + startStream(); } } /** Starts the underlying stream. */ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { + if (isShutdown.get()) { + break; + } startTimeMs.set(Instant.now().getMillis()); lastResponseTimeMs.set(0); streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. - requestObserver = requestObserverSupplier.get(); + requestObserver.reset(); onNewStream(); if (clientClosed.get()) { halfClose(); } return; Review Comment: Will it be better to do `streamRegistry.add(this);` here? and remove `streamRegistry.remove(this);`? 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -244,21 +308,26 @@ public final void appendSummaryHtml(PrintWriter writer) { writer.format(", %dms backoff remaining", sleepLeft); } writer.format( - ", current stream is %dms old, last send %dms, last response %dms, closed: %s", + ", current stream is %dms old, last send %dms, last response %dms, closed: %s, " + + "isShutdown: %s, shutdown time: %s", debugDuration(nowMs, startTimeMs.get()), debugDuration(nowMs, lastSendTimeMs.get()), debugDuration(nowMs, lastResponseTimeMs.get()), - streamClosed.get()); + streamClosed.get(), + isShutdown.get(), + shutdownTime.get()); } - // Don't require synchronization on stream, see the appendSummaryHtml comment. + /** + * @implNote Don't require synchronization on stream, see the {@link + * #appendSummaryHtml(PrintWriter)} comment. + */ protected abstract void appendSpecificHtml(PrintWriter writer); @Override public final synchronized void halfClose() { - // Synchronization of close and onCompleted necessary for correct retry logic in onNewStream. Review Comment: is this not true anymore? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java: ########## @@ -47,16 +53,21 @@ public interface WindmillStream { Instant startTime(); /** - * Shutdown the stream. There should be no further interactions with the stream once this has been - * called. + * Shuts down the stream. No further interactions should be made with the stream, and the stream + * will no longer try to connect internally. Any pending retries or in-flight requests will be + * cancelled and all responses dropped and considered invalid. */ void shutdown(); /** Handle representing a stream of GetWork responses. 
*/ @ThreadSafe interface GetWorkStream extends WindmillStream { /** Adjusts the {@link GetWorkBudget} for the stream. */ - void adjustBudget(long itemsDelta, long bytesDelta); + void adjustBudget(long newItems, long newBytes); Review Comment: ```suggestion void setBudget(long newItems, long newBytes); ``` ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -160,80 +179,125 @@ protected boolean isShutdown() { private StreamObserver<RequestT> requestObserver() { if (requestObserver == null) { throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + "requestObserver cannot be null. Missing a call to start() to initialize stream."); } return requestObserver; } /** Send a request to the server. */ protected final void send(RequestT request) { - lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { + if (isShutdown()) { + return; + } + if (streamClosed.get()) { throw new IllegalStateException("Send called on a client closed stream."); } - requestObserver().onNext(request); + try { + lastSendTimeMs.set(Instant.now().getMillis()); + requestObserver.onNext(request); + } catch (StreamObserverCancelledException e) { + if (isShutdown()) { + logger.debug("Stream was closed or shutdown during send.", e); + return; + } + + requestObserver.onError(e); + } + } + } + + @Override + public final void start() { + if (!isShutdown.get() && started.compareAndSet(false, true)) { Review Comment: 1. isShutdown returns false 2. A different thread calls Shutdown() and isShutdown becomes true 3. started is set to true and startStream is called Is this a valid sequence? do we need to prevent it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@beam.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org