fapaul commented on code in PR #25547:
URL: https://github.com/apache/flink/pull/25547#discussion_r1818576013
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java:
##########
@@ -39,11 +39,21 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> {
private final long checkpointId;
/** The number of committables coming from the given subtask in the particular checkpoint. */
private final int numberOfCommittables;
+
+ @Deprecated
/** The number of committables that have not been successfully committed. */
private final int numberOfPendingCommittables;
+
+ @Deprecated
/** The number of committables that are not retried and have been failed. */
private final int numberOfFailedCommittables;
Review Comment:
Why do we deprecate failed committables? I still see value in being able to
discard unrecoverable committables. The change also looks unrelated to the
retry mechanism.
##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java:
##########
@@ -164,41 +165,37 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
- do {
- for (CheckpointCommittableManager<CommT> manager :
- committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
- commitAndEmit(manager);
- }
- // !committableCollector.isFinished() indicates that we should retry
- // Retry should be done here if this is a final checkpoint (indicated by endInput)
- // WARN: this is an endless retry, may make the job stuck while finishing
- } while (!committableCollector.isFinished() && endInput);
-
- if (!committableCollector.isFinished()) {
- // if not endInput, we can schedule retrying later
- retryWithDelay();
+ for (CheckpointCommittableManager<CommT> checkpointManager :
+ committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
+ // ensure that all committables of the first checkpoint are fully committed before
+ // attempting the next committable
+ commitAndEmit(checkpointManager);
+ committableCollector.remove(checkpointManager);
}
- committableCollector.compact();
}
private void commitAndEmit(CheckpointCommittableManager<CommT> committableManager)
throws IOException, InterruptedException {
- Collection<CommittableWithLineage<CommT>> committed = committableManager.commit(committer);
- if (emitDownstream && committableManager.isFinished()) {
- int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
- int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
- output.collect(
- new StreamRecord<>(committableManager.getSummary(subtaskId, numberOfSubtasks)));
- for (CommittableWithLineage<CommT> committable : committed) {
- output.collect(new StreamRecord<>(committable.withSubtaskId(subtaskId)));
- }
+ committableManager.commit(committer, MAX_RETRIES);
+ if (emitDownstream) {
+ emit(committableManager);
}
}
- private void retryWithDelay() {
- processingTimeService.registerTimer(
- processingTimeService.getCurrentProcessingTime() + RETRY_DELAY,
- ts -> commitAndEmitCheckpoints());
+ private void emit(CheckpointCommittableManager<CommT> committableManager) {
+ int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+ int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
Review Comment:
Nit: I know this wasn't introduced by this PR, but why do we need to fetch
the subtask id and the number of subtasks on every `emit` call?
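To illustrate the nit, here is a minimal sketch of reading both values once and reusing them, assuming they do not change for the lifetime of the operator instance. `CachedTaskInfoSketch` and its nested `TaskInfo` interface are hypothetical stand-ins for the operator and for what `getRuntimeContext().getTaskInfo()` returns; they are not the actual Flink classes.

```java
/** Hypothetical stand-in for the operator; not part of Flink. */
class CachedTaskInfoSketch {

    /** Hypothetical stand-in for the object returned by getRuntimeContext().getTaskInfo(). */
    interface TaskInfo {
        int getIndexOfThisSubtask();

        int getNumberOfParallelSubtasks();
    }

    // Assumption: both values are fixed once the operator has been opened.
    private int subtaskId;
    private int numberOfSubtasks;

    /** Reads the task info a single time, e.g. from the operator's open() method. */
    void open(TaskInfo taskInfo) {
        this.subtaskId = taskInfo.getIndexOfThisSubtask();
        this.numberOfSubtasks = taskInfo.getNumberOfParallelSubtasks();
    }

    /** Uses only the cached fields, so no lookup happens per emit call. */
    String emit(String committable) {
        return committable + "@" + subtaskId + "/" + numberOfSubtasks;
    }
}
```

With this shape, `emit` only touches two fields, and the lookup cost is paid once in `open`.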
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java:
##########
@@ -95,6 +94,7 @@
public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<CommittableMessage<CommT>, Void> {
+ private static final int MAX_RETRIES = 10;
Review Comment:
IMO we should make this a Flink config option specific to the sink and allow
users to opt out with the failed committable mechanism.
Currently the number is duplicated in the committer and the global committer.
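A rough sketch of the shape I have in mind, assuming the retry limit and the opt-out flag are passed in from one shared place instead of a constant per operator. `RetryingCommitSketch`, `maxRetries`, and `discardFailed` are hypothetical names for illustration only; this is not existing Flink API and not the code in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of a bounded retry with an opt-out; not part of Flink. */
class RetryingCommitSketch {

    private final int maxRetries; // assumption: read once from a sink-specific config option
    private final boolean discardFailed; // assumption: the user-facing opt-out flag

    RetryingCommitSketch(int maxRetries, boolean discardFailed) {
        this.maxRetries = maxRetries;
        this.discardFailed = discardFailed;
    }

    /**
     * Makes one initial attempt plus up to maxRetries retries. tryCommit returns true when a
     * committable was committed. Returns the committables that were given up on.
     */
    <T> List<T> commit(List<T> committables, Predicate<T> tryCommit) {
        List<T> pending = new ArrayList<>(committables);
        for (int attempt = 0; attempt <= maxRetries && !pending.isEmpty(); attempt++) {
            // Drop every committable whose commit attempt succeeded in this round.
            pending.removeIf(tryCommit);
        }
        if (!pending.isEmpty() && !discardFailed) {
            throw new IllegalStateException(
                    "Failed to commit " + pending.size() + " committables after "
                            + maxRetries + " retries");
        }
        return pending;
    }
}
```

Both the committer and the global committer could then be constructed from the same two values, which would remove the duplicated constant.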