This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 36136bc [FLINK-26723][runtime]fix the error message thrown by SourceCoordinatorContext
36136bc is described below
commit 36136bc2bc33f0dc0add1303af949f681a7e42cd
Author: zoucao <[email protected]>
AuthorDate: Sat Mar 19 00:45:19 2022 +0800
[FLINK-26723][runtime]fix the error message thrown by SourceCoordinatorContext
---
.../source/coordinator/SourceCoordinatorContext.java | 19 +++++++++++--------
.../coordinator/SourceCoordinatorContextTest.java | 2 +-
2 files changed, 12 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index c07f62d..3c05fe4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -178,14 +178,17 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
callInCoordinatorThread(
() -> {
// Ensure all the subtasks in the assignment have
registered.
- for (Integer subtaskId : assignment.assignment().keySet())
{
- if (!registeredReaders.containsKey(subtaskId)) {
- throw new IllegalArgumentException(
- String.format(
- "Cannot assign splits %s to
subtask %d because the subtask is not registered.",
- registeredReaders.get(subtaskId),
subtaskId));
- }
- }
+ assignment
+ .assignment()
+ .forEach(
+ (id, splits) -> {
+ if
(!registeredReaders.containsKey(id)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot assign
splits %s to subtask %d because the subtask is not registered.",
+ splits, id));
+ }
+ });
assignmentTracker.recordSplitAssignment(assignment);
assignment
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index d15f28e..380e65c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -153,7 +153,7 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
}
},
"assignSplits() should fail to assign the splits to a reader
that is not registered.",
- "Cannot assign splits");
+ "Cannot assign splits " +
splitsAssignment.assignment().get(0));
}
@Test