This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 88ada9dfee3 fix silent failures in dispatch loop from stalling the
pipeline (#32922)
88ada9dfee3 is described below
commit 88ada9dfee3c4602ff62b0f6ebdded29518760c9
Author: martin trieu <[email protected]>
AuthorDate: Wed Oct 30 04:19:36 2024 -0600
fix silent failures in dispatch loop from stalling the pipeline (#32922)
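* use ExecutorService instead of ScheduledExecutorService. A
ScheduledExecutorService wraps every task (even ones passed to execute()) in a
future, so exceptions thrown by these background loops were captured in futures
that were never examined instead of reaching the uncaught exception handler.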
Co-authored-by: Arun Pandian <[email protected]>
---
.../worker/DataflowWorkerHarnessHelper.java | 4 +-
.../dataflow/worker/StreamingDataflowWorker.java | 14 ++-
.../worker/WorkerUncaughtExceptionHandler.java | 10 +-
.../FanOutStreamingEngineWorkerHarness.java | 2 +-
.../harness/SingleSourceWorkerHarness.java | 2 +-
.../commits/StreamingApplianceWorkCommitter.java | 2 +-
.../work/budget/GetWorkBudgetRefresher.java | 2 +-
.../work/processing/StreamingWorkScheduler.java | 2 +-
...treamingEngineComputationConfigFetcherTest.java | 1 -
.../harness/SingleSourceWorkerHarnessTest.java | 117 +++++++++++++++++++++
10 files changed, 136 insertions(+), 20 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
index 94c894608a4..a28a5e989c8 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
@@ -82,7 +82,9 @@ public final class DataflowWorkerHarnessHelper {
@SuppressWarnings("Slf4jIllegalPassedClass")
public static void initializeLogging(Class<?> workerHarnessClass) {
- /* Set up exception handling tied to the workerHarnessClass. */
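+ // Set up default exception handling, tied to the workerHarnessClass, for
+ // threads that do not install their own UncaughtExceptionHandler.
+ // Does NOT see exceptions thrown by tasks run on a ScheduledExecutorService
+ // (even via execute()) or submitted via ExecutorService.submit(): those are
+ // captured in the returned Future and are only visible via Future.get().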
Thread.setDefaultUncaughtExceptionHandler(
new
WorkerUncaughtExceptionHandler(LoggerFactory.getLogger(workerHarnessClass)));
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index c478341c1c3..ff72add83e4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -175,7 +175,7 @@ public final class StreamingDataflowWorker {
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
GrpcWindmillStreamFactory windmillStreamFactory,
- Function<String, ScheduledExecutorService> executorSupplier,
+ ScheduledExecutorService activeWorkRefreshExecutorFn,
ConcurrentMap<String, StageInfo> stageInfoMap) {
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
@@ -285,7 +285,7 @@ public final class StreamingDataflowWorker {
stuckCommitDurationMillis,
computationStateCache::getAllPresentComputations,
sampler,
- executorSupplier.apply("RefreshWork"),
+ activeWorkRefreshExecutorFn,
getDataMetricTracker::trackHeartbeats);
this.statusPages =
@@ -347,10 +347,7 @@ public final class StreamingDataflowWorker {
.setSizeMb(options.getWorkerCacheMb())
.setSupportMapViaMultimap(options.isEnableStreamingEngine())
.build();
- Function<String, ScheduledExecutorService> executorSupplier =
- threadName ->
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat(threadName).build());
+
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
createGrpcwindmillStreamFactoryBuilder(options, clientId);
@@ -417,7 +414,8 @@ public final class StreamingDataflowWorker {
streamingCounters,
memoryMonitor,
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
- executorSupplier,
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()),
stageInfo);
}
@@ -595,7 +593,7 @@ public final class StreamingDataflowWorker {
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
.build()
: windmillStreamFactory.build(),
- executorSupplier,
+ executorSupplier.apply("RefreshWork"),
stageInfo);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java
index 5a8e87d23ab..b4ec170099d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java
@@ -28,16 +28,16 @@ import org.slf4j.Logger;
* This uncaught exception handler logs the {@link Throwable} to the logger,
{@link System#err} and
* exits the application with status code 1.
*/
-class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
+public final class WorkerUncaughtExceptionHandler implements
UncaughtExceptionHandler {
+ @VisibleForTesting public static final int JVM_TERMINATED_STATUS_CODE = 1;
private final JvmRuntime runtime;
private final Logger logger;
- WorkerUncaughtExceptionHandler(Logger logger) {
+ public WorkerUncaughtExceptionHandler(Logger logger) {
this(JvmRuntime.INSTANCE, logger);
}
- @VisibleForTesting
- WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) {
+ public WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) {
this.runtime = runtime;
this.logger = logger;
}
@@ -59,7 +59,7 @@ class WorkerUncaughtExceptionHandler implements
UncaughtExceptionHandler {
t.printStackTrace(originalStdErr);
}
} finally {
- runtime.halt(1);
+ runtime.halt(JVM_TERMINATED_STATUS_CODE);
}
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
index 458cf57ca8e..3eed4ee6d83 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
@@ -137,7 +137,7 @@ public final class FanOutStreamingEngineWorkerHarness
implements StreamingWorker
Executors.newCachedThreadPool(
new
ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build());
this.workerMetadataConsumer =
- Executors.newSingleThreadScheduledExecutor(
+ Executors.newSingleThreadExecutor(
new
ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build());
this.getWorkBudgetDistributor = getWorkBudgetDistributor;
this.totalGetWorkBudget = totalGetWorkBudget;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
index bc93e6d89c4..06598b61c45 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
@@ -82,7 +82,7 @@ public final class SingleSourceWorkerHarness implements
StreamingWorkerHarness {
this.waitForResources = waitForResources;
this.computationStateFetcher = computationStateFetcher;
this.workProviderExecutor =
- Executors.newSingleThreadScheduledExecutor(
+ Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
index d092ebf53fc..6889764afe6 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
@@ -57,7 +57,7 @@ public final class StreamingApplianceWorkCommitter implements
WorkCommitter {
WeightedBoundedQueue.create(
MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES,
commit.getSize()));
this.commitWorkers =
- Executors.newSingleThreadScheduledExecutor(
+ Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.MAX_PRIORITY)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
index e39aa8dbc8a..d81c7d0593f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
@@ -51,7 +51,7 @@ public final class GetWorkBudgetRefresher {
Supplier<Boolean> isBudgetRefreshPaused, Runnable redistributeBudget) {
this.budgetRefreshTrigger = new AdvancingPhaser(1);
this.budgetRefreshExecutor =
- Executors.newSingleThreadScheduledExecutor(
+ Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setNameFormat(BUDGET_REFRESH_THREAD)
.setUncaughtExceptionHandler(
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 9a3e6eb6b09..c74874c465a 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -70,7 +70,7 @@ import org.slf4j.LoggerFactory;
*/
@Internal
@ThreadSafe
-public final class StreamingWorkScheduler {
+public class StreamingWorkScheduler {
private static final Logger LOG =
LoggerFactory.getLogger(StreamingWorkScheduler.class);
private final DataflowWorkerHarnessOptions options;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
index 9fa17588c94..3a0ae7bb208 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
@@ -47,7 +47,6 @@ import org.mockito.internal.stubbing.answers.Returns;
@RunWith(JUnit4.class)
public class StreamingEngineComputationConfigFetcherTest {
-
private final WorkUnitClient mockDataflowServiceClient =
mock(WorkUnitClient.class, new Returns(Optional.empty()));
private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
new file mode 100644
index 00000000000..5a2df4baae6
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.JvmRuntime;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class SingleSourceWorkerHarnessTest {
+  private static final Logger LOG = LoggerFactory.getLogger(SingleSourceWorkerHarnessTest.class);
+  private final WorkCommitter workCommitter = mock(WorkCommitter.class);
+  private final GetDataClient getDataClient = mock(GetDataClient.class);
+  private final HeartbeatSender heartbeatSender = mock(HeartbeatSender.class);
+  private final Runnable waitForResources = () -> {};
+  private final Function<String, Optional<ComputationState>> computationStateFetcher =
+      ignored -> Optional.empty();
+  private final StreamingWorkScheduler streamingWorkScheduler = mock(StreamingWorkScheduler.class);
+
+  /**
+   * Builds a harness wired to mocks and installs a {@link WorkerUncaughtExceptionHandler} backed by
+   * {@code runtime} as the JVM-wide default uncaught exception handler.
+   *
+   * <p>Installing the default handler mutates global JVM state; callers are responsible for
+   * restoring the previous handler (see {@link #assertDispatchLoopFailureKillsJvm}).
+   */
+  private SingleSourceWorkerHarness createWorkerHarness(
+      SingleSourceWorkerHarness.GetWorkSender getWorkSender, JvmRuntime runtime) {
+    // In non-test scenario this is set in DataflowWorkerHarnessHelper.initializeLogging(...).
+    Thread.setDefaultUncaughtExceptionHandler(new WorkerUncaughtExceptionHandler(runtime, LOG));
+    return SingleSourceWorkerHarness.builder()
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setHeartbeatSender(heartbeatSender)
+        .setWaitForResources(waitForResources)
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setComputationStateFetcher(computationStateFetcher)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  /**
+   * Starts a harness whose dispatch loop uses {@code getWorkSender} and asserts that the fake JVM
+   * runtime is halted with the termination status code within 5 seconds.
+   *
+   * <p>The JVM-wide default uncaught exception handler is restored afterwards so that a handler
+   * backed by a {@link FakeJvmRuntime} does not leak into other tests running in the same JVM.
+   */
+  private void assertDispatchLoopFailureKillsJvm(
+      SingleSourceWorkerHarness.GetWorkSender getWorkSender) {
+    Thread.UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
+    FakeJvmRuntime fakeJvmRuntime = new FakeJvmRuntime();
+    try {
+      createWorkerHarness(getWorkSender, fakeJvmRuntime).start();
+      assertTrue(fakeJvmRuntime.waitForRuntimeDeath(5, TimeUnit.SECONDS));
+      fakeJvmRuntime.assertJvmTerminated();
+    } finally {
+      Thread.setDefaultUncaughtExceptionHandler(previousHandler);
+    }
+  }
+
+  @Test
+  public void testDispatchLoop_unexpectedFailureKillsJvm_appliance() {
+    assertDispatchLoopFailureKillsJvm(
+        SingleSourceWorkerHarness.GetWorkSender.forAppliance(
+            () -> {
+              throw new RuntimeException("something bad happened");
+            }));
+  }
+
+  @Test
+  public void testDispatchLoop_unexpectedFailureKillsJvm_streamingEngine() {
+    assertDispatchLoopFailureKillsJvm(
+        SingleSourceWorkerHarness.GetWorkSender.forStreamingEngine(
+            workItemReceiver -> {
+              throw new RuntimeException("something bad happened");
+            }));
+  }
+
+  /** {@link JvmRuntime} that records the requested exit status instead of halting the JVM. */
+  private static class FakeJvmRuntime implements JvmRuntime {
+    private final CountDownLatch haltedLatch = new CountDownLatch(1);
+    private volatile int exitStatus = 0;
+
+    @Override
+    public void halt(int status) {
+      exitStatus = status;
+      haltedLatch.countDown();
+    }
+
+    /**
+     * Waits for {@link #halt} to be called.
+     *
+     * @return true if halt was called within the timeout; false on timeout or interruption
+     */
+    public boolean waitForRuntimeDeath(long timeout, TimeUnit unit) {
+      try {
+        return haltedLatch.await(timeout, unit);
+      } catch (InterruptedException e) {
+        // Preserve the interrupt status so the test runner can still observe it.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+
+    /** Asserts that halt was called with the handler's JVM termination status code. */
+    private void assertJvmTerminated() {
+      assertThat(exitStatus)
+          .isEqualTo(WorkerUncaughtExceptionHandler.JVM_TERMINATED_STATUS_CODE);
+    }
+  }
+}