This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 600cde4 [FLINK-24303][coordination] Failure when creating a source enumerator leads to full failover, not JobManager failure.
600cde4 is described below
commit 600cde46a72bd78ac3aefffde7ae936e57624131
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Sep 20 02:59:02 2021 +0200
[FLINK-24303][coordination] Failure when creating a source enumerator leads
to full failover, not JobManager failure.
Instead of letting exceptions during the creation of the Source Enumerator
bubble up (and ultimately fail the JobManager / Scheduler creation), we now
catch those exceptions and trigger a full (global) failover for that case.
---
.../source/reader/CoordinatedSourceITCase.java | 164 +++++++++++++++++++++
.../source/coordinator/SourceCoordinator.java | 20 ++-
.../source/coordinator/SourceCoordinatorTest.java | 20 +++
3 files changed, 201 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index 158ec94..4c23973 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -20,17 +20,26 @@ package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.FlinkRuntimeException;
import org.junit.Test;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -60,6 +69,34 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
executeAndVerify(env, stream1.union(stream2), 40);
}
+ /**
+ * Verifies that an exception thrown by the first {@code createEnumerator()} call is recovered
+ * through a global failover (with unlimited, immediate restarts), after which the job
+ * completes and produces all records.
+ */
+ @Test
+ public void testEnumeratorCreationFails() throws Exception {
+ // the failure flag is static, so clear any state left over from an earlier run in this JVM
+ OnceFailingToCreateEnumeratorSource.reset();
+
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
0L));
+ final Source<Integer, ?, ?> source =
+ new OnceFailingToCreateEnumeratorSource(2, 10,
Boundedness.BOUNDED);
+ final DataStream<Integer> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"TestingSource");
+ // 2 splits x 10 records per split
+ executeAndVerify(env, stream, 20);
+ }
+
+ /**
+ * Verifies that an exception thrown by the first {@code restoreEnumerator()} call, made while
+ * recovering from a global failover, is itself recovered by another failover. The job then
+ * completes and produces all records.
+ */
+ @Test
+ public void testEnumeratorRestoreFails() throws Exception {
+ // the failure flag is static, so clear any state left over from an earlier run in this JVM
+ OnceFailingToRestoreEnumeratorSource.reset();
+
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
0L));
+ // checkpointing is required: the source triggers its first failover once a checkpoint
+ // completes, and the enumerator is then restored from that checkpoint
+ env.enableCheckpointing(10);
+
+ final Source<Integer, ?, ?> source =
+ new OnceFailingToRestoreEnumeratorSource(2, 10,
Boundedness.BOUNDED);
+ final DataStream<Integer> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"TestingSource");
+ // 2 splits x 10 records per split
+ executeAndVerify(env, stream, 20);
+ }
+
@SuppressWarnings("serial")
private void executeAndVerify(
StreamExecutionEnvironment env, DataStream<Integer> stream, int
numRecords)
@@ -83,4 +120,131 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
assertEquals(0, (int) result.get(0));
assertEquals(numRecords - 1, (int) result.get(result.size() - 1));
}
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A {@link MockBaseSource} whose first {@code createEnumerator()} call throws a {@link
+ * FlinkRuntimeException}. All later calls delegate to the parent implementation.
+ */
+ private static class OnceFailingToCreateEnumeratorSource extends
MockBaseSource {
+
+ private static final long serialVersionUID = 1L;
+ // static, i.e. shared by all instances of this class; cleared via reset()
+ private static boolean hasFailed;
+
+ OnceFailingToCreateEnumeratorSource(
+ int numSplits, int numRecordsPerSplit, Boundedness
boundedness) {
+ super(numSplits, numRecordsPerSplit, boundedness);
+ }
+
+ @Override
+ public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>>
createEnumerator(
+ SplitEnumeratorContext<MockSourceSplit> enumContext) {
+ if (!hasFailed) {
+ // fail exactly once; afterwards behave like the regular mock source
+ hasFailed = true;
+ throw new FlinkRuntimeException("Test Failure");
+ }
+
+ return super.createEnumerator(enumContext);
+ }
+
+ /** Clears the static failure flag so that the next createEnumerator() call fails again. */
+ static void reset() {
+ hasFailed = false;
+ }
+ }
+
+ /**
+ * A source with the following behavior:
+ *
+ * <ol>
+ * <li>It initially creates an enumerator that does not assign work, waits until the first
+ * checkpoint completes (which contains all work, because none has been assigned yet) and
+ * then triggers a global failure.
+ * <li>Upon restoring from the failure, the first attempt to restore the enumerator fails with
+ * an exception.
+ * <li>The next attempt to restore the enumerator succeeds, and the enumerator works
+ * regularly.
+ * </ol>
+ *
+ * <p>The failure flag is static, i.e. shared by all instances of this class. Call {@link
+ * #reset()} before each test that uses this source.
+ */
+ private static class OnceFailingToRestoreEnumeratorSource extends
MockBaseSource {
+
+ private static final long serialVersionUID = 1L;
+ // false until the first restoreEnumerator() call, which sets it and then throws
+ private static boolean hasFailed;
+
+ OnceFailingToRestoreEnumeratorSource(
+ int numSplits, int numRecordsPerSplit, Boundedness
boundedness) {
+ super(numSplits, numRecordsPerSplit, boundedness);
+ }
+
+ @Override
+ public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>>
createEnumerator(
+ SplitEnumeratorContext<MockSourceSplit> enumContext) {
+
+ final SplitEnumerator<MockSourceSplit, List<MockSourceSplit>>
enumerator =
+ super.createEnumerator(enumContext);
+
+ if (hasFailed) {
+ // after the failure happened, we proceed normally
+ return enumerator;
+ } else {
+ // before the failure, we hand all splits to an enumerator that never assigns them
+ try {
+ // the regular enumerator's checkpoint type is its list of splits
+ final List<MockSourceSplit> splits =
enumerator.snapshotState(1L);
+ return new NonAssigningEnumerator(splits, enumContext);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>>
restoreEnumerator(
+ SplitEnumeratorContext<MockSourceSplit> enumContext,
+ List<MockSourceSplit> checkpoint)
+ throws IOException {
+ if (!hasFailed) {
+ // the first restore attempt fails; later attempts delegate to the parent
+ hasFailed = true;
+ throw new FlinkRuntimeException("Test Failure");
+ }
+
+ return super.restoreEnumerator(enumContext, checkpoint);
+ }
+
+ /** Clears the static failure flag so that the source starts with a non-assigning enumerator. */
+ static void reset() {
+ hasFailed = false;
+ }
+
+ /**
+ * An enumerator that does not assign work. It ignores newly registered readers and split
+ * requests, so all state is in the checkpoint. When it is notified of a completed
+ * checkpoint, it triggers a global failure by throwing from a {@code callAsync} handler.
+ */
+ private static class NonAssigningEnumerator extends
MockSplitEnumerator {
+
+ private final SplitEnumeratorContext<?> context;
+
+ NonAssigningEnumerator(
+ List<MockSourceSplit> splits,
SplitEnumeratorContext<MockSourceSplit> context) {
+ super(splits, context);
+ this.context = context;
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ // we do nothing here to make sure there is no progress
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String
requesterHostname) {
+ // we do nothing here to make sure there is no progress
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws
Exception {
+ // This is a bit of a clumsy way to trigger a global failover
from a coordinator.
+ // This is safe, though, because per the contract, exceptions
in the enumerator
+ // handlers trigger a global failover.
+ context.callAsync(
+ () -> null,
+ (success, failure) -> {
+ throw new FlinkRuntimeException(
+ "Artificial trigger for Global Failover");
+ });
+ }
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index add85bb..5ba4160 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -111,6 +111,10 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
public void start() throws Exception {
LOG.info("Starting split enumerator for source {}.", operatorName);
+ // we mark this as started first, so that we can later distinguish the
cases where
+ // 'start()' wasn't called and where 'start()' failed.
+ started = true;
+
// there are two ways the coordinator can get created:
// (1) Source.restoreEnumerator(), in which case the
'resetToCheckpoint()' method creates
// it
@@ -122,13 +126,17 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
try (TemporaryClassLoaderContext ignored =
TemporaryClassLoaderContext.of(userCodeClassLoader)) {
enumerator = source.createEnumerator(context);
+ } catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+ LOG.error("Failed to create Source Enumerator for source {}",
operatorName, t);
+ context.failJob(t);
+ return;
}
}
// The start sequence is the first task in the coordinator executor.
// We rely on the single-threaded coordinator executor to guarantee
// the other methods are invoked after the enumerator has started.
- started = true;
runInEventLoop(() -> enumerator.start(), "starting the
SplitEnumerator.");
}
@@ -309,6 +317,14 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
final Object... actionNameFormatParameters) {
ensureStarted();
+
+ // we may end up here even for a non-started enumerator, in case the
instantiation
+ // failed, and we get the 'subtaskFailed()' notification during the
failover.
+ // we need to ignore those.
+ if (enumerator == null) {
+ return;
+ }
+
coordinatorExecutor.execute(
() -> {
try {
@@ -410,7 +426,5 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
if (!started) {
throw new IllegalStateException("The coordinator has not started
yet.");
}
-
- assert enumerator != null;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 0b62215..b562643 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -255,6 +255,26 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
}
+ /**
+ * Verifies that an exception thrown while creating the split enumerator does not escape {@code
+ * start()}. Instead, it fails the job with that exception as the failure reason.
+ */
@Test
+ public void testFailJobWhenExceptionThrownFromEnumeratorCreation() throws
Exception {
+ final RuntimeException failureReason = new
RuntimeException("Artificial Exception");
+
+ final SourceCoordinator<?, ?> coordinator =
+ new SourceCoordinator<>(
+ OPERATOR_NAME,
+ coordinatorExecutor,
+ new EnumeratorCreatingSource<>(
+ () -> {
+ throw failureReason;
+ }),
+ context);
+
+ // start() must not throw; the creation failure is reported through the context instead
+ coordinator.start();
+
+ assertTrue(operatorCoordinatorContext.isJobFailed());
+ assertEquals(failureReason,
operatorCoordinatorContext.getJobFailureReason());
+ }
+
+ @Test
public void testErrorThrownFromSplitEnumerator() throws Exception {
final Error error = new Error("Test Error");