AHeise commented on a change in pull request #17536:
URL: https://github.com/apache/flink/pull/17536#discussion_r735868485



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java
##########
@@ -21,33 +21,44 @@
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Retries the committables of a {@link CommitterHandler} until all committables are eventually
  * committed.
  */
-class CommitRetrier {
+class CommitRetrier<CommT> {
+    @VisibleForTesting static final int RETRY_DELAY = 1000;
     private final ProcessingTimeService processingTimeService;
-    private final CommitterHandler<?> committerHandler;
+    private final CommitterHandler<CommT> committerHandler;
+    private final ThrowingConsumer<? super Collection<CommT>, IOException> committableConsumer;
     private final Clock clock;
-    @VisibleForTesting static final int RETRY_DELAY = 1000;
 
     public CommitRetrier(
-            ProcessingTimeService processingTimeService, CommitterHandler<?> committerHandler) {
-        this(processingTimeService, committerHandler, SystemClock.getInstance());
+            ProcessingTimeService processingTimeService,
+            CommitterHandler<CommT> committerHandler,
+            ThrowingConsumer<? super Collection<CommT>, IOException> committableConsumer) {

Review comment:
   I used a lambda here as the callback for the timer-based retry. I am not sure how else this could be solved.

   I would certainly try to avoid invoking the retrier from inside emitCommittables, as that could probably lead to nasty stack exceptions.
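
   To illustrate the callback approach described above, here is a minimal, self-contained sketch. It does not use Flink's classes: `ManualTimer`, `FlakyHandler`, and `Retrier` are hypothetical stand-ins invented for this example, and committables are plain strings. The assumption being illustrated is that each retry runs from a timer callback rather than from inside the emitting code path, and that successfully committed items are handed to a consumer passed in as a lambda or method reference.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class CommitRetrySketch {

    /** Hypothetical stand-in for a timer service: callbacks are queued, never run inline. */
    static class ManualTimer {
        private final Queue<Runnable> pending = new ArrayDeque<>();

        void schedule(Runnable callback) {
            pending.add(callback);
        }

        /** Runs the next queued callback, if any; returns false when the queue is empty. */
        boolean fireNext() {
            Runnable next = pending.poll();
            if (next == null) {
                return false;
            }
            next.run();
            return true;
        }
    }

    /** Hypothetical handler whose retry attempts fail a fixed number of times. */
    static class FlakyHandler {
        private int failuresLeft;
        private final List<String> uncommitted;

        FlakyHandler(int failures, List<String> committables) {
            this.failuresLeft = failures;
            this.uncommitted = new ArrayList<>(committables);
        }

        boolean needsRetry() {
            return !uncommitted.isEmpty();
        }

        /** Returns the committables committed by this attempt (empty on failure). */
        Collection<String> retry() {
            if (failuresLeft > 0) {
                failuresLeft--;
                return Collections.emptyList();
            }
            List<String> committed = new ArrayList<>(uncommitted);
            uncommitted.clear();
            return committed;
        }
    }

    /** Retries through the timer and passes committed items to a caller-supplied callback. */
    static class Retrier {
        private final ManualTimer timer;
        private final FlakyHandler handler;
        private final Consumer<Collection<String>> committableConsumer;

        Retrier(ManualTimer timer, FlakyHandler handler,
                Consumer<Collection<String>> committableConsumer) {
            this.timer = timer;
            this.handler = handler;
            this.committableConsumer = committableConsumer;
        }

        /** Schedules a retry only if something is still uncommitted. */
        void retryWithDelay() {
            if (handler.needsRetry()) {
                timer.schedule(this::retry);
            }
        }

        private void retry() {
            Collection<String> committed = handler.retry();
            if (!committed.isEmpty()) {
                committableConsumer.accept(committed);
            }
            retryWithDelay(); // re-schedules on the timer instead of calling retry() directly
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        ManualTimer timer = new ManualTimer();
        FlakyHandler handler = new FlakyHandler(2, List.of("a", "b"));
        new Retrier(timer, handler, emitted::addAll).retryWithDelay();

        int fired = 0;
        while (timer.fireNext()) {
            fired++;
        }
        System.out.println(fired + " timer callbacks, emitted " + emitted);
    }
}
```

   In this sketch the retrier never calls back into the code that emits committables; it only enqueues work on the timer, so every attempt starts from the timer's call stack. Whether this matches the exact concern raised in the comment is an assumption.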



