AHeise commented on a change in pull request #17536:
URL: https://github.com/apache/flink/pull/17536#discussion_r735867252

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java
##########
@@ -65,21 +68,55 @@ protected void recoveredCommittables(List<StateT> recovered) throws IOException
         return all;
     }
 
+    protected final Collection<StateT> commitAndReturnSuccess(List<StateT> committables)
+            throws IOException, InterruptedException {
+        Collection<StateT> failed = commit(committables);
+        if (failed.isEmpty()) {
+            return committables;
+        }
+        // Assume that (Global)Committer#commit does not create a new instance for failed
+        // committables. This assumption is documented in the respective JavaDoc.
+        Set<StateT> successful =
+                Collections.newSetFromMap(new IdentityHashMap<>(committables.size()));
+        successful.addAll(committables);
+        successful.removeAll(failed);
+        return successful;
+    }
+
+    protected final Collection<StateT> commit(List<StateT> committables)
+            throws IOException, InterruptedException {
+        List<StateT> failed = commitInternal(committables);
+        recoveredCommittables(failed);
+        return failed;
+    }
+
+    /**
+     * Commits a list of committables.
+     *
+     * @param committables A list of committables that is ready for committing.
+     * @return A list of committables needed to re-commit.
+     */
+    abstract List<StateT> commitInternal(List<StateT> committables)
+            throws IOException, InterruptedException;
+
     @Override
     public boolean needsRetry() {
         return !recoveredCommittables.isEmpty();
     }
 
     @Override
-    public void retry() throws IOException, InterruptedException {
-        retry(prependRecoveredCommittables(Collections.emptyList()));
+    public Collection<CommT> retry() throws IOException, InterruptedException {
+        return retry(prependRecoveredCommittables(Collections.emptyList()));
     }
 
-    protected abstract void retry(List<StateT> recoveredCommittables)
-            throws IOException, InterruptedException;
+    protected Collection<CommT> retry(List<StateT> recoveredCommittables)

Review comment:
Unfortunately, that's not so easy because of the global committers. After this refactor, all committers emit `CommT` rather than `GlobalCommT`. That works because the global committers do not actually emit anything. However, `commitAndReturnSuccess` operates on the internal type (`GlobalCommT` in the case of global committers), so the signatures conflict here.

We could create mix-in interfaces for non-global and global committers and implement the method there. The question is whether that would be simpler.

Alternatively, we could re-introduce an emit type to `CommitterHandler`.
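
To make the last option more concrete, here is a minimal sketch of a handler that carries a separate emit type. It is only an illustration under assumptions: every name in it (`EmitTypeSketch`, `Handler`, `toEmit`, `commitAndEmit`, `PlainHandler`, `GlobalHandler`) is hypothetical and not part of Flink, and `List<String>` merely stands in for `GlobalCommT`. Failed committables are removed one at a time so that each lookup goes through the identity-based set.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; none of these names are Flink's actual API. */
final class EmitTypeSketch {

    /** StateT is the type committed internally; EmitT is the type sent downstream. */
    abstract static class Handler<StateT, EmitT> {

        /** Commits the given committables and returns those that must be retried. */
        abstract List<StateT> commitInternal(List<StateT> committables);

        /** Converts successfully committed state into the emitted type. */
        abstract Collection<EmitT> toEmit(Collection<StateT> successful);

        final Collection<EmitT> commitAndEmit(List<StateT> committables) {
            List<StateT> failed = commitInternal(committables);
            // Identity-based set; assumes commitInternal returns the very
            // instances it was given rather than equal copies.
            Set<StateT> successful = Collections.newSetFromMap(new IdentityHashMap<>());
            successful.addAll(committables);
            for (StateT f : failed) {
                successful.remove(f);
            }
            return toEmit(successful);
        }
    }

    /** Non-global case: the internal type and the emitted type coincide. */
    static final class PlainHandler extends Handler<String, String> {
        @Override
        List<String> commitInternal(List<String> committables) {
            return Collections.emptyList(); // pretend every commit succeeds
        }

        @Override
        Collection<String> toEmit(Collection<String> successful) {
            return successful;
        }
    }

    /** Global case: commits List<String> (stand-in for GlobalCommT), emits nothing. */
    static final class GlobalHandler extends Handler<List<String>, String> {
        @Override
        List<List<String>> commitInternal(List<List<String>> committables) {
            return Collections.emptyList(); // pretend every commit succeeds
        }

        @Override
        Collection<String> toEmit(Collection<List<String>> successful) {
            return Collections.emptyList(); // global committers emit nothing
        }
    }

    public static void main(String[] args) {
        System.out.println(new PlainHandler().commitAndEmit(List.of("a", "b")).size());
        System.out.println(new GlobalHandler().commitAndEmit(List.of(List.of("a", "b"))).size());
    }
}
```

With a separate `EmitT`, both handlers can share `commitAndEmit` even though they commit different internal types. The cost is one more type parameter on the handler, which is the trade-off to weigh against the mix-in approach.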