StephanEwen commented on a change in pull request #14259:
URL: https://github.com/apache/flink/pull/14259#discussion_r532764178
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
##########
@@ -114,15 +113,20 @@ public void recordSplitAssignment(SplitsAssignment<SplitT> splitsAssignment) {
  * if those splits were never assigned. To handle this case, the coordinator needs to find those
  * splits and return them back to the SplitEnumerator for re-assignment.
  *
- * @param failedSubtaskId the failed subtask id.
+ * @param subtaskId the subtask id of the reader that failed over.
+ * @param restoredCheckpointId the ID of the checkpoint that the reader was restored to.
  * @return A list of splits that needs to be added back to the {@link SplitEnumerator}.
  */
- public List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
- List<SplitT> splits = new ArrayList<>();
- assignmentsByCheckpointId.values().forEach(assignments -> {
- removeFromAssignment(failedSubtaskId, assignments, splits);
- });
- removeFromAssignment(failedSubtaskId, uncheckpointedAssignments, splits);
+ public List<SplitT> getAndRemoveUncheckpointedAssignment(int subtaskId, long restoredCheckpointId) {
+ final ArrayList<SplitT> splits = new ArrayList<>();
+
+ for (final Map.Entry<Long, Map<Integer, LinkedHashSet<SplitT>>> entry : assignmentsByCheckpointId.entrySet()) {
Review comment:
I actually had that code initially, but the lower-bound key of `tailMap()`
is inclusive (anything >= checkpointID), and we need a tail map whose lower
bound is exclusive. Compensating for that makes the code more complex in the
end than this version here.
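
For illustration only, here is a minimal standalone sketch of the inclusive lower bound described above. It is not the code from this PR: the class `TailMapDemo`, its helper methods, and the `String` values are hypothetical stand-ins, and it assumes the map is accessed through the `SortedMap` interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical demo class, not part of Flink.
public class TailMapDemo {

    // SortedMap.tailMap(fromKey) returns the entries whose keys are >= fromKey,
    // so the entry for the restored checkpoint itself is included.
    static List<Long> inclusiveKeys(SortedMap<Long, String> byCheckpointId, long checkpointId) {
        return new ArrayList<>(byCheckpointId.tailMap(checkpointId).keySet());
    }

    // Plain loop over all entries that keeps only keys strictly greater than
    // the restored checkpoint ID (the exclusive lower bound the comment asks for).
    static List<Long> exclusiveKeys(SortedMap<Long, String> byCheckpointId, long checkpointId) {
        final List<Long> keys = new ArrayList<>();
        for (final Map.Entry<Long, String> entry : byCheckpointId.entrySet()) {
            if (entry.getKey() > checkpointId) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        final SortedMap<Long, String> byCheckpointId = new TreeMap<>();
        byCheckpointId.put(1L, "assigned before checkpoint 1");
        byCheckpointId.put(2L, "assigned before checkpoint 2");
        byCheckpointId.put(3L, "assigned before checkpoint 3");

        System.out.println(inclusiveKeys(byCheckpointId, 2L)); // prints [2, 3]
        System.out.println(exclusiveKeys(byCheckpointId, 2L)); // prints [3]
    }
}
```

With a restored checkpoint ID of 2, `tailMap(2L)` still contains the entry for checkpoint 2, whereas the filtering loop keeps only checkpoint 3.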