This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 48a90c73ba9fefdccf6d83ccdbb90295b587a42c
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Sep 9 08:54:29 2020 +0200

    [FLINK-19338][connectors/common] Remove null-check from SourceCoordinatorContext#unregisterSourceReader.
    
    If an error happens during startup, the reader may not be registered (yet),
    but cleanup is triggered anyway, so unregistering an unregistered reader must not fail.
---
 .../flink/runtime/source/coordinator/SourceCoordinatorContext.java   | 4 +---
 .../runtime/source/coordinator/SourceCoordinatorContextTest.java     | 5 +++++
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 3ed0ab5..8bb46d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -280,8 +279,7 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
         * @param subtaskId the subtask id of the source reader.
         */
        void unregisterSourceReader(int subtaskId) {
-               Preconditions.checkNotNull(registeredReaders.remove(subtaskId), 
String.format(
-                               "Failed to unregister source reader of id %s 
because it is not registered.", subtaskId));
+               registeredReaders.remove(subtaskId);
        }
 
        /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 70550ae..1228d85 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -70,6 +70,11 @@ public class SourceCoordinatorContextTest extends 
SourceCoordinatorTestBase {
        }
 
        @Test
+       public void testUnregisterUnregisteredReader() {
+               context.unregisterSourceReader(0);
+       }
+
+       @Test
        public void testAssignSplitsFromCoordinatorExecutor() throws Exception {
                testAssignSplits(true);
        }

Reply via email to