fapaul commented on a change in pull request #18428:
URL: https://github.com/apache/flink/pull/18428#discussion_r794298730
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
##########
@@ -17,95 +17,173 @@
package org.apache.flink.streaming.runtime.operators.sink;
-import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import
org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittables;
+import
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
+import
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
+import
org.apache.flink.streaming.runtime.operators.sink.committables.Committables;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
+import java.util.OptionalLong;
import static org.apache.flink.util.IOUtils.closeAll;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An operator that processes committables of a {@link
org.apache.flink.api.connector.sink.Sink}.
*
- * <p>The operator may be part of a sink pipeline but usually is the last
operator. There are
- * currently two ways this operator is used:
+ * <p>The operator may be part of a sink pipeline, and it always follows
{@link WriterOperator},
+ * which initially outputs the committables.
*
- * <ul>
- * <li>In streaming mode, there is a {@link SinkOperator} with parallelism p
containing {@link
- * org.apache.flink.api.connector.sink.SinkWriter} and {@link
- * org.apache.flink.api.connector.sink.Committer} and this operator
containing the {@link
- * org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism
1.
- * <li>In batch mode, there is a {@link SinkOperator} with parallelism p
containing {@link
- * org.apache.flink.api.connector.sink.SinkWriter} and this operator
containing the {@link
- * org.apache.flink.api.connector.sink.Committer} and {@link
- * org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism
1.
- * </ul>
- *
- * @param <InputT> the type of the committable
- * @param <OutputT> the type of the committable to send to downstream operators
+ * @param <CommT> the type of the committable
*/
-class CommitterOperator<InputT, OutputT> extends AbstractStreamOperator<byte[]>
- implements OneInputStreamOperator<byte[], byte[]>, BoundedOneInput {
+class CommitterOperator<CommT> extends
AbstractStreamOperator<CommittableMessage<CommT>>
+ implements OneInputStreamOperator<CommittableMessage<CommT>,
CommittableMessage<CommT>>,
+ BoundedOneInput {
+
+ private static final long RETRY_DELAY = 1000;
+ private final SimpleVersionedSerializer<CommT> committableSerializer;
+ private final Committer<CommT> committer;
+ private final boolean emitDownstream;
+ private CommittableCollector<CommT> committableCollector;
+ private long lastCompletedCheckpointId = -1;
- private final SimpleVersionedSerializer<InputT> inputSerializer;
- private final CommitterHandler<InputT, OutputT> committerHandler;
- private final CommitRetrier commitRetrier;
+ /** The operator's state descriptor. */
+ private static final ListStateDescriptor<byte[]>
STREAMING_COMMITTER_RAW_STATES_DESC =
+ new ListStateDescriptor<>(
+ "streaming_committer_raw_states",
BytePrimitiveArraySerializer.INSTANCE);
+
+ /** The operator's state. */
+ private ListState<CommittableCollector<CommT>> committableCollectorState;
public CommitterOperator(
ProcessingTimeService processingTimeService,
- SimpleVersionedSerializer<InputT> inputSerializer,
- CommitterHandler<InputT, OutputT> committerHandler) {
- this.inputSerializer = checkNotNull(inputSerializer);
- this.committerHandler = checkNotNull(committerHandler);
- this.processingTimeService = processingTimeService;
- this.commitRetrier = new CommitRetrier(processingTimeService,
committerHandler);
+ SimpleVersionedSerializer<CommT> committableSerializer,
+ Committer<CommT> committer,
+ boolean emitDownstream) {
+ this.emitDownstream = emitDownstream;
+ this.processingTimeService = checkNotNull(processingTimeService);
+ this.committableSerializer = checkNotNull(committableSerializer);
+ this.committer = checkNotNull(committer);
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<CommittableMessage<CommT>>> output) {
+ super.setup(containingTask, config, output);
+ committableCollector = CommittableCollector.of(getRuntimeContext());
}
@Override
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
- committerHandler.initializeState(context);
- // try to re-commit recovered transactions as quickly as possible
- commitRetrier.retryWithDelay();
+ committableCollectorState =
+ new SimpleVersionedListState<>(
+ context.getOperatorStateStore()
+
.getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+ new
CommittableCollectorSerializer<>(committableSerializer));
+
+ if (context.isRestored()) {
+ committableCollectorState.get().forEach(cc ->
committableCollector.merge(cc));
+ lastCompletedCheckpointId =
context.getRestoredCheckpointId().getAsLong();
+ // try to re-commit recovered transactions as quickly as possible
+ commitAndEmitCheckpoints();
+ }
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
- committerHandler.snapshotState(context);
+ // It is important to copy the collector to not mutate the state.
+
committableCollectorState.update(Collections.singletonList(committableCollector.copy()));
}
@Override
public void endInput() throws Exception {
- committerHandler.endOfInput();
- commitRetrier.retryIndefinitely();
+ Collection<? extends Committables<CommT>> endOfInputCommittables =
+ committableCollector.getEndOfInputCommittables();
+ // indicates batch
+ if (endOfInputCommittables != null) {
+ do {
+ for (Committables<CommT> endOfInputCommittable :
endOfInputCommittables) {
+ commitAndEmit(endOfInputCommittable, false);
+ }
+ } while (!committableCollector.isFinished());
+ }
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
- committerHandler.notifyCheckpointCompleted(checkpointId);
- commitRetrier.retryWithDelay();
+ lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId,
checkpointId);
+ commitAndEmitCheckpoints();
+ }
+
+ private void commitAndEmitCheckpoints() throws IOException,
InterruptedException {
+ for (CheckpointCommittables<CommT> checkpoint :
+
committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
+ // wait for all committables of the current checkpoint before
submission
Review comment:
I guess this code comment is related to the `fullyReceived` discussion. The
idea is that we do not want to send out the committables of the latest
checkpoint until we have received all of them. Does this make sense?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]