This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 36136bc  [FLINK-26723][runtime]fix the error message thrown by 
SourceCoordinatorContext
36136bc is described below

commit 36136bc2bc33f0dc0add1303af949f681a7e42cd
Author: zoucao <[email protected]>
AuthorDate: Sat Mar 19 00:45:19 2022 +0800

    [FLINK-26723][runtime]fix the error message thrown by 
SourceCoordinatorContext
---
 .../source/coordinator/SourceCoordinatorContext.java  | 19 +++++++++++--------
 .../coordinator/SourceCoordinatorContextTest.java     |  2 +-
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index c07f62d..3c05fe4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -178,14 +178,17 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
         callInCoordinatorThread(
                 () -> {
                     // Ensure all the subtasks in the assignment have 
registered.
-                    for (Integer subtaskId : assignment.assignment().keySet()) 
{
-                        if (!registeredReaders.containsKey(subtaskId)) {
-                            throw new IllegalArgumentException(
-                                    String.format(
-                                            "Cannot assign splits %s to 
subtask %d because the subtask is not registered.",
-                                            registeredReaders.get(subtaskId), 
subtaskId));
-                        }
-                    }
+                    assignment
+                            .assignment()
+                            .forEach(
+                                    (id, splits) -> {
+                                        if 
(!registeredReaders.containsKey(id)) {
+                                            throw new IllegalArgumentException(
+                                                    String.format(
+                                                            "Cannot assign 
splits %s to subtask %d because the subtask is not registered.",
+                                                            splits, id));
+                                        }
+                                    });
 
                     assignmentTracker.recordSplitAssignment(assignment);
                     assignment
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index d15f28e..380e65c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -153,7 +153,7 @@ public class SourceCoordinatorContextTest extends 
SourceCoordinatorTestBase {
                     }
                 },
                 "assignSplits() should fail to assign the splits to a reader 
that is not registered.",
-                "Cannot assign splits");
+                "Cannot assign splits " + 
splitsAssignment.assignment().get(0));
     }
 
     @Test

Reply via email to