fapaul commented on a change in pull request #18428:
URL: https://github.com/apache/flink/pull/18428#discussion_r795720433



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
##########
@@ -17,95 +17,173 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
-import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import 
org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
+import 
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
+import 
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
+import 
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableManager;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.OptionalLong;
 
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An operator that processes committables of a {@link 
org.apache.flink.api.connector.sink.Sink}.
  *
- * <p>The operator may be part of a sink pipeline but usually is the last 
operator. There are
- * currently two ways this operator is used:
+ * <p>The operator may be part of a sink pipeline, and it always follows 
{@link SinkWriterOperator},
+ * which initially outputs the committables.
  *
- * <ul>
- *   <li>In streaming mode, there is a {@link SinkOperator} with parallelism p 
containing {@link
- *       org.apache.flink.api.connector.sink.SinkWriter} and {@link
- *       org.apache.flink.api.connector.sink.Committer} and this operator 
containing the {@link
- *       org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 
1.
- *   <li>In batch mode, there is a {@link SinkOperator} with parallelism p 
containing {@link
- *       org.apache.flink.api.connector.sink.SinkWriter} and this operator 
containing the {@link
- *       org.apache.flink.api.connector.sink.Committer} and {@link
- *       org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 
1.
- * </ul>
- *
- * @param <InputT> the type of the committable
- * @param <OutputT> the type of the committable to send to downstream operators
+ * @param <CommT> the type of the committable
  */
-class CommitterOperator<InputT, OutputT> extends AbstractStreamOperator<byte[]>
-        implements OneInputStreamOperator<byte[], byte[]>, BoundedOneInput {
+class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage<CommT>>
+        implements OneInputStreamOperator<CommittableMessage<CommT>, 
CommittableMessage<CommT>>,
+                BoundedOneInput {
+
+    private static final long RETRY_DELAY = 1000;
+    private final SimpleVersionedSerializer<CommT> committableSerializer;
+    private final Committer<CommT> committer;
+    private final boolean emitDownstream;
+    private CommittableCollector<CommT> committableCollector;
+    private long lastCompletedCheckpointId = -1;
 
-    private final SimpleVersionedSerializer<InputT> inputSerializer;
-    private final CommitterHandler<InputT, OutputT> committerHandler;
-    private final CommitRetrier commitRetrier;
+    /** The operator's state descriptor. */
+    private static final ListStateDescriptor<byte[]> 
STREAMING_COMMITTER_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "streaming_committer_raw_states", 
BytePrimitiveArraySerializer.INSTANCE);
+
+    /** The operator's state. */
+    private ListState<CommittableCollector<CommT>> committableCollectorState;
 
     public CommitterOperator(
             ProcessingTimeService processingTimeService,
-            SimpleVersionedSerializer<InputT> inputSerializer,
-            CommitterHandler<InputT, OutputT> committerHandler) {
-        this.inputSerializer = checkNotNull(inputSerializer);
-        this.committerHandler = checkNotNull(committerHandler);
-        this.processingTimeService = processingTimeService;
-        this.commitRetrier = new CommitRetrier(processingTimeService, 
committerHandler);
+            SimpleVersionedSerializer<CommT> committableSerializer,
+            Committer<CommT> committer,
+            boolean emitDownstream) {
+        this.emitDownstream = emitDownstream;
+        this.processingTimeService = checkNotNull(processingTimeService);
+        this.committableSerializer = checkNotNull(committableSerializer);
+        this.committer = checkNotNull(committer);
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<CommittableMessage<CommT>>> output) {
+        super.setup(containingTask, config, output);
+        committableCollector = CommittableCollector.of(getRuntimeContext());
     }
 
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
-        committerHandler.initializeState(context);
-        // try to re-commit recovered transactions as quickly as possible
-        commitRetrier.retryWithDelay();
+        committableCollectorState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                
.getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+                        new 
CommittableCollectorSerializer<>(committableSerializer));
+
+        if (context.isRestored()) {
+            committableCollectorState.get().forEach(cc -> 
committableCollector.merge(cc));
+            lastCompletedCheckpointId = 
context.getRestoredCheckpointId().getAsLong();
+            // try to re-commit recovered transactions as quickly as possible
+            commitAndEmitCheckpoints();
+        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
-        committerHandler.snapshotState(context);
+        // It is important to copy the collector to not mutate the state.
+        
committableCollectorState.update(Collections.singletonList(committableCollector.copy()));
     }
 
     @Override
     public void endInput() throws Exception {
-        committerHandler.endOfInput();
-        commitRetrier.retryIndefinitely();
+        Collection<? extends CommittableManager<CommT>> endOfInputCommittables 
=
+                committableCollector.getEndOfInputCommittables();
+        // indicates batch
+        if (endOfInputCommittables != null) {
+            do {
+                for (CommittableManager<CommT> endOfInputCommittable : 
endOfInputCommittables) {
+                    commitAndEmit(endOfInputCommittable, false);
+                }
+            } while (!committableCollector.isFinished());
+        }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        committerHandler.notifyCheckpointCompleted(checkpointId);
-        commitRetrier.retryWithDelay();
+        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, 
checkpointId);
+        commitAndEmitCheckpoints();
+    }
+
+    private void commitAndEmitCheckpoints() throws IOException, 
InterruptedException {
+        for (CheckpointCommittableManager<CommT> manager :

Review comment:
       > Thanks @fapaul for the explanation! I think perhaps 
`CommittableSummary` is also a subclass of `CommittableMessage`, so 
it should be possible to mix it into the returned list? And passing `committer` is 
indeed required, but it seems fine from my side~
   > 
   Good point. To be honest, I like the current abstraction in which we have one 
central place to coordinate the additions and removals of 
`CommittableMessage`s: the `CommittableCollector` (I am open to a different 
name). In addition, we have two different interfaces to actually interact with 
the committables: `CommittableManager`, which is used to commit 
committables in batch execution mode, and `CheckpointCommittableManager`, which 
is used to commit committables in streaming execution mode.
   
   WDYT?
   
   > Also, may I double-check: it seems that with the current 
implementation we'll emit a `CommittableSummary` whenever we commit some 
committables instead of only when checkpointing? It seems to me this is to deal 
with the case that, for unaligned checkpoints, the barrier might jump over all 
the summary and committable messages~ It should also be the reason we support 
receiving multiple summaries from one subtask for the same checkpoint, do I 
understand right~?
   
   Yes, you are right; considering unaligned checkpoints made it a bit 
trickier. Do you think it needs more comments somewhere?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to