This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 216f469f821ee61fc48c2df86364c5e54d0aaa94
Author: Stephan Ewen <[email protected]>
AuthorDate: Wed Nov 4 15:48:19 2020 +0100

    [FLINK-19384][core] Add common permissive exception signatures to all 
methods of Source.
    
    This makes the exception signatures consistent within the class and with 
the exception philosophy
    in other API classes, like transformation functions: Allow users to simply 
throw or forward any
    type of checked exceptions.
---
 flink-core/pom.xml                                       |  3 ++-
 .../org/apache/flink/api/connector/source/Source.java    | 16 ++++++++++++----
 .../operators/coordination/OperatorCoordinator.java      |  2 +-
 .../coordination/OperatorCoordinatorHolder.java          |  4 ++--
 .../coordination/RecreateOnResetOperatorCoordinator.java |  9 +++++----
 .../runtime/source/coordinator/SourceCoordinator.java    |  2 +-
 .../source/coordinator/SourceCoordinatorProvider.java    |  2 +-
 .../RecreateOnResetOperatorCoordinatorTest.java          |  3 ++-
 .../coordinator/SourceCoordinatorProviderTest.java       |  2 +-
 .../source/coordinator/SourceCoordinatorTestBase.java    |  5 +++--
 .../flink/streaming/api/operators/SourceOperator.java    |  6 +++---
 .../streaming/api/operators/SourceOperatorFactory.java   | 10 +++++-----
 12 files changed, 38 insertions(+), 26 deletions(-)

diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 9fb9448..9a7b53e 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -185,9 +185,10 @@ under the License.
                                                        
<exclude>org.apache.flink.api.common.typeinfo.TypeHint</exclude>
                                                        
<exclude>org.apache.flink.api.java.typeutils.TypeInfoParser</exclude>
 
-                                                       <!-- additions to 
Source API context interfaces -->
+                                                       <!-- additions to 
Source API context interfaces and exception signatures -->
                                                        
<exclude>org.apache.flink.api.connector.source.SourceReaderContext</exclude>
                                                        
<exclude>org.apache.flink.api.connector.source.SplitEnumeratorContext</exclude>
+                                                       
<exclude>org.apache.flink.api.connector.source.Source</exclude>
                                                </excludes>
                                        </parameter>
                                </configuration>
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
index fe81db8..38a2578 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.connector.source;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
-import java.io.IOException;
 import java.io.Serializable;
 
 /**
@@ -49,16 +48,22 @@ public interface Source<T, SplitT extends SourceSplit, 
EnumChkT> extends Seriali
         *
         * @param readerContext The {@link SourceReaderContext context} for the 
source reader.
         * @return A new SourceReader.
+        *
+        * @throws Exception The implementor is free to forward all exceptions 
directly.
+        *                   Exceptions thrown from this method cause task 
failure/recovery.
         */
-       SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);
+       SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) 
throws Exception;
 
        /**
         * Creates a new SplitEnumerator for this source, starting a new input.
         *
         * @param enumContext The {@link SplitEnumeratorContext context} for 
the split enumerator.
         * @return A new SplitEnumerator.
+        *
+        * @throws Exception The implementor is free to forward all exceptions 
directly.
+        *                   Exceptions thrown from this method cause 
JobManager failure/recovery.
         */
-       SplitEnumerator<SplitT, EnumChkT> 
createEnumerator(SplitEnumeratorContext<SplitT> enumContext);
+       SplitEnumerator<SplitT, EnumChkT> 
createEnumerator(SplitEnumeratorContext<SplitT> enumContext) throws Exception;
 
        /**
         * Restores an enumerator from a checkpoint.
@@ -66,10 +71,13 @@ public interface Source<T, SplitT extends SourceSplit, 
EnumChkT> extends Seriali
         * @param enumContext The {@link SplitEnumeratorContext context} for 
the restored split enumerator.
         * @param checkpoint The checkpoint to restore the SplitEnumerator from.
         * @return A SplitEnumerator restored from the given checkpoint.
+        *
+        * @throws Exception The implementor is free to forward all exceptions 
directly.
+        *                   Exceptions thrown from this method cause 
JobManager failure/recovery.
         */
        SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
                        SplitEnumeratorContext<SplitT> enumContext,
-                       EnumChkT checkpoint) throws IOException;
+                       EnumChkT checkpoint) throws Exception;
 
        // 
------------------------------------------------------------------------
        //  serializers for the metadata
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index bc280f0..b40e4f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -197,6 +197,6 @@ public interface OperatorCoordinator extends 
CheckpointListener, AutoCloseable {
                /**
                 * Creates the {@code OperatorCoordinator}, using the given 
context.
                 */
-               OperatorCoordinator create(Context context);
+               OperatorCoordinator create(Context context) throws Exception;
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 9819754..a4c43a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -302,7 +302,7 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
        public static OperatorCoordinatorHolder create(
                        SerializedValue<OperatorCoordinator.Provider> 
serializedProvider,
                        ExecutionJobVertex jobVertex,
-                       ClassLoader classLoader) throws IOException, 
ClassNotFoundException {
+                       ClassLoader classLoader) throws Exception {
 
                try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(classLoader)) {
                        final OperatorCoordinator.Provider provider = 
serializedProvider.deserializeValue(classLoader);
@@ -331,7 +331,7 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
                        final BiFunction<SerializedValue<OperatorEvent>, 
Integer, CompletableFuture<Acknowledge>> eventSender,
                        final String operatorName,
                        final int operatorParallelism,
-                       final int operatorMaxParallelism) {
+                       final int operatorMaxParallelism) throws Exception {
 
                final OperatorEventValve valve = new 
OperatorEventValve(eventSender);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index c19997c..03861b1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
 
 import javax.annotation.Nullable;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -39,7 +40,7 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
 
        private RecreateOnResetOperatorCoordinator(
                        QuiesceableContext context,
-                       Provider provider) {
+                       Provider provider) throws Exception {
                this.quiesceableContext = context;
                this.provider = provider;
                this.coordinator = provider.getCoordinator(context);
@@ -107,7 +108,7 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
 
        // ---------------------
 
-       public static abstract class Provider implements 
OperatorCoordinator.Provider {
+       public abstract static class Provider implements 
OperatorCoordinator.Provider {
                private static final long serialVersionUID = 
3002837631612629071L;
                private final OperatorID operatorID;
 
@@ -121,12 +122,12 @@ public class RecreateOnResetOperatorCoordinator 
implements OperatorCoordinator {
                }
 
                @Override
-               public OperatorCoordinator create(Context context) {
+               public OperatorCoordinator create(Context context) throws 
Exception {
                        QuiesceableContext quiesceableContext = new 
QuiesceableContext(context);
                        return new 
RecreateOnResetOperatorCoordinator(quiesceableContext, this);
                }
 
-               protected abstract OperatorCoordinator 
getCoordinator(OperatorCoordinator.Context context);
+               protected abstract OperatorCoordinator 
getCoordinator(OperatorCoordinator.Context context) throws Exception;
        }
 
        // ----------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index a22a239..3e3af76 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -89,7 +89,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT> implements
                        String operatorName,
                        ExecutorService coordinatorExecutor,
                        Source<?, SplitT, EnumChkT> source,
-                       SourceCoordinatorContext<SplitT> context) {
+                       SourceCoordinatorContext<SplitT> context) throws 
Exception {
                this.operatorName = operatorName;
                this.coordinatorExecutor = coordinatorExecutor;
                this.source = source;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 44d7671..0304a88 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -64,7 +64,7 @@ public class SourceCoordinatorProvider<SplitT extends 
SourceSplit> extends Recre
        }
 
        @Override
-       public OperatorCoordinator getCoordinator(OperatorCoordinator.Context 
context) {
+       public OperatorCoordinator getCoordinator(OperatorCoordinator.Context 
context) throws Exception  {
                final String coordinatorThreadName = "SourceCoordinator-" + 
operatorName;
                CoordinatorExecutorThreadFactory coordinatorThreadFactory =
                                new 
CoordinatorExecutorThreadFactory(coordinatorThreadName);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
index f4298ce..7ff487f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.runtime.jobgraph.OperatorID;
+
 import org.junit.Test;
 
 import java.util.Collections;
@@ -90,7 +91,7 @@ public class RecreateOnResetOperatorCoordinatorTest {
 
        private RecreateOnResetOperatorCoordinator createCoordinator(
                        TestingCoordinatorProvider provider,
-                       OperatorCoordinator.Context context) {
+                       OperatorCoordinator.Context context) throws Exception {
                return (RecreateOnResetOperatorCoordinator) 
provider.create(context);
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index c86981c..3e736bb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -55,7 +55,7 @@ public class SourceCoordinatorProviderTest {
        }
 
        @Test
-       public void testCreate() {
+       public void testCreate() throws Exception {
                OperatorCoordinator coordinator =
                                provider.create(new 
MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SPLITS));
                assertTrue(coordinator instanceof 
RecreateOnResetOperatorCoordinator);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index a14c868..a46b1ad 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -52,7 +52,7 @@ public abstract class SourceCoordinatorTestBase {
        protected MockSplitEnumerator enumerator;
 
        @Before
-       public void setup() {
+       public void setup() throws Exception {
                operatorCoordinatorContext = new 
MockOperatorCoordinatorContext(TEST_OPERATOR_ID, NUM_SUBTASKS);
                splitSplitAssignmentTracker = new SplitAssignmentTracker<>();
                String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
@@ -80,9 +80,10 @@ public abstract class SourceCoordinatorTestBase {
 
        // --------------------------
 
-       protected SourceCoordinator getNewSourceCoordinator() {
+       protected SourceCoordinator getNewSourceCoordinator() throws Exception {
                Source<Integer, MockSourceSplit, Set<MockSourceSplit>> 
mockSource =
                                new MockSource(Boundedness.BOUNDED, 
NUM_SUBTASKS * 2);
+
                return new SourceCoordinator<>(OPERATOR_NAME, 
coordinatorExecutor, mockSource, context);
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index b18d85e..799b2a1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -47,11 +47,11 @@ import 
org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.FunctionWithException;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -79,7 +79,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
        /** The factory for the source reader. This is a workaround, because 
currently the SourceReader
         * must be lazily initialized, which is mainly because the metrics 
groups that the reader relies on is
         * lazily initialized. */
-       private final Function<SourceReaderContext, SourceReader<OUT, SplitT>> 
readerFactory;
+       private final FunctionWithException<SourceReaderContext, 
SourceReader<OUT, SplitT>, Exception> readerFactory;
 
        /** The serializer for the splits, applied to the split types before 
storing them in the reader state. */
        private final SimpleVersionedSerializer<SplitT> splitSerializer;
@@ -113,7 +113,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
        private TimestampsAndWatermarks<OUT> eventTimeLogic;
 
        public SourceOperator(
-                       Function<SourceReaderContext, SourceReader<OUT, 
SplitT>> readerFactory,
+                       FunctionWithException<SourceReaderContext, 
SourceReader<OUT, SplitT>, Exception> readerFactory,
                        OperatorEventGateway operatorEventGateway,
                        SimpleVersionedSerializer<SplitT> splitSerializer,
                        WatermarkStrategy<OUT> watermarkStrategy,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 1d7aed7..ac7b031 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -31,8 +31,7 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
-
-import java.util.function.Function;
+import org.apache.flink.util.function.FunctionWithException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -114,7 +113,7 @@ public class SourceOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
         */
        @SuppressWarnings("unchecked")
        private static <T, SplitT extends SourceSplit> SourceOperator<T, 
SplitT> instantiateSourceOperator(
-                       Function<SourceReaderContext, SourceReader<T, ?>> 
readerFactory,
+                       FunctionWithException<SourceReaderContext, 
SourceReader<T, ?>, Exception> readerFactory,
                        OperatorEventGateway eventGateway,
                        SimpleVersionedSerializer<?> splitSerializer,
                        WatermarkStrategy<T> watermarkStrategy,
@@ -123,8 +122,9 @@ public class SourceOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
                        String localHostName) {
 
                // jumping through generics hoops: cast the generics away to 
then cast them back more strictly typed
-               final Function<SourceReaderContext, SourceReader<T, SplitT>> 
typedReaderFactory =
-                               (Function<SourceReaderContext, SourceReader<T, 
SplitT>>) (Function<?, ?>) readerFactory;
+               final FunctionWithException<SourceReaderContext, 
SourceReader<T, SplitT>, Exception> typedReaderFactory =
+                               (FunctionWithException<SourceReaderContext, 
SourceReader<T, SplitT>, Exception>)
+                               (FunctionWithException<?, ?, ?>) readerFactory;
 
                final SimpleVersionedSerializer<SplitT> typedSplitSerializer = 
(SimpleVersionedSerializer<SplitT>) splitSerializer;
 

Reply via email to