This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 216f469f821ee61fc48c2df86364c5e54d0aaa94 Author: Stephan Ewen <[email protected]> AuthorDate: Wed Nov 4 15:48:19 2020 +0100 [FLINK-19384][core] Add common permissive exception signatures to all methods of Source. This makes the exception signatures consistent within the class and with the exception philosophy in other API classes, like transformation functions: Allow users to simply throw or forward any type of checked exceptions. --- flink-core/pom.xml | 3 ++- .../org/apache/flink/api/connector/source/Source.java | 16 ++++++++++++---- .../operators/coordination/OperatorCoordinator.java | 2 +- .../coordination/OperatorCoordinatorHolder.java | 4 ++-- .../coordination/RecreateOnResetOperatorCoordinator.java | 9 +++++---- .../runtime/source/coordinator/SourceCoordinator.java | 2 +- .../source/coordinator/SourceCoordinatorProvider.java | 2 +- .../RecreateOnResetOperatorCoordinatorTest.java | 3 ++- .../coordinator/SourceCoordinatorProviderTest.java | 2 +- .../source/coordinator/SourceCoordinatorTestBase.java | 5 +++-- .../flink/streaming/api/operators/SourceOperator.java | 6 +++--- .../streaming/api/operators/SourceOperatorFactory.java | 10 +++++----- 12 files changed, 38 insertions(+), 26 deletions(-) diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 9fb9448..9a7b53e 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -185,9 +185,10 @@ under the License. 
<exclude>org.apache.flink.api.common.typeinfo.TypeHint</exclude> <exclude>org.apache.flink.api.java.typeutils.TypeInfoParser</exclude> - <!-- additions to Source API context interfaces --> + <!-- additions to Source API context interfaces and exception signatures --> <exclude>org.apache.flink.api.connector.source.SourceReaderContext</exclude> <exclude>org.apache.flink.api.connector.source.SplitEnumeratorContext</exclude> + <exclude>org.apache.flink.api.connector.source.Source</exclude> </excludes> </parameter> </configuration> diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java index fe81db8..38a2578 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java @@ -21,7 +21,6 @@ package org.apache.flink.api.connector.source; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.SimpleVersionedSerializer; -import java.io.IOException; import java.io.Serializable; /** @@ -49,16 +48,22 @@ public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Seriali * * @param readerContext The {@link SourceReaderContext context} for the source reader. * @return A new SourceReader. + * + * @throws Exception The implementor is free to forward all exceptions directly. + * Exceptions thrown from this method cause task failure/recovery. */ - SourceReader<T, SplitT> createReader(SourceReaderContext readerContext); + SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) throws Exception; /** * Creates a new SplitEnumerator for this source, starting a new input. * * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator. * @return A new SplitEnumerator. + * + * @throws Exception The implementor is free to forward all exceptions directly. 
+ * * Exceptions thrown from this method cause JobManager failure/recovery. */ - SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext); + SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext) throws Exception; /** * Restores an enumerator from a checkpoint. @@ -66,10 +71,13 @@ public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Seriali * @param enumContext The {@link SplitEnumeratorContext context} for the restored split enumerator. * @param checkpoint The checkpoint to restore the SplitEnumerator from. * @return A SplitEnumerator restored from the given checkpoint. + * + * @throws Exception The implementor is free to forward all exceptions directly. + * * Exceptions thrown from this method cause JobManager failure/recovery. */ SplitEnumerator<SplitT, EnumChkT> restoreEnumerator( SplitEnumeratorContext<SplitT> enumContext, - EnumChkT checkpoint) throws IOException; + EnumChkT checkpoint) throws Exception; // ------------------------------------------------------------------------ // serializers for the metadata diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java index bc280f0..b40e4f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java @@ -197,6 +197,6 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { /** * Creates the {@code OperatorCoordinator}, using the given context. 
*/ - OperatorCoordinator create(Context context); + OperatorCoordinator create(Context context) throws Exception; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index 9819754..a4c43a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -302,7 +302,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC public static OperatorCoordinatorHolder create( SerializedValue<OperatorCoordinator.Provider> serializedProvider, ExecutionJobVertex jobVertex, - ClassLoader classLoader) throws IOException, ClassNotFoundException { + ClassLoader classLoader) throws Exception { try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) { final OperatorCoordinator.Provider provider = serializedProvider.deserializeValue(classLoader); @@ -331,7 +331,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender, final String operatorName, final int operatorParallelism, - final int operatorMaxParallelism) { + final int operatorMaxParallelism) throws Exception { final OperatorEventValve valve = new OperatorEventValve(eventSender); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index c19997c..03861b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.Acknowledge; import javax.annotation.Nullable; + import java.util.concurrent.CompletableFuture; /** @@ -39,7 +40,7 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { private RecreateOnResetOperatorCoordinator( QuiesceableContext context, - Provider provider) { + Provider provider) throws Exception { this.quiesceableContext = context; this.provider = provider; this.coordinator = provider.getCoordinator(context); @@ -107,7 +108,7 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { // --------------------- - public static abstract class Provider implements OperatorCoordinator.Provider { + public abstract static class Provider implements OperatorCoordinator.Provider { private static final long serialVersionUID = 3002837631612629071L; private final OperatorID operatorID; @@ -121,12 +122,12 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { } @Override - public OperatorCoordinator create(Context context) { + public OperatorCoordinator create(Context context) throws Exception { QuiesceableContext quiesceableContext = new QuiesceableContext(context); return new RecreateOnResetOperatorCoordinator(quiesceableContext, this); } - protected abstract OperatorCoordinator getCoordinator(OperatorCoordinator.Context context); + protected abstract OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception; } // ---------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index a22a239..3e3af76 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -89,7 +89,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements String operatorName, ExecutorService coordinatorExecutor, Source<?, SplitT, EnumChkT> source, - SourceCoordinatorContext<SplitT> context) { + SourceCoordinatorContext<SplitT> context) throws Exception { this.operatorName = operatorName; this.coordinatorExecutor = coordinatorExecutor; this.source = source; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java index 44d7671..0304a88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java @@ -64,7 +64,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre } @Override - public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) { + public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception { final String coordinatorThreadName = "SourceCoordinator-" + operatorName; CoordinatorExecutorThreadFactory coordinatorThreadFactory = new CoordinatorExecutorThreadFactory(coordinatorThreadName); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java index f4298ce..7ff487f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.operators.coordination; import org.apache.flink.runtime.jobgraph.OperatorID; + import org.junit.Test; import java.util.Collections; @@ -90,7 +91,7 @@ public class RecreateOnResetOperatorCoordinatorTest { private RecreateOnResetOperatorCoordinator createCoordinator( TestingCoordinatorProvider provider, - OperatorCoordinator.Context context) { + OperatorCoordinator.Context context) throws Exception { return (RecreateOnResetOperatorCoordinator) provider.create(context); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java index c86981c..3e736bb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java @@ -55,7 +55,7 @@ public class SourceCoordinatorProviderTest { } @Test - public void testCreate() { + public void testCreate() throws Exception { OperatorCoordinator coordinator = provider.create(new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SPLITS)); assertTrue(coordinator instanceof RecreateOnResetOperatorCoordinator); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java index a14c868..a46b1ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java @@ -52,7 +52,7 @@ public abstract class SourceCoordinatorTestBase { 
protected MockSplitEnumerator enumerator; @Before - public void setup() { + public void setup() throws Exception { operatorCoordinatorContext = new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, NUM_SUBTASKS); splitSplitAssignmentTracker = new SplitAssignmentTracker<>(); String coordinatorThreadName = TEST_OPERATOR_ID.toHexString(); @@ -80,9 +80,10 @@ public abstract class SourceCoordinatorTestBase { // -------------------------- - protected SourceCoordinator getNewSourceCoordinator() { + protected SourceCoordinator getNewSourceCoordinator() throws Exception { Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource = new MockSource(Boundedness.BOUNDED, NUM_SUBTASKS * 2); + return new SourceCoordinator<>(OPERATOR_NAME, coordinatorExecutor, mockSource, context); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index b18d85e..799b2a1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -47,11 +47,11 @@ import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.FunctionWithException; import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -79,7 +79,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> /** The factory for the source reader. 
This is a workaround, because currently the SourceReader * must be lazily initialized, which is mainly because the metrics groups that the reader relies on is * lazily initialized. */ - private final Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory; + private final FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception> readerFactory; /** The serializer for the splits, applied to the split types before storing them in the reader state. */ private final SimpleVersionedSerializer<SplitT> splitSerializer; @@ -113,7 +113,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> private TimestampsAndWatermarks<OUT> eventTimeLogic; public SourceOperator( - Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory, + FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception> readerFactory, OperatorEventGateway operatorEventGateway, SimpleVersionedSerializer<SplitT> splitSerializer, WatermarkStrategy<OUT> watermarkStrategy, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java index 1d7aed7..ac7b031 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java @@ -31,8 +31,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware; - -import java.util.function.Function; +import org.apache.flink.util.function.FunctionWithException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -114,7 +113,7 @@ 
public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU */ @SuppressWarnings("unchecked") private static <T, SplitT extends SourceSplit> SourceOperator<T, SplitT> instantiateSourceOperator( - Function<SourceReaderContext, SourceReader<T, ?>> readerFactory, + FunctionWithException<SourceReaderContext, SourceReader<T, ?>, Exception> readerFactory, OperatorEventGateway eventGateway, SimpleVersionedSerializer<?> splitSerializer, WatermarkStrategy<T> watermarkStrategy, @@ -123,8 +122,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU String localHostName) { // jumping through generics hoops: cast the generics away to then cast them back more strictly typed - final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory = - (Function<SourceReaderContext, SourceReader<T, SplitT>>) (Function<?, ?>) readerFactory; + final FunctionWithException<SourceReaderContext, SourceReader<T, SplitT>, Exception> typedReaderFactory = + (FunctionWithException<SourceReaderContext, SourceReader<T, SplitT>, Exception>) + (FunctionWithException<?, ?, ?>) readerFactory; final SimpleVersionedSerializer<SplitT> typedSplitSerializer = (SimpleVersionedSerializer<SplitT>) splitSerializer;
