AHeise commented on a change in pull request #17536:
URL: https://github.com/apache/flink/pull/17536#discussion_r735867252



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java
##########
@@ -65,21 +68,55 @@ protected void recoveredCommittables(List<StateT> 
recovered) throws IOException
         return all;
     }
 
+    protected final Collection<StateT> commitAndReturnSuccess(List<StateT> 
committables)
+            throws IOException, InterruptedException {
+        Collection<StateT> failed = commit(committables);
+        if (failed.isEmpty()) {
+            return committables;
+        }
+        // Assume that (Global)Committer#commit does not create a new instance 
for failed
+        // committables. This assumption is documented in the respective 
JavaDoc.
+        Set<StateT> successful =
+                Collections.newSetFromMap(new 
IdentityHashMap<>(committables.size()));
+        successful.addAll(committables);
+        successful.removeAll(failed);
+        return successful;
+    }
+
+    protected final Collection<StateT> commit(List<StateT> committables)
+            throws IOException, InterruptedException {
+        List<StateT> failed = commitInternal(committables);
+        recoveredCommittables(failed);
+        return failed;
+    }
+
+    /**
+     * Commits a list of committables.
+     *
+     * @param committables A list of committables that is ready for committing.
+     * @return A list of committables needed to re-commit.
+     */
+    abstract List<StateT> commitInternal(List<StateT> committables)
+            throws IOException, InterruptedException;
+
     @Override
     public boolean needsRetry() {
         return !recoveredCommittables.isEmpty();
     }
 
     @Override
-    public void retry() throws IOException, InterruptedException {
-        retry(prependRecoveredCommittables(Collections.emptyList()));
+    public Collection<CommT> retry() throws IOException, InterruptedException {
+        return retry(prependRecoveredCommittables(Collections.emptyList()));
     }
 
-    protected abstract void retry(List<StateT> recoveredCommittables)
-            throws IOException, InterruptedException;
+    protected Collection<CommT> retry(List<StateT> recoveredCommittables)

Review comment:
       That's unfortunately not that easy because of the global committers: 
Currently all committers are emitting `CommT` and not `GlobalCommT` anymore 
after this refactor. This is possible because in fact the global committers are 
not emitting anything.
   Now `commitAndReturnSuccess` is working on the internal type (`GlobalCommT` 
in case of global committers). Hence, the signature is conflicting here.
   We could create mix-ins interfaces for non-global and global committers 
where we can implement them. The question is if that's simpler. We could also 
re-introduce an emit type to `CommitterHandler`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to