scwhittle commented on code in PR #34367:
URL: https://github.com/apache/beam/pull/34367#discussion_r2016168525


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -88,19 +105,24 @@ private WindmillStreamSender(
         streamingEngineStreamFactory.createDirectCommitWorkStream(
             connection, streamingEngineThrottleTimers.commitWorkThrottleTimer());
     this.workCommitter = workCommitterFactory.apply(commitWorkStream);
-    this.getWorkStream =
-        streamingEngineStreamFactory.createDirectGetWorkStream(
-            connection,
-            withRequestBudget(getWorkRequest, getWorkBudget.get()),
-            streamingEngineThrottleTimers.getWorkThrottleTimer(),
-            FixedStreamHeartbeatSender.create(getDataStream),
-            getDataClientFactory.apply(getDataStream),
-            workCommitter,
-            workItemScheduler);
+    this.activeGetWorkStream = new AtomicReference<>();
+    this.getWorkStreamFactory =
+        () ->
+            streamingEngineStreamFactory.createDirectGetWorkStream(
+                connection,
+                withRequestBudget(getWorkRequest, getWorkBudget.get()),
+                streamingEngineThrottleTimers.getWorkThrottleTimer(),
+                FixedStreamHeartbeatSender.create(getDataStream),
+                getDataClientFactory.apply(getDataStream),
+                workCommitter,
+                workItemScheduler);
     // 3 threads, 1 for each stream type (GetWork, GetData, CommitWork).
     this.streamStarter =
         Executors.newFixedThreadPool(

Review Comment:
   How about newCachedThreadPool? It seems like two of these threads are only used for start() and then never again, so we might as well let them go away.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -88,19 +105,24 @@ private WindmillStreamSender(
         streamingEngineStreamFactory.createDirectCommitWorkStream(
             connection, streamingEngineThrottleTimers.commitWorkThrottleTimer());
     this.workCommitter = workCommitterFactory.apply(commitWorkStream);
-    this.getWorkStream =
-        streamingEngineStreamFactory.createDirectGetWorkStream(
-            connection,
-            withRequestBudget(getWorkRequest, getWorkBudget.get()),
-            streamingEngineThrottleTimers.getWorkThrottleTimer(),
-            FixedStreamHeartbeatSender.create(getDataStream),
-            getDataClientFactory.apply(getDataStream),
-            workCommitter,
-            workItemScheduler);
+    this.activeGetWorkStream = new AtomicReference<>();
+    this.getWorkStreamFactory =
+        () ->
+            streamingEngineStreamFactory.createDirectGetWorkStream(
+                connection,
+                withRequestBudget(getWorkRequest, getWorkBudget.get()),
+                streamingEngineThrottleTimers.getWorkThrottleTimer(),

Review Comment:
   Should we just reuse the same throttle timer, heartbeat sender, and getDataClientFactory result for each stream?
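   
   E.g., something like this (rough sketch; the local variable types are my guesses based on the existing factory parameters):
   ```java
   // Sketch: create the shared collaborators once, outside the factory lambda,
   // so every recreated GetWorkStream reuses the same instances.
   ThrottleTimer getWorkThrottleTimer = streamingEngineThrottleTimers.getWorkThrottleTimer();
   FixedStreamHeartbeatSender heartbeatSender = FixedStreamHeartbeatSender.create(getDataStream);
   GetDataClient getDataClient = getDataClientFactory.apply(getDataStream);
   this.getWorkStreamFactory =
       () ->
           streamingEngineStreamFactory.createDirectGetWorkStream(
               connection,
               withRequestBudget(getWorkRequest, getWorkBudget.get()),
               getWorkThrottleTimer,
               heartbeatSender,
               getDataClient,
               workCommitter,
               workItemScheduler);
   ```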



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,15 +64,26 @@
 @Internal
 @ThreadSafe
 final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender {
-  private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d";
-  private final AtomicBoolean started;
-  private final AtomicReference<GetWorkBudget> getWorkBudget;
-  private final GetWorkStream getWorkStream;
+  private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class);
+  private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread";
+  private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
+
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
   private final GetDataStream getDataStream;
   private final CommitWorkStream commitWorkStream;
   private final WorkCommitter workCommitter;
   private final StreamingEngineThrottleTimers streamingEngineThrottleTimers;
   private final ExecutorService streamStarter;
+  private final String backendWorkerToken;
+
+  @GuardedBy("activeGetWorkStream")
+  private final AtomicReference<GetWorkStream> activeGetWorkStream;
+
+  @GuardedBy("activeGetWorkStream")
+  private final AtomicReference<GetWorkBudget> getWorkBudget;
+
+  @GuardedBy("activeGetWorkStream")

Review Comment:
   This doesn't seem like it needs to be guarded. Is there any reason it has to be called serially?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -165,4 +201,39 @@ long getAndResetThrottleTime() {
   long getCurrentActiveCommitBytes() {
     return workCommitter.currentActiveCommitBytes();
   }
+
+  /**
+   * Creates, starts, and gracefully terminates {@link GetWorkStream} before the clientside deadline
+   * to prevent {@link org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
+   * If at any point the server closes the stream, reconnects immediately.
+   */
+  private void getWorkStreamLoop(CountDownLatch waitForInitialStream) {
+    @Nullable GetWorkStream newStream = null;
+    while (isRunning.get()) {
+      synchronized (activeGetWorkStream) {
+        newStream = getWorkStreamFactory.get();

Review Comment:
   Maybe this could be outside the synchronized block? We can create the new stream before swapping it in as the active one. We could create the new stream with no budget and just set its budget once the old stream is half-closed.
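   
   Roughly (sketch against the PR's types; assumes the factory can hand back a stream that starts with an empty budget):
   ```java
   // Sketch: build and start the replacement outside the lock, then swap it in
   // and only hand it the current budget once it is the active stream.
   GetWorkStream newStream = getWorkStreamFactory.get(); // created with no budget (assumption)
   newStream.start();
   waitForInitialStream.countDown();
   synchronized (activeGetWorkStream) {
     activeGetWorkStream.set(newStream);
     newStream.setBudget(getWorkBudget.get());
   }
   ```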



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -126,35 +148,49 @@ private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkB
   }
 
   synchronized void start() {
-    if (!started.get()) {
+    if (isRunning.compareAndSet(false, true)) {
       checkState(!streamStarter.isShutdown(), "WindmillStreamSender has already been shutdown.");
-
       // Start these 3 streams in parallel since they each may perform blocking IO.
+      CountDownLatch waitForInitialStream = new CountDownLatch(1);
+      streamStarter.execute(() -> getWorkStreamLoop(waitForInitialStream));
       CompletableFuture.allOf(
-              CompletableFuture.runAsync(getWorkStream::start, streamStarter),
               CompletableFuture.runAsync(getDataStream::start, streamStarter),
               CompletableFuture.runAsync(commitWorkStream::start, streamStarter))
           .join();
+      try {
+        waitForInitialStream.await();
+      } catch (InterruptedException e) {
+        close();
+        LOG.error("GetWorkStream to {} was never able to start.", backendWorkerToken);
+        throw new IllegalStateException("GetWorkStream unable to start aborting.", e);
+      }
       workCommitter.start();
-      started.set(true);
     }
   }
 
   @Override
   public synchronized void close() {
+    isRunning.set(false);
     streamStarter.shutdownNow();
-    getWorkStream.shutdown();
     getDataStream.shutdown();
     workCommitter.stop();
     commitWorkStream.shutdown();
   }
 
   @Override
   public void setBudget(long items, long bytes) {
-    GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
-    getWorkBudget.set(budget);
-    if (started.get()) {
-      getWorkStream.setBudget(budget);
+    synchronized (activeGetWorkStream) {
+      GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();

Review Comment:
   Building the budget and setting it on the atomic could be done outside the synchronized block.
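   
   i.e. something like:
   ```java
   // Sketch: build the budget and update the atomic before taking the lock;
   // only the call on the active stream needs to be synchronized.
   GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
   getWorkBudget.set(budget);
   synchronized (activeGetWorkStream) {
     GetWorkStream stream = activeGetWorkStream.get();
     if (stream != null) {
       stream.setBudget(budget);
     }
   }
   ```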



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,15 +64,26 @@
 @Internal
 @ThreadSafe
 final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender {
-  private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d";
-  private final AtomicBoolean started;
-  private final AtomicReference<GetWorkBudget> getWorkBudget;
-  private final GetWorkStream getWorkStream;
+  private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class);
+  private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread";
+  private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
+
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
   private final GetDataStream getDataStream;
   private final CommitWorkStream commitWorkStream;
   private final WorkCommitter workCommitter;
   private final StreamingEngineThrottleTimers streamingEngineThrottleTimers;
   private final ExecutorService streamStarter;
+  private final String backendWorkerToken;
+
+  @GuardedBy("activeGetWorkStream")
+  private final AtomicReference<GetWorkStream> activeGetWorkStream;

Review Comment:
   If this is guarded, I don't think it needs to be an AtomicReference.
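   
   E.g. (sketch; the separate lock object and its name are made up):
   ```java
   // Sketch: a plain nullable field guarded by an explicit lock object, rather
   // than an AtomicReference that also doubles as the monitor.
   private final Object activeGetWorkStreamLock = new Object();

   @GuardedBy("activeGetWorkStreamLock")
   private @Nullable GetWorkStream activeGetWorkStream = null;
   ```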



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -126,35 +148,49 @@ private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkB
   }
 
   synchronized void start() {
-    if (!started.get()) {
+    if (isRunning.compareAndSet(false, true)) {
       checkState(!streamStarter.isShutdown(), "WindmillStreamSender has already been shutdown.");
-
       // Start these 3 streams in parallel since they each may perform blocking IO.
+      CountDownLatch waitForInitialStream = new CountDownLatch(1);
+      streamStarter.execute(() -> getWorkStreamLoop(waitForInitialStream));
       CompletableFuture.allOf(
-              CompletableFuture.runAsync(getWorkStream::start, streamStarter),
               CompletableFuture.runAsync(getDataStream::start, streamStarter),
               CompletableFuture.runAsync(commitWorkStream::start, streamStarter))
           .join();
+      try {
+        waitForInitialStream.await();
+      } catch (InterruptedException e) {
+        close();
+        LOG.error("GetWorkStream to {} was never able to start.", backendWorkerToken);
+        throw new IllegalStateException("GetWorkStream unable to start aborting.", e);
+      }
       workCommitter.start();
-      started.set(true);
     }
   }
 
   @Override
   public synchronized void close() {
+    isRunning.set(false);
     streamStarter.shutdownNow();
-    getWorkStream.shutdown();
     getDataStream.shutdown();
     workCommitter.stop();
     commitWorkStream.shutdown();
   }
 
   @Override
   public void setBudget(long items, long bytes) {
-    GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
-    getWorkBudget.set(budget);
-    if (started.get()) {
-      getWorkStream.setBudget(budget);
+    synchronized (activeGetWorkStream) {
+      GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
+      getWorkBudget.set(budget);
+      if (isRunning.get()) {

Review Comment:
   Remove the running check and just rely on the null check below? It seems like if activeGetWorkStream is set it is OK to call setBudget on it, and it's one less interleaving to think about.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,15 +64,26 @@
 @Internal
 @ThreadSafe
 final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender {
-  private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d";
-  private final AtomicBoolean started;
-  private final AtomicReference<GetWorkBudget> getWorkBudget;
-  private final GetWorkStream getWorkStream;
+  private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class);
+  private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread";
+  private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
+
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
   private final GetDataStream getDataStream;
   private final CommitWorkStream commitWorkStream;
   private final WorkCommitter workCommitter;
   private final StreamingEngineThrottleTimers streamingEngineThrottleTimers;
   private final ExecutorService streamStarter;
+  private final String backendWorkerToken;
+
+  @GuardedBy("activeGetWorkStream")
+  private final AtomicReference<GetWorkStream> activeGetWorkStream;
+
+  @GuardedBy("activeGetWorkStream")
+  private final AtomicReference<GetWorkBudget> getWorkBudget;

Review Comment:
   Ditto: if this is guarded, it probably doesn't need to be atomic.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -165,4 +201,39 @@ long getAndResetThrottleTime() {
   long getCurrentActiveCommitBytes() {
     return workCommitter.currentActiveCommitBytes();
   }
+
+  /**
+   * Creates, starts, and gracefully terminates {@link GetWorkStream} before the clientside deadline
+   * to prevent {@link org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
+   * If at any point the server closes the stream, reconnects immediately.
+   */
+  private void getWorkStreamLoop(CountDownLatch waitForInitialStream) {
+    @Nullable GetWorkStream newStream = null;
+    while (isRunning.get()) {
+      synchronized (activeGetWorkStream) {
+        newStream = getWorkStreamFactory.get();
+        newStream.start();
+        waitForInitialStream.countDown();
+        activeGetWorkStream.set(newStream);
+      }
+      try {
+        // Try to gracefully terminate the stream.
+        if (!newStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES, TimeUnit.MINUTES)) {
+          newStream.halfClose();
+        }
+
+        // If graceful termination is unsuccessful, forcefully shutdown.
+        if (!newStream.awaitTermination(30, TimeUnit.SECONDS)) {
+          newStream.shutdown();
+        }
+
+      } catch (InterruptedException e) {
+        // continue until !isRunning.

Review Comment:
   Could we instead force isRunning to false here? We don't expect interruptions to happen for any other reason.
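   
   i.e. roughly:
   ```java
   } catch (InterruptedException e) {
     // Sketch: treat interruption as a shutdown signal so the loop exits,
     // and restore the interrupt flag.
     isRunning.set(false);
     Thread.currentThread().interrupt();
   }
   ```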



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,15 +64,26 @@
 @Internal
 @ThreadSafe
 final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender {
-  private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d";
-  private final AtomicBoolean started;
-  private final AtomicReference<GetWorkBudget> getWorkBudget;
-  private final GetWorkStream getWorkStream;
+  private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class);
+  private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread";
+  private static final int GET_WORK_STREAM_TTL_MINUTES = 45;

Review Comment:
   Could note that this needs to stay less than the stream deadline set in GrpcWindmillStreamFactory.
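   
   Maybe just a javadoc on the constant, something like (wording sketch only):
   ```java
   /**
    * Client-side lifetime of a single GetWorkStream. Must stay well below the
    * direct-path stream deadline configured in GrpcWindmillStreamFactory so the
    * stream is half-closed before DEADLINE_EXCEEDED can fire.
    */
   private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
   ```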



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -165,4 +201,39 @@ long getAndResetThrottleTime() {
   long getCurrentActiveCommitBytes() {
     return workCommitter.currentActiveCommitBytes();
   }
+
+  /**
+   * Creates, starts, and gracefully terminates {@link GetWorkStream} before the clientside deadline
+   * to prevent {@link org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
+   * If at any point the server closes the stream, reconnects immediately.
+   */
+  private void getWorkStreamLoop(CountDownLatch waitForInitialStream) {
+    @Nullable GetWorkStream newStream = null;
+    while (isRunning.get()) {
+      synchronized (activeGetWorkStream) {
+        newStream = getWorkStreamFactory.get();
+        newStream.start();
+        waitForInitialStream.countDown();
+        activeGetWorkStream.set(newStream);
+      }
+      try {
+        // Try to gracefully terminate the stream.
+        if (!newStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES, TimeUnit.MINUTES)) {
+          newStream.halfClose();
+        }
+
+        // If graceful termination is unsuccessful, forcefully shutdown.
+        if (!newStream.awaitTermination(30, TimeUnit.SECONDS)) {

Review Comment:
   Maybe better to increase this? If we lose GetWork responses the Windmill worker has to retry; if a little more time lets them flush, that seems fine.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java:
##########
@@ -189,6 +189,12 @@ private static <T extends AbstractStub<T>> T withDefaultDeadline(T stub) {
     return stub.withDeadlineAfter(DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS);
   }
 
+  private static <T extends AbstractStub<T>> T withLongDeadline(T stub) {

Review Comment:
   withDirectPathDeadline?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -165,4 +201,39 @@ long getAndResetThrottleTime() {
   long getCurrentActiveCommitBytes() {
     return workCommitter.currentActiveCommitBytes();
   }
+
+  /**
+   * Creates, starts, and gracefully terminates {@link GetWorkStream} before the clientside deadline
+   * to prevent {@link org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
+   * If at any point the server closes the stream, reconnects immediately.
+   */
+  private void getWorkStreamLoop(CountDownLatch waitForInitialStream) {
+    @Nullable GetWorkStream newStream = null;
+    while (isRunning.get()) {
+      synchronized (activeGetWorkStream) {
+        newStream = getWorkStreamFactory.get();
+        newStream.start();
+        waitForInitialStream.countDown();
+        activeGetWorkStream.set(newStream);
+      }
+      try {
+        // Try to gracefully terminate the stream.
+        if (!newStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES, TimeUnit.MINUTES)) {
+          newStream.halfClose();

Review Comment:
   I think as we half-close here we probably want to create a new stream to take over. That way we aren't idle while waiting for the old stream to terminate.
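   
   Roughly (sketch; variable names are illustrative):
   ```java
   // Sketch: start the replacement and publish it as active before half-closing
   // the old stream, so GetWork keeps flowing while the old stream drains.
   if (!currentStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES, TimeUnit.MINUTES)) {
     GetWorkStream replacement = getWorkStreamFactory.get();
     replacement.start();
     synchronized (activeGetWorkStream) {
       activeGetWorkStream.set(replacement);
     }
     currentStream.halfClose();
     // Give the old stream a grace period to flush, then force shutdown.
     if (!currentStream.awaitTermination(30, TimeUnit.SECONDS)) {
       currentStream.shutdown();
     }
     currentStream = replacement;
   }
   ```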



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -217,11 +217,11 @@ public void terminate(Throwable terminationException) {
   }
 
   private String constructStreamCancelledErrorMessage(long totalSecondsWaited) {
-    return deadlineSeconds > 0
+    return inactivityTimeout > 0
         ? "Waited "
             + totalSecondsWaited
             + "s which exceeds given deadline of "
-            + deadlineSeconds
+            + inactivityTimeout
             + "s for the outboundObserver to become ready meaning "
             + "that the stream deadline was not respected."

Review Comment:
   This seems like the wrong message if it's the inactivity timeout rather than the stream deadline being exceeded.
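   
   Maybe something like (wording sketch for the inactivityTimeout > 0 branch):
   ```java
   String message =
       "Waited "
           + totalSecondsWaited
           + "s which exceeds the configured inactivity timeout of "
           + inactivityTimeout
           + "s for the outboundObserver to become ready.";
   ```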
   
   


