AHeise commented on a change in pull request #17536:
URL: https://github.com/apache/flink/pull/17536#discussion_r735867252
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java
##########
@@ -65,21 +68,55 @@ protected void recoveredCommittables(List<StateT>
recovered) throws IOException
return all;
}
+ protected final Collection<StateT> commitAndReturnSuccess(List<StateT>
committables)
+ throws IOException, InterruptedException {
+ Collection<StateT> failed = commit(committables);
+ if (failed.isEmpty()) {
+ return committables;
+ }
+ // Assume that (Global)Committer#commit does not create a new instance
for failed
+ // committables. This assumption is documented in the respective
JavaDoc.
+ Set<StateT> successful =
+ Collections.newSetFromMap(new
IdentityHashMap<>(committables.size()));
+ successful.addAll(committables);
+ successful.removeAll(failed);
+ return successful;
+ }
+
+ protected final Collection<StateT> commit(List<StateT> committables)
+ throws IOException, InterruptedException {
+ List<StateT> failed = commitInternal(committables);
+ recoveredCommittables(failed);
+ return failed;
+ }
+
+ /**
+ * Commits a list of committables.
+ *
+ * @param committables A list of committables that is ready for committing.
+ * @return A list of committables needed to re-commit.
+ */
+ abstract List<StateT> commitInternal(List<StateT> committables)
+ throws IOException, InterruptedException;
+
@Override
public boolean needsRetry() {
return !recoveredCommittables.isEmpty();
}
@Override
- public void retry() throws IOException, InterruptedException {
- retry(prependRecoveredCommittables(Collections.emptyList()));
+ public Collection<CommT> retry() throws IOException, InterruptedException {
+ return retry(prependRecoveredCommittables(Collections.emptyList()));
}
- protected abstract void retry(List<StateT> recoveredCommittables)
- throws IOException, InterruptedException;
+ protected Collection<CommT> retry(List<StateT> recoveredCommittables)
Review comment:
That's unfortunately not that easy because of the global committers:
Currently all committers are emitting `CommT` and not `GlobalCommT` anymore
after this refactor. This is possible because in fact the global committers are
not emitting anything.
Now `commitAndReturnSuccess` is working on the internal type (`GlobalCommT`
in case of global committers). Hence, the signature is conflicting here.
We could create mix-ins interfaces for non-global and global committers
where we can implement them. The question is if that's simpler. We could also
re-introduce an emit type to `CommitterHandler`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]