This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 48a90c73ba9fefdccf6d83ccdbb90295b587a42c Author: Arvid Heise <[email protected]> AuthorDate: Wed Sep 9 08:54:29 2020 +0200 [FLINK-19338][connectors/common] Remove null-check from SourceCoordinatorContext#unregisterSourceReader. If an error happens during startup, the reader may not be registered (yet), but cleanup is triggered anyway. --- .../flink/runtime/source/coordinator/SourceCoordinatorContext.java | 4 +--- .../runtime/source/coordinator/SourceCoordinatorContextTest.java | 5 +++++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java index 3ed0ab5..8bb46d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java @@ -34,7 +34,6 @@ import org.apache.flink.runtime.source.event.AddSplitEvent; import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.Preconditions; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -280,8 +279,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> * @param subtaskId the subtask id of the source reader. 
*/ void unregisterSourceReader(int subtaskId) { - Preconditions.checkNotNull(registeredReaders.remove(subtaskId), String.format( - "Failed to unregister source reader of id %s because it is not registered.", subtaskId)); + registeredReaders.remove(subtaskId); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java index 70550ae..1228d85 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java @@ -70,6 +70,11 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase { } @Test + public void testUnregisterUnregisteredReader() { + context.unregisterSourceReader(0); + } + + @Test public void testAssignSplitsFromCoordinatorExecutor() throws Exception { testAssignSplits(true); }
