fapaul commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r697377025
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java
##########
@@ -19,18 +19,58 @@
import org.apache.flink.util.function.SupplierWithException;
+import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
-abstract class AbstractCommitterHandler<InputT, OutputT>
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+abstract class AbstractCommitterHandler<InputT, OutputT, RecoverT>
implements CommitterHandler<InputT, OutputT> {
/** Record all the committables until commit. */
private final Deque<InputT> committables = new ArrayDeque<>();
+ /** The committables that need to be committed again after recovering from a failover. */
+ private final List<RecoverT> recoveredCommittables = new ArrayList<>();
Review comment:
I see a risk that this list grows without bound and causes a surprising
OOM error. If a lot of committables fail, the periodic commit can be too slow
to keep up.
I think we either need to guard against this and warn the user, or
additionally commit the retried committables as part of the checkpoint.
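
To make the two options concrete, here is a minimal sketch, not a proposal for the final implementation. `RetryBuffer`, `warnThreshold`, `addAll`, and `drain` are hypothetical names that do not exist in Flink, and the threshold is an assumed, user-configurable limit. The buffer warns once when it grows past the limit (the guard option) and can be emptied in one call, for example from the checkpoint path (the second option).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper, not part of Flink: holds committables that must be
 * retried and flags when the backlog grows past an assumed threshold.
 */
final class RetryBuffer<T> {
    private final List<T> pending = new ArrayList<>();
    private final int warnThreshold; // assumed, user-configurable limit
    private boolean warned;

    RetryBuffer(int warnThreshold) {
        if (warnThreshold <= 0) {
            throw new IllegalArgumentException("warnThreshold must be positive");
        }
        this.warnThreshold = warnThreshold;
    }

    /** Adds failed committables; returns true if the backlog now exceeds the threshold. */
    boolean addAll(List<T> failed) {
        pending.addAll(failed);
        boolean overLimit = pending.size() > warnThreshold;
        if (overLimit && !warned) {
            // A real operator would use its logger; stderr keeps the sketch self-contained.
            System.err.println(
                    "WARN: " + pending.size() + " committables are waiting for retry");
            warned = true;
        }
        return overLimit;
    }

    /** Returns everything pending and empties the buffer, e.g. to commit it with the checkpoint. */
    List<T> drain() {
        List<T> drained = new ArrayList<>(pending);
        pending.clear();
        warned = false;
        return drained;
    }

    int size() {
        return pending.size();
    }
}
```

The return value of `addAll` lets the caller decide what to do on overflow, such as failing the job or forcing a commit, instead of only logging.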