This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 88ada9dfee3 fix silent failures in dispatch loop from stalling the pipeline (#32922)
88ada9dfee3 is described below

commit 88ada9dfee3c4602ff62b0f6ebdded29518760c9
Author: martin trieu <[email protected]>
AuthorDate: Wed Oct 30 04:19:36 2024 -0600

    fix silent failures in dispatch loop from stalling the pipeline (#32922)
    
    * use ExecutorService instead of ScheduledExecutorService which swallows exceptions into futures that were not examined
    
    Co-authored-by: Arun Pandian <[email protected]>
---
 .../worker/DataflowWorkerHarnessHelper.java        |   4 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |  14 ++-
 .../worker/WorkerUncaughtExceptionHandler.java     |  10 +-
 .../FanOutStreamingEngineWorkerHarness.java        |   2 +-
 .../harness/SingleSourceWorkerHarness.java         |   2 +-
 .../commits/StreamingApplianceWorkCommitter.java   |   2 +-
 .../work/budget/GetWorkBudgetRefresher.java        |   2 +-
 .../work/processing/StreamingWorkScheduler.java    |   2 +-
 ...treamingEngineComputationConfigFetcherTest.java |   1 -
 .../harness/SingleSourceWorkerHarnessTest.java     | 117 +++++++++++++++++++++
 10 files changed, 136 insertions(+), 20 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
index 94c894608a4..a28a5e989c8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
@@ -82,7 +82,9 @@ public final class DataflowWorkerHarnessHelper {
 
   @SuppressWarnings("Slf4jIllegalPassedClass")
   public static void initializeLogging(Class<?> workerHarnessClass) {
-    /* Set up exception handling tied to the workerHarnessClass. */
+    // Set up exception handling for raw Threads tied to the workerHarnessClass.
+    // Does NOT handle exceptions thrown by threads created by
+    // ScheduledExecutors/ScheduledExecutorServices.
     Thread.setDefaultUncaughtExceptionHandler(
         new WorkerUncaughtExceptionHandler(LoggerFactory.getLogger(workerHarnessClass)));
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index c478341c1c3..ff72add83e4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -175,7 +175,7 @@ public final class StreamingDataflowWorker {
       StreamingCounters streamingCounters,
       MemoryMonitor memoryMonitor,
       GrpcWindmillStreamFactory windmillStreamFactory,
-      Function<String, ScheduledExecutorService> executorSupplier,
+      ScheduledExecutorService activeWorkRefreshExecutorFn,
       ConcurrentMap<String, StageInfo> stageInfoMap) {
     // Register standard file systems.
     FileSystems.setDefaultPipelineOptions(options);
@@ -285,7 +285,7 @@ public final class StreamingDataflowWorker {
             stuckCommitDurationMillis,
             computationStateCache::getAllPresentComputations,
             sampler,
-            executorSupplier.apply("RefreshWork"),
+            activeWorkRefreshExecutorFn,
             getDataMetricTracker::trackHeartbeats);
 
     this.statusPages =
@@ -347,10 +347,7 @@ public final class StreamingDataflowWorker {
             .setSizeMb(options.getWorkerCacheMb())
             .setSupportMapViaMultimap(options.isEnableStreamingEngine())
             .build();
-    Function<String, ScheduledExecutorService> executorSupplier =
-        threadName ->
-            Executors.newSingleThreadScheduledExecutor(
-                new ThreadFactoryBuilder().setNameFormat(threadName).build());
+
     GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
         createGrpcwindmillStreamFactoryBuilder(options, clientId);
 
@@ -417,7 +414,8 @@ public final class StreamingDataflowWorker {
         streamingCounters,
         memoryMonitor,
         
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
-        executorSupplier,
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()),
         stageInfo);
   }
 
@@ -595,7 +593,7 @@ public final class StreamingDataflowWorker {
                     
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
                 .build()
             : windmillStreamFactory.build(),
-        executorSupplier,
+        executorSupplier.apply("RefreshWork"),
         stageInfo);
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java
index 5a8e87d23ab..b4ec170099d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java
@@ -28,16 +28,16 @@ import org.slf4j.Logger;
  * This uncaught exception handler logs the {@link Throwable} to the logger, {@link System#err} and
  * exits the application with status code 1.
  */
-class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
+public final class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
+  @VisibleForTesting public static final int JVM_TERMINATED_STATUS_CODE = 1;
   private final JvmRuntime runtime;
   private final Logger logger;
 
-  WorkerUncaughtExceptionHandler(Logger logger) {
+  public WorkerUncaughtExceptionHandler(Logger logger) {
     this(JvmRuntime.INSTANCE, logger);
   }
 
-  @VisibleForTesting
-  WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) {
+  public WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) {
     this.runtime = runtime;
     this.logger = logger;
   }
@@ -59,7 +59,7 @@ class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
         t.printStackTrace(originalStdErr);
       }
     } finally {
-      runtime.halt(1);
+      runtime.halt(JVM_TERMINATED_STATUS_CODE);
     }
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
index 458cf57ca8e..3eed4ee6d83 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
@@ -137,7 +137,7 @@ public final class FanOutStreamingEngineWorkerHarness 
implements StreamingWorker
         Executors.newCachedThreadPool(
             new 
ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build());
     this.workerMetadataConsumer =
-        Executors.newSingleThreadScheduledExecutor(
+        Executors.newSingleThreadExecutor(
             new 
ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build());
     this.getWorkBudgetDistributor = getWorkBudgetDistributor;
     this.totalGetWorkBudget = totalGetWorkBudget;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
index bc93e6d89c4..06598b61c45 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
@@ -82,7 +82,7 @@ public final class SingleSourceWorkerHarness implements 
StreamingWorkerHarness {
     this.waitForResources = waitForResources;
     this.computationStateFetcher = computationStateFetcher;
     this.workProviderExecutor =
-        Executors.newSingleThreadScheduledExecutor(
+        Executors.newSingleThreadExecutor(
             new ThreadFactoryBuilder()
                 .setDaemon(true)
                 .setPriority(Thread.MIN_PRIORITY)
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
index d092ebf53fc..6889764afe6 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
@@ -57,7 +57,7 @@ public final class StreamingApplianceWorkCommitter implements 
WorkCommitter {
         WeightedBoundedQueue.create(
             MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, 
commit.getSize()));
     this.commitWorkers =
-        Executors.newSingleThreadScheduledExecutor(
+        Executors.newSingleThreadExecutor(
             new ThreadFactoryBuilder()
                 .setDaemon(true)
                 .setPriority(Thread.MAX_PRIORITY)
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
index e39aa8dbc8a..d81c7d0593f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
@@ -51,7 +51,7 @@ public final class GetWorkBudgetRefresher {
       Supplier<Boolean> isBudgetRefreshPaused, Runnable redistributeBudget) {
     this.budgetRefreshTrigger = new AdvancingPhaser(1);
     this.budgetRefreshExecutor =
-        Executors.newSingleThreadScheduledExecutor(
+        Executors.newSingleThreadExecutor(
             new ThreadFactoryBuilder()
                 .setNameFormat(BUDGET_REFRESH_THREAD)
                 .setUncaughtExceptionHandler(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 9a3e6eb6b09..c74874c465a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -70,7 +70,7 @@ import org.slf4j.LoggerFactory;
  */
 @Internal
 @ThreadSafe
-public final class StreamingWorkScheduler {
+public class StreamingWorkScheduler {
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamingWorkScheduler.class);
 
   private final DataflowWorkerHarnessOptions options;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
index 9fa17588c94..3a0ae7bb208 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
@@ -47,7 +47,6 @@ import org.mockito.internal.stubbing.answers.Returns;
 
 @RunWith(JUnit4.class)
 public class StreamingEngineComputationConfigFetcherTest {
-
   private final WorkUnitClient mockDataflowServiceClient =
       mock(WorkUnitClient.class, new Returns(Optional.empty()));
   private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
new file mode 100644
index 00000000000..5a2df4baae6
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.JvmRuntime;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class SingleSourceWorkerHarnessTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SingleSourceWorkerHarnessTest.class);
+  private final WorkCommitter workCommitter = mock(WorkCommitter.class);
+  private final GetDataClient getDataClient = mock(GetDataClient.class);
+  private final HeartbeatSender heartbeatSender = mock(HeartbeatSender.class);
+  private final Runnable waitForResources = () -> {};
+  private final Function<String, Optional<ComputationState>> 
computationStateFetcher =
+      ignored -> Optional.empty();
+  private final StreamingWorkScheduler streamingWorkScheduler = 
mock(StreamingWorkScheduler.class);
+
+  private SingleSourceWorkerHarness createWorkerHarness(
+      SingleSourceWorkerHarness.GetWorkSender getWorkSender, JvmRuntime runtime) {
+    // In non-test scenario this is set in DataflowWorkerHarnessHelper.initializeLogging(...).
+    Thread.setDefaultUncaughtExceptionHandler(new WorkerUncaughtExceptionHandler(runtime, LOG));
+    return SingleSourceWorkerHarness.builder()
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setHeartbeatSender(heartbeatSender)
+        .setWaitForResources(waitForResources)
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setComputationStateFetcher(computationStateFetcher)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  @Test
+  public void testDispatchLoop_unexpectedFailureKillsJvm_appliance() {
+    SingleSourceWorkerHarness.GetWorkSender getWorkSender =
+        SingleSourceWorkerHarness.GetWorkSender.forAppliance(
+            () -> {
+              throw new RuntimeException("something bad happened");
+            });
+
+    FakeJvmRuntime fakeJvmRuntime = new FakeJvmRuntime();
+    createWorkerHarness(getWorkSender, fakeJvmRuntime).start();
+    assertTrue(fakeJvmRuntime.waitForRuntimeDeath(5, TimeUnit.SECONDS));
+    fakeJvmRuntime.assertJvmTerminated();
+  }
+
+  @Test
+  public void testDispatchLoop_unexpectedFailureKillsJvm_streamingEngine() {
+    SingleSourceWorkerHarness.GetWorkSender getWorkSender =
+        SingleSourceWorkerHarness.GetWorkSender.forStreamingEngine(
+            workItemReceiver -> {
+              throw new RuntimeException("something bad happened");
+            });
+
+    FakeJvmRuntime fakeJvmRuntime = new FakeJvmRuntime();
+    createWorkerHarness(getWorkSender, fakeJvmRuntime).start();
+    assertTrue(fakeJvmRuntime.waitForRuntimeDeath(5, TimeUnit.SECONDS));
+    fakeJvmRuntime.assertJvmTerminated();
+  }
+
+  private static class FakeJvmRuntime implements JvmRuntime {
+    private final CountDownLatch haltedLatch = new CountDownLatch(1);
+    private volatile int exitStatus = 0;
+
+    @Override
+    public void halt(int status) {
+      exitStatus = status;
+      haltedLatch.countDown();
+    }
+
+    public boolean waitForRuntimeDeath(long timeout, TimeUnit unit) {
+      try {
+        return haltedLatch.await(timeout, unit);
+      } catch (InterruptedException e) {
+        return false;
+      }
+    }
+
+    private void assertJvmTerminated() {
+      
assertThat(exitStatus).isEqualTo(WorkerUncaughtExceptionHandler.JVM_TERMINATED_STATUS_CODE);
+    }
+  }
+}

Reply via email to