arunpandianp commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1800611909


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -131,46 +132,48 @@ private static GetWorkRequest 
withRequestBudget(GetWorkRequest request, GetWorkB
     return 
request.toBuilder().setMaxItems(budget.items()).setMaxBytes(budget.bytes()).build();
   }
 
-  @SuppressWarnings("ReturnValueIgnored")
-  void startStreams() {
-    getWorkStream.get();
-    getDataStream.get();
-    commitWorkStream.get();
-    workCommitter.get().start();
-    // *stream.get() is all memoized in a threadsafe manner.
-    started.set(true);
-  }
-
-  void closeAllStreams() {
-    // Supplier<Stream>.get() starts the stream which is an expensive 
operation as it initiates the
-    // streaming RPCs by possibly making calls over the network. Do not close 
the streams unless
-    // they have already been started.
-    if (started.get()) {
-      getWorkStream.get().shutdown();
-      getDataStream.get().shutdown();
-      workCommitter.get().stop();
-      commitWorkStream.get().shutdown();
+  synchronized void start() {
+    if (!started.get()) {
+      // Start these 3 streams in parallel since they each may perform 
blocking IO.
+      CompletableFuture.allOf(
+              CompletableFuture.runAsync(getWorkStream::start, streamStarter),
+              CompletableFuture.runAsync(getDataStream::start, streamStarter),
+              CompletableFuture.runAsync(commitWorkStream::start, 
streamStarter))
+          .join();
+      workCommitter.start();
+      started.set(true);
     }
   }
 
   @Override
-  public void adjustBudget(long itemsDelta, long bytesDelta) {
-    getWorkBudget.set(getWorkBudget.get().apply(itemsDelta, bytesDelta));
+  public synchronized void close() {
     if (started.get()) {
-      getWorkStream.get().adjustBudget(itemsDelta, bytesDelta);
+      getWorkStream.shutdown();
+      getDataStream.shutdown();
+      workCommitter.stop();
+      commitWorkStream.shutdown();
     }
   }
 
   @Override
-  public GetWorkBudget remainingBudget() {
-    return started.get() ? getWorkStream.get().remainingBudget() : 
getWorkBudget.get();
+  public void setBudget(long items, long bytes) {
+    GetWorkBudget adjustment = 
GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
+    getWorkBudget.set(adjustment);
+    if (started.get()) {
+      getWorkStream.adjustBudget(adjustment);

Review Comment:
   ```suggestion
         getWorkStream.setBudget(newBudget);
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
   private StreamObserver<RequestT> requestObserver() {
     if (requestObserver == null) {
       throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+          "requestObserver cannot be null. Missing a call to start() to 
initialize stream.");
     }
 
     return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
+      if (isShutdown()) {
+        return;
+      }
+
       if (streamClosed.get()) {
         throw new IllegalStateException("Send called on a client closed 
stream.");
       }
 
-      requestObserver().onNext(request);
+      try {
+        lastSendTimeMs.set(Instant.now().getMillis());
+        requestObserver.onNext(request);
+      } catch (StreamObserverCancelledException e) {
+        if (isShutdown()) {
+          logger.debug("Stream was closed or shutdown during send.", e);

Review Comment:
   ```suggestion
             logger.debug("Stream was shutdown during send.", e);
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -216,12 +207,18 @@ static FanOutStreamingEngineWorkerHarness forTesting(
     return fanOutStreamingEngineWorkProvider;
   }
 
-  @SuppressWarnings("ReturnValueIgnored")
+  @SuppressWarnings("FutureReturnValueIgnored")

Review Comment:
   Could we assign the future to a variable named `unusedFuture` and remove the 
suppression?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
   private StreamObserver<RequestT> requestObserver() {
     if (requestObserver == null) {
       throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+          "requestObserver cannot be null. Missing a call to start() to 
initialize stream.");
     }
 
     return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
+      if (isShutdown()) {
+        return;
+      }
+
       if (streamClosed.get()) {
         throw new IllegalStateException("Send called on a client closed 
stream.");
       }
 
-      requestObserver().onNext(request);
+      try {
+        lastSendTimeMs.set(Instant.now().getMillis());
+        requestObserver.onNext(request);
+      } catch (StreamObserverCancelledException e) {
+        if (isShutdown()) {
+          logger.debug("Stream was closed or shutdown during send.", e);
+          return;
+        }
+
+        requestObserver.onError(e);
+      }
+    }
+  }
+
+  @Override
+  public final void start() {
+    if (!isShutdown.get() && started.compareAndSet(false, true)) {
+      // start() should only be executed once during the lifetime of the 
stream for idempotency and
+      // when shutdown() has not been called.
+      startStream();
     }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
+          if (isShutdown.get()) {
+            break;
+          }
           startTimeMs.set(Instant.now().getMillis());
           lastResponseTimeMs.set(0);
           streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the 
stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          requestObserver.reset();
           onNewStream();
           if (clientClosed.get()) {
             halfClose();
           }
           return;
         }
       } catch (Exception e) {
-        LOG.error("Failed to create new stream, retrying: ", e);
+        logger.error("Failed to create new stream, retrying: ", e);
         try {
           long sleep = backoff.nextBackOffMillis();
           sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException | IOException i) {
+          sleeper.sleep(sleep);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          logger.info(
+              "Interrupted during stream creation backoff. The stream will not 
be created.");

Review Comment:
   log the stream name?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -187,13 +206,14 @@ protected void startThrottleTimer() {
     commitWorkThrottleTimer.start();
   }
 
-  private void flushInternal(Map<Long, PendingRequest> requests) {
+  private void flushInternal(Map<Long, PendingRequest> requests) throws 
InterruptedException {

Review Comment:
   It doesn't look like any of the method calls inside `flushInternal` throw 
`InterruptedException`. Can we remove the `throws` clause here?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -317,29 +350,71 @@ public boolean commitWorkItem(
       if (!canAccept(commitRequest.getSerializedSize() + 
computation.length())) {
         return false;
       }
-      PendingRequest request = new PendingRequest(computation, commitRequest, 
onDone);
+
+      PendingRequest request = PendingRequest.create(computation, 
commitRequest, onDone);
       add(idGenerator.incrementAndGet(), request);
       return true;
     }
 
     /** Flushes any pending work items to the wire. */
     @Override
     public void flush() {
-      flushInternal(queue);
-      queuedBytes = 0;
-      queue.clear();
+      try {
+        if (!isShutdown()) {
+          flushInternal(queue);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } finally {
+        queuedBytes = 0;
+        queue.clear();
+      }
     }
 
     void add(long id, PendingRequest request) {
-      assert (canAccept(request.getBytes()));
+      Preconditions.checkState(canAccept(request.getBytes()));

Review Comment:
   Double-checking: since `canAccept` depends on `isShutdown`, is `canAccept` 
still guaranteed to be true here?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -142,16 +142,7 @@ private FanOutStreamingEngineWorkerHarness(
                   connections.get().windmillStreams().values(), 
totalGetWorkBudget);
               lastBudgetRefresh.set(Instant.now());
             });
-    this.getWorkerMetadataStream =
-        Suppliers.memoize(
-            () ->
-                streamFactory.createGetWorkerMetadataStream(
-                    dispatcherClient.getWindmillMetadataServiceStubBlocking(),
-                    getWorkerMetadataThrottleTimer,
-                    endpoints ->
-                        // Run this on a separate thread than the grpc stream 
thread.
-                        newWorkerMetadataPublisher.submit(
-                            () -> newWindmillEndpoints.add(endpoints))));
+    this.getWorkerMetadataStream = null;

Review Comment:
   Does anything prevent us from creating the stream here and starting it in 
`start()`?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
   private StreamObserver<RequestT> requestObserver() {
     if (requestObserver == null) {
       throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+          "requestObserver cannot be null. Missing a call to start() to 
initialize stream.");
     }
 
     return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
+      if (isShutdown()) {
+        return;
+      }
+
       if (streamClosed.get()) {
         throw new IllegalStateException("Send called on a client closed 
stream.");
       }
 
-      requestObserver().onNext(request);
+      try {
+        lastSendTimeMs.set(Instant.now().getMillis());
+        requestObserver.onNext(request);
+      } catch (StreamObserverCancelledException e) {
+        if (isShutdown()) {
+          logger.debug("Stream was closed or shutdown during send.", e);
+          return;
+        }
+
+        requestObserver.onError(e);
+      }
+    }
+  }
+
+  @Override
+  public final void start() {
+    if (!isShutdown.get() && started.compareAndSet(false, true)) {
+      // start() should only be executed once during the lifetime of the 
stream for idempotency and
+      // when shutdown() has not been called.
+      startStream();
     }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
+          if (isShutdown.get()) {
+            break;
+          }
           startTimeMs.set(Instant.now().getMillis());
           lastResponseTimeMs.set(0);
           streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the 
stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          requestObserver.reset();
           onNewStream();
           if (clientClosed.get()) {
             halfClose();
           }
           return;

Review Comment:
   Would it be better to call `streamRegistry.add(this);` here and remove 
`streamRegistry.remove(this);`?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -244,21 +308,26 @@ public final void appendSummaryHtml(PrintWriter writer) {
       writer.format(", %dms backoff remaining", sleepLeft);
     }
     writer.format(
-        ", current stream is %dms old, last send %dms, last response %dms, 
closed: %s",
+        ", current stream is %dms old, last send %dms, last response %dms, 
closed: %s, "
+            + "isShutdown: %s, shutdown time: %s",
         debugDuration(nowMs, startTimeMs.get()),
         debugDuration(nowMs, lastSendTimeMs.get()),
         debugDuration(nowMs, lastResponseTimeMs.get()),
-        streamClosed.get());
+        streamClosed.get(),
+        isShutdown.get(),
+        shutdownTime.get());
   }
 
-  // Don't require synchronization on stream, see the appendSummaryHtml 
comment.
+  /**
+   * @implNote Don't require synchronization on stream, see the {@link
+   *     #appendSummaryHtml(PrintWriter)} comment.
+   */
   protected abstract void appendSpecificHtml(PrintWriter writer);
 
   @Override
   public final synchronized void halfClose() {
-    // Synchronization of close and onCompleted necessary for correct retry 
logic in onNewStream.

Review Comment:
   is this not true anymore?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java:
##########
@@ -47,16 +53,21 @@ public interface WindmillStream {
   Instant startTime();
 
   /**
-   * Shutdown the stream. There should be no further interactions with the 
stream once this has been
-   * called.
+   * Shuts down the stream. No further interactions should be made with the 
stream, and the stream
+   * will no longer try to connect internally. Any pending retries or 
in-flight requests will be
+   * cancelled and all responses dropped and considered invalid.
    */
   void shutdown();
 
   /** Handle representing a stream of GetWork responses. */
   @ThreadSafe
   interface GetWorkStream extends WindmillStream {
     /** Adjusts the {@link GetWorkBudget} for the stream. */
-    void adjustBudget(long itemsDelta, long bytesDelta);
+    void adjustBudget(long newItems, long newBytes);

Review Comment:
   ```suggestion
       void setBudget(long newItems, long newBytes);
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
   private StreamObserver<RequestT> requestObserver() {
     if (requestObserver == null) {
       throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+          "requestObserver cannot be null. Missing a call to start() to 
initialize stream.");
     }
 
     return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
+      if (isShutdown()) {
+        return;
+      }
+
       if (streamClosed.get()) {
         throw new IllegalStateException("Send called on a client closed 
stream.");
       }
 
-      requestObserver().onNext(request);
+      try {
+        lastSendTimeMs.set(Instant.now().getMillis());
+        requestObserver.onNext(request);
+      } catch (StreamObserverCancelledException e) {
+        if (isShutdown()) {
+          logger.debug("Stream was closed or shutdown during send.", e);
+          return;
+        }
+
+        requestObserver.onError(e);
+      }
+    }
+  }
+
+  @Override
+  public final void start() {
+    if (!isShutdown.get() && started.compareAndSet(false, true)) {

Review Comment:
   1. `isShutdown.get()` returns false.
   2. A different thread calls `shutdown()` and `isShutdown` becomes true.
   3. `started` is set to true and `startStream()` is called.
   
   Is this a valid sequence? Do we need to prevent it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to