gaoyunhaii commented on a change in pull request #18428:
URL: https://github.com/apache/flink/pull/18428#discussion_r794322437



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Manages the committables coming from one subtask. */
+class SubtaskCommittableManager<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittableManager(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittableManager(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {
+        add(committable.getCommittable());
+    }
+
+    void add(CommT committable) {
+        checkState(
+                requests.size() != summary.getNumberOfCommittables(),

Review comment:
       Might change `!=` to `<` ?
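   To illustrate the difference, here is a minimal sketch. It is not Flink code: `BoundedCommittableBuffer` is a hypothetical stand-in for the size guard in `SubtaskCommittableManager#add`. The restore constructor under review accepts an arbitrary collection of requests without checking its size, so the buffer could hold more entries than expected. In that case a `!=` guard would accept further additions, whereas `<` keeps rejecting them.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;

/** Hypothetical stand-in for the size guard in SubtaskCommittableManager#add; not Flink code. */
final class BoundedCommittableBuffer<T> {
    private final int expected;
    private final Deque<T> requests;

    BoundedCommittableBuffer(int expected, Collection<T> restored) {
        this.expected = expected;
        // Like the restore constructor under review, nothing checks the restored size here.
        this.requests = new ArrayDeque<>(restored);
    }

    void add(T committable) {
        // Equivalent to checkState(requests.size() < expected, ...): rejects the add when the
        // buffer is full or already over-full. A "!=" guard would accept it once size > expected.
        if (requests.size() >= expected) {
            throw new IllegalStateException("Already received all committables.");
        }
        requests.add(committable);
    }

    int size() {
        return requests.size();
    }
}
```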

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    /** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */
+    private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>>
+            checkpointCommittables;
+    /** Denotes the subtask id the collector is running. */
+    private final int subtaskId;

Review comment:
       As a whole, perhaps we could remove `subtaskId` and `numberOfSubtasks` from the commit states?
   
   Currently `subtaskId` and `numberOfSubtasks` seem to be used mainly to create the summary message, and the two values should refer to the index and parallelism of the current committer operator. We might instead pass the two fields when committing. This would also avoid cases where we ignore the two fields (e.g., by setting them to 0 and 1).
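   A minimal sketch of that idea, assuming the collector only needs the operator's index and parallelism at the moment it emits summaries. `SummarySketch` and `IdentityFreeCollector` are hypothetical names, not Flink classes; the real message type would be `CommittableSummary`. Because no identity is stored in the state, a restored or rescaled operator always stamps its current values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical summary message; the real one would be CommittableSummary. */
final class SummarySketch {
    final int subtaskId;
    final int numberOfSubtasks;
    final long checkpointId;
    final int numberOfCommittables;

    SummarySketch(int subtaskId, int numberOfSubtasks, long checkpointId, int numberOfCommittables) {
        this.subtaskId = subtaskId;
        this.numberOfSubtasks = numberOfSubtasks;
        this.checkpointId = checkpointId;
        this.numberOfCommittables = numberOfCommittables;
    }
}

/** Hypothetical collector whose state holds no subtask id or parallelism. */
final class IdentityFreeCollector {
    // checkpoint id -> number of committables received for that checkpoint
    private final NavigableMap<Long, Integer> counts = new TreeMap<>();

    void add(long checkpointId) {
        counts.merge(checkpointId, 1, Integer::sum);
    }

    /** The caller passes its current index and parallelism at emission time. */
    List<SummarySketch> summaries(int subtaskId, int numberOfSubtasks) {
        List<SummarySketch> out = new ArrayList<>();
        for (Map.Entry<Long, Integer> e : counts.entrySet()) {
            out.add(new SummarySketch(subtaskId, numberOfSubtasks, e.getKey(), e.getValue()));
        }
        return out;
    }
}
```

   The same state then yields different identities depending on who emits it, which is exactly what a rescaled committer would need.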

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
##########
@@ -17,95 +17,173 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
-import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
+import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
+import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
+import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableManager;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.OptionalLong;
 
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An operator that processes committables of a {@link org.apache.flink.api.connector.sink.Sink}.
  *
- * <p>The operator may be part of a sink pipeline but usually is the last operator. There are
- * currently two ways this operator is used:
+ * <p>The operator may be part of a sink pipeline, and it always follows {@link SinkWriterOperator},
+ * which initially outputs the committables.
  *
- * <ul>
- *   <li>In streaming mode, there is a {@link SinkOperator} with parallelism p containing {@link
- *       org.apache.flink.api.connector.sink.SinkWriter} and {@link
- *       org.apache.flink.api.connector.sink.Committer} and this operator containing the {@link
- *       org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1.
- *   <li>In batch mode, there is a {@link SinkOperator} with parallelism p containing {@link
- *       org.apache.flink.api.connector.sink.SinkWriter} and this operator containing the {@link
- *       org.apache.flink.api.connector.sink.Committer} and {@link
- *       org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1.
- * </ul>
- *
- * @param <InputT> the type of the committable
- * @param <OutputT> the type of the committable to send to downstream operators
+ * @param <CommT> the type of the committable
  */
-class CommitterOperator<InputT, OutputT> extends AbstractStreamOperator<byte[]>
-        implements OneInputStreamOperator<byte[], byte[]>, BoundedOneInput {
+class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
+        implements OneInputStreamOperator<CommittableMessage<CommT>, CommittableMessage<CommT>>,
+                BoundedOneInput {
+
+    private static final long RETRY_DELAY = 1000;
+    private final SimpleVersionedSerializer<CommT> committableSerializer;
+    private final Committer<CommT> committer;
+    private final boolean emitDownstream;
+    private CommittableCollector<CommT> committableCollector;
+    private long lastCompletedCheckpointId = -1;
 
-    private final SimpleVersionedSerializer<InputT> inputSerializer;
-    private final CommitterHandler<InputT, OutputT> committerHandler;
-    private final CommitRetrier commitRetrier;
+    /** The operator's state descriptor. */
+    private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
+
+    /** The operator's state. */
+    private ListState<CommittableCollector<CommT>> committableCollectorState;
 
     public CommitterOperator(
             ProcessingTimeService processingTimeService,
-            SimpleVersionedSerializer<InputT> inputSerializer,
-            CommitterHandler<InputT, OutputT> committerHandler) {
-        this.inputSerializer = checkNotNull(inputSerializer);
-        this.committerHandler = checkNotNull(committerHandler);
-        this.processingTimeService = processingTimeService;
-        this.commitRetrier = new CommitRetrier(processingTimeService, committerHandler);
+            SimpleVersionedSerializer<CommT> committableSerializer,
+            Committer<CommT> committer,
+            boolean emitDownstream) {
+        this.emitDownstream = emitDownstream;
+        this.processingTimeService = checkNotNull(processingTimeService);
+        this.committableSerializer = checkNotNull(committableSerializer);
+        this.committer = checkNotNull(committer);
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<CommittableMessage<CommT>>> output) {
+        super.setup(containingTask, config, output);
+        committableCollector = CommittableCollector.of(getRuntimeContext());
     }
 
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
-        committerHandler.initializeState(context);
-        // try to re-commit recovered transactions as quickly as possible
-        commitRetrier.retryWithDelay();
+        committableCollectorState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+                        new CommittableCollectorSerializer<>(committableSerializer));
+
+        if (context.isRestored()) {
+            committableCollectorState.get().forEach(cc -> committableCollector.merge(cc));
+            lastCompletedCheckpointId = context.getRestoredCheckpointId().getAsLong();
+            // try to re-commit recovered transactions as quickly as possible
+            commitAndEmitCheckpoints();
+        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
-        committerHandler.snapshotState(context);
+        // It is important to copy the collector to not mutate the state.
+        committableCollectorState.update(Collections.singletonList(committableCollector.copy()));
     }
 
     @Override
     public void endInput() throws Exception {
-        committerHandler.endOfInput();
-        commitRetrier.retryIndefinitely();
+        Collection<? extends CommittableManager<CommT>> endOfInputCommittables =
+                committableCollector.getEndOfInputCommittables();
+        // indicates batch
+        if (endOfInputCommittables != null) {
+            do {
+                for (CommittableManager<CommT> endOfInputCommittable : endOfInputCommittables) {
+                    commitAndEmit(endOfInputCommittable, false);
+                }
+            } while (!committableCollector.isFinished());
+        }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        committerHandler.notifyCheckpointCompleted(checkpointId);
-        commitRetrier.retryWithDelay();
+        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
+        commitAndEmitCheckpoints();
+    }
+
+    private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
+        for (CheckpointCommittableManager<CommT> manager :

Review comment:
       Is it possible to avoid accessing `CheckpointCommittableManager` in `CommitterOperator`?
   
   Currently the state is organized into three levels, namely `commit state -> per-checkpoint state -> per-checkpoint per-subtask state`, so it might be better not to access the inner levels directly. Instead, we might have a method in `CommittableCollector` like
   
   ```
   List<CommittableMessage> commit(int subtaskId, int totalSubtasks);
   ```
   
   that commits all the committables and returns the list of messages to emit. 
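   A rough sketch of what such a facade could look like, under a simplified model: committables are plain values, a `Predicate` stands in for `Committer#commit` (returning `true` when a committable was committed), and the emitted messages are strings rather than `CommittableMessage`s. `CommitFacadeSketch` is a hypothetical name, not a Flink class. The operator would only ever call `add` and `commit`; the per-checkpoint level stays private.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Hypothetical top-level state object; callers never see the per-checkpoint level. */
final class CommitFacadeSketch<T> {
    // checkpoint id -> committables that still need to be committed
    private final NavigableMap<Long, List<T>> pending = new TreeMap<>();

    void add(long checkpointId, T committable) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    /**
     * Tries to commit every checkpoint up to and including {@code upToCheckpointId} and returns
     * one message per checkpoint. Committables rejected by {@code committer} stay pending.
     */
    List<String> commit(
            long upToCheckpointId, int subtaskId, int totalSubtasks, Predicate<T> committer) {
        List<String> messages = new ArrayList<>();
        Iterator<Map.Entry<Long, List<T>>> it =
                pending.headMap(upToCheckpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<T>> entry = it.next();
            List<T> committables = entry.getValue();
            int before = committables.size();
            committables.removeIf(committer); // drop the ones that were committed
            messages.add(String.format(
                    "subtask=%d/%d checkpoint=%d committed=%d pending=%d",
                    subtaskId, totalSubtasks, entry.getKey(),
                    before - committables.size(), committables.size()));
            if (committables.isEmpty()) {
                it.remove(); // removing through the headMap view also updates `pending`
            }
        }
        return messages;
    }

    int pendingCheckpoints() {
        return pending.size();
    }
}
```

   Retries then become a repeated call to `commit`, and the operator needs no knowledge of how the inner levels are organized.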
   
   This might also be related to 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Manages the committables coming from one subtask. */
+class SubtaskCommittableManager<CommT> {
+    private CommittableSummary<CommT> summary;

Review comment:
       Could we directly use an `int numCommittables` or `int numTotal` to book-keep the total number of committables?
   
   The only other use of the remaining fields is the `drainCommitted` method below. I'm a bit concerned about which `subtaskId` to fill in here: if we want to keep the same protocol for the post-committer topology, shouldn't the subtask id here be the id of the current committer operator subtask? (Users might build a pre-committer topology that ends with a non-forward connection, or the committer operator might be rescaled after a restart.) Thus the subtaskId should be updated, and it could be passed down from the `CheckpointManager`, which would also avoid replicating information across the different levels of state classes.
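   A minimal sketch of the counter-based bookkeeping, assuming the per-subtask level only needs an expected total plus its drain progress. `SubtaskCounterSketch` is a hypothetical name, and the `Predicate` stands in for checking whether a commit request succeeded. The sketch drains in order and stops at the first uncommitted request; that ordering is an assumption for illustration, not necessarily how the real `drainCommitted` behaves. It stores no subtask id at all, so whichever level emits messages would stamp the current id on the drained committables.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical per-subtask bookkeeping that keeps a plain count instead of a summary. */
final class SubtaskCounterSketch<T> {
    private final int numExpected; // replaces the stored CommittableSummary
    private final Deque<T> pending = new ArrayDeque<>();
    private int numDrained;

    SubtaskCounterSketch(int numExpected) {
        this.numExpected = numExpected;
    }

    void add(T committable) {
        // Drained committables still count as received, so late extras are rejected.
        if (pending.size() + numDrained >= numExpected) {
            throw new IllegalStateException("Already received all committables.");
        }
        pending.add(committable);
    }

    boolean hasReceivedAll() {
        return pending.size() + numDrained == numExpected;
    }

    /** Drains committed requests from the head, stopping at the first one not yet committed. */
    List<T> drainCommitted(Predicate<T> isCommitted) {
        List<T> drained = new ArrayList<>();
        while (!pending.isEmpty() && isCommitted.test(pending.peek())) {
            drained.add(pending.poll());
        }
        numDrained += drained.size();
        return drained;
    }

    boolean isFinished() {
        return numDrained == numExpected;
    }
}
```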

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {

Review comment:
       Is it possible to also rename this class to `CommittableManager`? `Collector` already has another meaning in Flink (roughly, an output), and the new name would also be consistent with the other two levels.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Manages the committables coming from one subtask. */
+class SubtaskCommittableManager<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittableManager(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittableManager(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {
+        add(committable.getCommittable());
+    }
+
+    void add(CommT committable) {
+        checkState(
+                requests.size() != summary.getNumberOfCommittables(),
+                "Already received all committables.");
+        requests.add(new CommitRequestImpl<>(committable));
+    }
+
+    /**
+     * Returns whether the received number of committables matches the expected number.
+     *
+     * @return boolean

Review comment:
       The message might need to be updated or removed; the same applies to the following methods.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Internal wrapper to handle the committing of committables.
+ *
+ * @param <CommT> type of the committable
+ */
+@Internal
+public interface CommittableManager<CommT> {

Review comment:
       Currently `CommittableManager` and `CheckpointCommittableManager` each seem to have only one implementation?
   
   Since they are all internal classes, perhaps we do not need to extract the interfaces here? We might have only one concrete class, `CheckpointCommittableManager`, that manages the per-checkpoint state.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    /** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */
+    private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>>
+            checkpointCommittables;
+    /** Denotes the subtask id the collector is running. */
+    private final int subtaskId;
+
+    private final int numberOfSubtasks;
+
+    CommittableCollector(int subtaskId, int numberOfSubtasks) {
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointCommittables = new TreeMap<>();
+    }
+
+    /**
+     * For deserialization we can ignore the subtaskId and number of subtasks because it is going to
+     * be merged with another collector that overwrites these values.
+     */
+    CommittableCollector(
+            Map<Long, CheckpointCommittableManagerImpl<CommT>> checkpointCommittables) {
+        this(checkpointCommittables, 0, 1);
+    }
+
+    /** For deep-copy. */
+    CommittableCollector(
+            Map<Long, CheckpointCommittableManagerImpl<CommT>> checkpointCommittables,
+            int subtaskId,
+            int numberOfSubtasks) {
+        this.checkpointCommittables = new TreeMap<>(checkNotNull(checkpointCommittables));
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} based on the current runtime information. This method
+     * should be used for to instantiate a collector for all Sink V2.
+     *
+     * @param context holding runtime of information
+     * @param <CommT> type of the committable
+     * @return {@link CommittableCollector}
+     */
+    public static <CommT> CommittableCollector<CommT> of(RuntimeContext context) {
+        return new CommittableCollector<>(
+                context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} for a list of committables. This method is mainly used
+     * to create a collector from the state of Sink V1.
+     *
+     * @param r list of committables
+     * @param <CommT> type of committables
+     * @return {@link CommittableCollector}
+     */
+    static <CommT> CommittableCollector<CommT> ofLegacy(List<CommT> r) {
+        CommittableCollector<CommT> committableCollector = new CommittableCollector<>(0, 1);
+        // add a checkpoint with the lowest checkpoint id, this will be merged into the next
+        // checkpoint data, subtask id is arbitrary
+        CommittableSummary<CommT> summary =
+                new CommittableSummary<>(
+                        0, 1, InitContext.INITIAL_CHECKPOINT_ID, r.size(), r.size(), 0);
+        committableCollector.addSummary(summary);
+        SubtaskCommittableManager<CommT> subtask =

Review comment:
       Similarly, perhaps we could also create the `CommittableWithLineage` messages and use `addMessage` to add them. Then we could avoid directly accessing the `SubtaskCommittableManager`, which helps reduce the complexity of the interfaces between the different levels.
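   A sketch of routing the legacy path through a single message entry point, under simplifying assumptions: `MsgSketch`, `SummaryMsg`, `LineageMsg`, and `LegacyCollectorSketch` are hypothetical stand-ins for `CommittableMessage`, `CommittableSummary`, `CommittableWithLineage`, and `CommittableCollector`, and `LEGACY_CHECKPOINT_ID` stands in for `InitContext.INITIAL_CHECKPOINT_ID` (its value here is chosen only for illustration). `ofLegacy` builds one summary plus one lineage message per committable and feeds them through `addMessage`, so it never reaches into the per-subtask level.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical message types mirroring CommittableSummary / CommittableWithLineage. */
interface MsgSketch<T> {
    long checkpointId();
}

final class SummaryMsg<T> implements MsgSketch<T> {
    private final long checkpointId;
    final int numberOfCommittables;

    SummaryMsg(long checkpointId, int numberOfCommittables) {
        this.checkpointId = checkpointId;
        this.numberOfCommittables = numberOfCommittables;
    }

    @Override
    public long checkpointId() {
        return checkpointId;
    }
}

final class LineageMsg<T> implements MsgSketch<T> {
    private final long checkpointId;
    final T committable;

    LineageMsg(long checkpointId, T committable) {
        this.checkpointId = checkpointId;
        this.committable = committable;
    }

    @Override
    public long checkpointId() {
        return checkpointId;
    }
}

final class LegacyCollectorSketch<T> {
    // Illustrative stand-in for InitContext.INITIAL_CHECKPOINT_ID.
    static final long LEGACY_CHECKPOINT_ID = 1L;

    private final Map<Long, Integer> expected = new TreeMap<>();
    private final Map<Long, List<T>> received = new TreeMap<>();

    /** The only way state enters the collector; lower levels are never touched directly. */
    void addMessage(MsgSketch<T> message) {
        if (message instanceof SummaryMsg) {
            SummaryMsg<T> summary = (SummaryMsg<T>) message;
            expected.merge(summary.checkpointId(), summary.numberOfCommittables, Integer::sum);
        } else if (message instanceof LineageMsg) {
            LineageMsg<T> lineage = (LineageMsg<T>) message;
            received.computeIfAbsent(lineage.checkpointId(), id -> new ArrayList<>())
                    .add(lineage.committable);
        } else {
            throw new IllegalArgumentException("Unknown message: " + message);
        }
    }

    static <T> LegacyCollectorSketch<T> ofLegacy(List<T> committables) {
        LegacyCollectorSketch<T> collector = new LegacyCollectorSketch<>();
        collector.addMessage(new SummaryMsg<>(LEGACY_CHECKPOINT_ID, committables.size()));
        for (T committable : committables) {
            collector.addMessage(new LineageMsg<>(LEGACY_CHECKPOINT_ID, committable));
        }
        return collector;
    }

    boolean isComplete(long checkpointId) {
        int numReceived = received.getOrDefault(checkpointId, Collections.emptyList()).size();
        return expected.getOrDefault(checkpointId, 0) == numReceived;
    }
}
```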

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableManager<CommT> {
+    /** Mapping of subtask id to {@link SubtaskCommittableManager}. */
+    private final Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers;
+
+    private final CommittableCollector<CommT> collector;

Review comment:
       We could also remove the reverse dependency if we do not need to maintain the `subtaskId` and the `numberOfSubtasks`.



