JingGe commented on a change in pull request #18428:
URL: https://github.com/apache/flink/pull/18428#discussion_r792946618



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
+import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Translates Sink V1 into Sink V2. */
+@Internal
+public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT> {
+
+    private final org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT>
+            sink;
+
+    private SinkV1Adapter(
+            org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT>
+                    sink) {
+        this.sink = sink;
+    }
+
+    public static <InputT> Sink<InputT> wrap(
+            org.apache.flink.api.connector.sink.Sink<InputT, ?, ?, ?> sink) {
+        return new SinkV1Adapter<>(sink).asSpecializedSink();
+    }
+
+    @Override
+    public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(InitContext context)
+            throws IOException {
+        org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> writer =
+                sink.createWriter(new InitContextAdapter(context), Collections.emptyList());
+        return new SinkWriterV1Adapter<>(writer);
+    }
+
+    public Sink<InputT> asSpecializedSink() {
+        boolean stateful = false;
+        boolean globalCommitter = false;
+        boolean committer = false;
+        if (sink.getWriterStateSerializer().isPresent()) {
+            stateful = true;
+        }
+        if (sink.getGlobalCommittableSerializer().isPresent()) {
+            globalCommitter = true;
+        }
+        if (sink.getCommittableSerializer().isPresent()) {
+            committer = true;
+        }
+
+        if (globalCommitter && committer && stateful) {
+            return new StatefulGlobalTwoPhaseCommittingSinkAdapter();
+        }
+        if (globalCommitter) {
+            return new GlobalCommittingSinkAdapter();
+        }
+        if (committer && stateful) {
+            return new StatefulTwoPhaseCommittingSinkAdapter();
+        }
+        if (committer) {
+            return new TwoPhaseCommittingSinkAdapter();
+        }
+        if (stateful) {
+            return new StatefulSinkAdapter();
+        }
+        return this;
+    }
+
+    private static class SinkWriterV1Adapter<InputT, CommT, WriterStateT>
+            implements StatefulSinkWriter<InputT, WriterStateT>,
+                    PrecommittingSinkWriter<InputT, CommT> {
+
+        private final org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT>
+                writer;
+        private boolean endOfInput = false;
+        private final WriterContextAdapter contextAdapter = new WriterContextAdapter();
+
+        public SinkWriterV1Adapter(
+                org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT>
+                        writer) {
+            this.writer = writer;
+        }
+
+        @Override
+        public void write(InputT element, Context context)
+                throws IOException, InterruptedException {
+            contextAdapter.setContext(context);
+            this.writer.write(element, contextAdapter);
+        }
+
+        @Override
+        public void flush(boolean endOfInput) throws IOException, InterruptedException {
+            this.endOfInput = endOfInput;
+        }
+
+        @Override
+        public List<WriterStateT> snapshotState(long checkpointId) throws IOException {
+            return writer.snapshotState(checkpointId);
+        }
+
+        @Override
+        public Collection<CommT> prepareCommit() throws IOException, InterruptedException {
+            return writer.prepareCommit(endOfInput);
+        }
+
+        @Override
+        public void close() throws Exception {
+            writer.close();
+        }
+
+        @Override
+        public void writeWatermark(Watermark watermark) throws IOException, InterruptedException {
+            writer.writeWatermark(watermark);
+        }
+    }
+
+    private static class WriterContextAdapter implements SinkWriter.Context {
+        private org.apache.flink.api.connector.sink2.SinkWriter.Context context;
+
+        public void setContext(org.apache.flink.api.connector.sink2.SinkWriter.Context context) {
+            this.context = context;
+        }
+
+        @Override
+        public long currentWatermark() {
+            return context.currentWatermark();
+        }
+
+        @Override
+        public Long timestamp() {
+            return context.timestamp();
+        }
+    }
+
+    private static class InitContextAdapter
+            implements org.apache.flink.api.connector.sink.Sink.InitContext {
+
+        private final InitContext context;
+
+        public InitContextAdapter(InitContext context) {
+            this.context = context;
+        }
+
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return context.getUserCodeClassLoader();
+        }
+
+        @Override
+        public MailboxExecutor getMailboxExecutor() {
+            return context.getMailboxExecutor();
+        }
+
+        @Override
+        public ProcessingTimeService getProcessingTimeService() {
+            return new ProcessingTimeServiceAdapter(context.getProcessingTimeService());
+        }
+
+        @Override
+        public int getSubtaskId() {
+            return context.getSubtaskId();
+        }
+
+        @Override
+        public int getNumberOfParallelSubtasks() {
+            return context.getNumberOfParallelSubtasks();
+        }
+
+        @Override
+        public SinkWriterMetricGroup metricGroup() {
+            return context.metricGroup();
+        }
+
+        @Override
+        public OptionalLong getRestoredCheckpointId() {
+            return context.getRestoredCheckpointId();
+        }
+
+        public InitializationContext asSerializationSchemaInitializationContext() {
+            return context.asSerializationSchemaInitializationContext();
+        }
+    }
+
+    private static class ProcessingTimeCallbackAdapter implements ProcessingTimeCallback {
+
+        private final ProcessingTimeService.ProcessingTimeCallback processingTimerCallback;
+
+        public ProcessingTimeCallbackAdapter(
+                ProcessingTimeService.ProcessingTimeCallback processingTimerCallback) {
+            this.processingTimerCallback = processingTimerCallback;
+        }
+
+        @Override
+        public void onProcessingTime(long time) throws IOException, InterruptedException {
+            processingTimerCallback.onProcessingTime(time);
+        }
+    }
+
+    private static class ProcessingTimeServiceAdapter implements ProcessingTimeService {
+
+        private final org.apache.flink.api.common.operators.ProcessingTimeService
+                processingTimeService;
+
+        public ProcessingTimeServiceAdapter(
+                org.apache.flink.api.common.operators.ProcessingTimeService processingTimeService) {
+            this.processingTimeService = processingTimeService;
+        }
+
+        @Override
+        public long getCurrentProcessingTime() {
+            return processingTimeService.getCurrentProcessingTime();
+        }
+
+        @Override
+        public void registerProcessingTimer(
+                long time, ProcessingTimeCallback processingTimerCallback) {
+            processingTimeService.registerTimer(
+                    time, new ProcessingTimeCallbackAdapter(processingTimerCallback));
+        }
+    }
+
+    private static class CommitterAdapter<CommT> implements Committer<CommT> {
+
+        private final org.apache.flink.api.connector.sink.Committer<CommT> committer;
+
+        public CommitterAdapter(org.apache.flink.api.connector.sink.Committer<CommT> committer) {
+            this.committer = committer;
+        }
+
+        @Override
+        public void commit(Collection<CommitRequest<CommT>> commitRequests)
+                throws IOException, InterruptedException {
+            List<CommT> failed =
+                    committer.commit(
+                            commitRequests.stream()
+                                    .map(CommitRequest::getCommittable)
+                                    .collect(Collectors.toList()));
+            if (!failed.isEmpty()) {
+                Set<CommT> indexed = Collections.newSetFromMap(new IdentityHashMap<>());
+                indexed.addAll(failed);
+                commitRequests.stream()
+                        .filter(request -> indexed.contains(request.getCommittable()))
+                        .forEach(CommitRequest::retryLater);
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            committer.close();
+        }
+    }
+
+    /** Main class to simulate SinkV1 with SinkV2. */
+    @VisibleForTesting
+    @Internal
+    public class PlainSinkAdapter implements Sink<InputT> {
+        @Override
+        public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(InitContext context)
+                throws IOException {
+            return SinkV1Adapter.this.createWriter(context);
+        }
+
+        @VisibleForTesting

Review comment:
       Redundant? There is already a `@VisibleForTesting` annotation at the class level.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
##########
@@ -40,32 +45,46 @@
 
     private final PhysicalTransformation<T> transformation;
 
-    @SuppressWarnings("unchecked")
-    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
-        this.transformation =
-                (PhysicalTransformation<T>)
-                        new LegacySinkTransformation<>(
-                                inputStream.getTransformation(),
-                                "Unnamed",
-                                operator,
-                                inputStream.getExecutionEnvironment().getParallelism());
-    }
-
-    @SuppressWarnings("unchecked")
-    protected DataStreamSink(DataStream<T> inputStream, Sink<T, ?, ?, ?> sink) {
-        transformation =
-                (PhysicalTransformation<T>)
-                        new SinkTransformation<>(
-                                inputStream.getTransformation(),
-                                sink,
-                                "Unnamed",
-                                inputStream.getExecutionEnvironment().getParallelism());
+    protected DataStreamSink(PhysicalTransformation<T> transformation) {
+        this.transformation = checkNotNull(transformation);
+    }
+
+    static <T> DataStreamSink<T> forSinkFunction(
+            DataStream<T> inputStream, SinkFunction<T> sinkFunction) {

Review comment:
       Why replace the operator with the sinkFunction, which is planned to be
deprecated? That means the modified code will have to be changed again in two
releases, once the sinkFunction is deleted. Will `StreamSink` be deprecated
together with `SinkFunction`?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/WriterOperator.java
##########
@@ -56,105 +59,64 @@
  * same parallelism or send them downstream to a {@link CommitterOperator} with a different
  * parallelism.
  *
- * <p>The operator may be part of a sink pipeline and is the first operator. There are currently two
- * ways this operator is used:
- *
- * <ul>
- *   <li>In streaming mode, there is this operator with parallelism p containing {@link
- *       org.apache.flink.api.connector.sink.SinkWriter} and {@link
- *       org.apache.flink.api.connector.sink.Committer} and a {@link CommitterOperator} containing
- *       the {@link org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1.
- *   <li>In batch mode, there is this operator with parallelism p containing {@link
- *       org.apache.flink.api.connector.sink.SinkWriter} and a {@link CommitterOperator} containing
- *       the {@link org.apache.flink.api.connector.sink.Committer} and {@link
- *       org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1.
- * </ul>
+ * <p>The operator may be part of a sink pipeline and is the first operator.

Review comment:
       I think it still makes sense to add more information about the relationship
between these operators, i.e. between WriterOperator and CommitterOperator and the
SinkWriter, Committer, and GlobalCommitter.
   
   "may be" - does this mean there could be a sink pipeline without a
`WriterOperator`? Could we describe such a scenario here so that the user
understands it better?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/PartitionTransformationTranslator.java
##########
@@ -70,13 +73,15 @@
 
         List<Integer> resultIds = new ArrayList<>();
 
+        StreamExchangeMode exchangeMode = transformation.getExchangeMode();
+        if (!supportsBatchExchange && exchangeMode == StreamExchangeMode.BATCH) {
+            exchangeMode = StreamExchangeMode.UNDEFINED;

Review comment:
       Hmm, it would be better to add a description that explains:
   1. why `translateForStreamingInternal(...)` can be called with a transformation
whose exchangeMode is `StreamExchangeMode.BATCH`, and
   2. whether there is any potential issue now that, after this modification, a
transformation with a BATCH exchangeMode can become PIPELINED.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
##########
@@ -40,32 +45,46 @@
 
     private final PhysicalTransformation<T> transformation;
 
-    @SuppressWarnings("unchecked")
-    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
-        this.transformation =
-                (PhysicalTransformation<T>)
-                        new LegacySinkTransformation<>(
-                                inputStream.getTransformation(),
-                                "Unnamed",
-                                operator,
-                                inputStream.getExecutionEnvironment().getParallelism());
-    }
-
-    @SuppressWarnings("unchecked")
-    protected DataStreamSink(DataStream<T> inputStream, Sink<T, ?, ?, ?> sink) {
-        transformation =
-                (PhysicalTransformation<T>)
-                        new SinkTransformation<>(
-                                inputStream.getTransformation(),
-                                sink,
-                                "Unnamed",
-                                inputStream.getExecutionEnvironment().getParallelism());
+    protected DataStreamSink(PhysicalTransformation<T> transformation) {
+        this.transformation = checkNotNull(transformation);
+    }
+
+    static <T> DataStreamSink<T> forSinkFunction(
+            DataStream<T> inputStream, SinkFunction<T> sinkFunction) {
+        StreamSink<T> sinkOperator = new StreamSink<>(sinkFunction);
+        PhysicalTransformation<T> transformation =
+                new LegacySinkTransformation<>(
+                        inputStream.getTransformation(),
+                        "Unnamed",
+                        sinkOperator,
+                        inputStream.getExecutionEnvironment().getParallelism());
+        inputStream.getExecutionEnvironment().addOperator(transformation);
+        return new DataStreamSink<>(transformation);
+    }
+
+    static <T> DataStreamSink<T> forSink(DataStream<T> inputStream, Sink<T> sink) {
+        StreamExecutionEnvironment executionEnvironment = inputStream.getExecutionEnvironment();
+        SinkTransformation<T, T> transformation =
+                new SinkTransformation<>(
+                        inputStream,
+                        sink,
+                        inputStream.getTransformation(),
+                        inputStream.getType(),
+                        "Sink",
+                        executionEnvironment.getParallelism());
         inputStream.getExecutionEnvironment().addOperator(transformation);
+        return new DataStreamSink<>(transformation);
+    }
+
+    @Internal
+    public static <T> DataStreamSink<T> forSinkV1(
+            DataStream<T> inputStream, org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sink) {
+        return forSink(inputStream, SinkV1Adapter.wrap(sink));
     }
 
     /** Returns the transformation that contains the actual sink operator of this sink. */
     @Internal
-    public Transformation<T> getTransformation() {
+    public Transformation<?> getTransformation() {

Review comment:
       Please add Javadoc explaining why the type restriction is relaxed here; something like the sketch below.
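   
   A minimal illustration of the kind of Javadoc that could go here (the stated
   rationale for the widening to `Transformation<?>` is my assumption and would
   need to be confirmed by the author):
   
   ```java
   /**
    * Returns the transformation that contains the actual sink operator of this sink.
    *
    * <p>Assumed rationale: with Sink V2 the sink transformation no longer produces
    * elements of the input type {@code T}, so the return type is widened to
    * {@code Transformation<?>}.
    */
   @Internal
   public Transformation<?> getTransformation() {
       return transformation;
   }
   ```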

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
##########
@@ -17,34 +17,27 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.util.function.FunctionWithException;
-
-import java.io.Serializable;
-import java.util.List;
 
 /**
  * Manages the state of a {@link org.apache.flink.api.connector.sink.SinkWriter}. There are only two
  * flavors: stateless handled by {@link StatelessSinkWriterStateHandler} and stateful handled with
  * {@link StatefulSinkWriterStateHandler}.
  *
- * @param <WriterStateT>
+ * @param <InputT> the input type
  */
-interface SinkWriterStateHandler<WriterStateT> extends Serializable {
-    /**
-     * Extracts the writer state from the {@link StateInitializationContext}. The state will be used
-     * to create the writer.
-     */
-    List<WriterStateT> initializeState(StateInitializationContext context) throws Exception;
+interface SinkWriterStateHandler<InputT> {
 
     /**
-     * Stores the state of the supplier. The supplier should only be queried once.
+     * Stores the state of the writer.
      *
-     * @param stateExtractor
-     * @param checkpointId
+     * @param checkpointId the checkpointId
      */
-    void snapshotState(
-            FunctionWithException<Long, List<WriterStateT>, Exception> stateExtractor,
-            long checkpointId)
+    void snapshotState(long checkpointId) throws Exception;
+
+    /** Creates a writer, potentially using state from {@link StateInitializationContext}. */
+    SinkWriter<InputT> createWriter(InitContext initContext, StateInitializationContext context)

Review comment:
       This is hidden magic: a method that neither the class name nor the Javadoc
describes. I would suggest a more fitting class name that covers this
functionality; see the sketch below.
   
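   A rough sketch of the idea (the interface name `StatefulSinkWriterFactory` is
   purely illustrative, not a concrete proposal; the signatures are copied from
   the diff above):
   
   ```java
   // Hypothetical rename making the writer-creation responsibility explicit in
   // the type name; the body mirrors the interface shown in the diff above.
   interface StatefulSinkWriterFactory<InputT> {
   
       /** Creates a writer, potentially using state from {@link StateInitializationContext}. */
       SinkWriter<InputT> createWriter(InitContext initContext, StateInitializationContext context)
               throws Exception;
   
       /** Stores the state of the writer. */
       void snapshotState(long checkpointId) throws Exception;
   }
   ```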

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/WriterOperator.java
##########
@@ -56,105 +59,64 @@
  * same parallelism or send them downstream to a {@link CommitterOperator} with a different
  * parallelism.
  *
- * <p>The operator may be part of a sink pipeline and is the first operator. There are currently two
- * ways this operator is used:
- *
- * <ul>
- *   <li>In streaming mode, there is this operator with parallelism p containing {@link
- *       org.apache.flink.api.connector.sink.SinkWriter} and {@link
- *       org.apache.flink.api.connector.sink.Committer} and a {@link CommitterOperator} containing
- *       the {@link org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1.
- *   <li>In batch mode, there is this operator with parallelism p containing {@link
- *       org.apache.flink.api.connector.sink.SinkWriter} and a {@link CommitterOperator} containing
- *       the {@link org.apache.flink.api.connector.sink.Committer} and {@link
- *       org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1.
- * </ul>
+ * <p>The operator may be part of a sink pipeline and is the first operator.
  *
  * @param <InputT> the type of the committable
  * @param <CommT> the type of the committable (to send to downstream operators)
- * @param <WriterStateT> the type of the writer state for stateful sinks
  */
-class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<byte[]>
-        implements OneInputStreamOperator<InputT, byte[]>, BoundedOneInput {
+class WriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>

Review comment:
       Might `SinkWriterOperator` be the better naming choice? Replacing
`SinkOperator` with `WriterOperator` is a big change from the domain-design
perspective: we would then have to explain what a writer is, what a sink is, what
the relationship between them is, etc. Another relevant fact is that both `Sink`
and `SinkWriter` are used internally.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
+import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Translates Sink V1 into Sink V2. */
+@Internal
+public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT> {
+
+    private final org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT>
+            sink;
+
+    private SinkV1Adapter(
+            org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT>
+                    sink) {
+        this.sink = sink;
+    }
+
+    public static <InputT> Sink<InputT> wrap(
+            org.apache.flink.api.connector.sink.Sink<InputT, ?, ?, ?> sink) {
+        return new SinkV1Adapter<>(sink).asSpecializedSink();
+    }
+
+    @Override
+    public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(InitContext context)
+            throws IOException {
+        org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> writer =
+                sink.createWriter(new InitContextAdapter(context), Collections.emptyList());
+        return new SinkWriterV1Adapter<>(writer);
+    }
+
+    public Sink<InputT> asSpecializedSink() {
+        boolean stateful = false;
+        boolean globalCommitter = false;
+        boolean committer = false;
+        if (sink.getWriterStateSerializer().isPresent()) {
+            stateful = true;
+        }
+        if (sink.getGlobalCommittableSerializer().isPresent()) {
+            globalCommitter = true;
+        }
+        if (sink.getCommittableSerializer().isPresent()) {
+            committer = true;
+        }
+
+        if (globalCommitter && committer && stateful) {
+            return new StatefulGlobalTwoPhaseCommittingSinkAdapter();
+        }
+        if (globalCommitter) {
+            return new GlobalCommittingSinkAdapter();
+        }
+        if (committer && stateful) {
+            return new StatefulTwoPhaseCommittingSinkAdapter();
+        }
+        if (committer) {
+            return new TwoPhaseCommittingSinkAdapter();
+        }
+        if (stateful) {
+            return new StatefulSinkAdapter();
+        }
+        return this;
+    }
+
+    private static class SinkWriterV1Adapter<InputT, CommT, WriterStateT>
+            implements StatefulSinkWriter<InputT, WriterStateT>,
+                    PrecommittingSinkWriter<InputT, CommT> {
+
+        private final org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT>
+                writer;
+        private boolean endOfInput = false;
+        private final WriterContextAdapter contextAdapter = new WriterContextAdapter();
+
+        public SinkWriterV1Adapter(
+                org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT>
+                        writer) {
+            this.writer = writer;
+        }
+
+        @Override
+        public void write(InputT element, Context context)
+                throws IOException, InterruptedException {
+            contextAdapter.setContext(context);
+            this.writer.write(element, contextAdapter);
+        }
+
+        @Override
+        public void flush(boolean endOfInput) throws IOException, InterruptedException {
+            this.endOfInput = endOfInput;
+        }
+
+        @Override
+        public List<WriterStateT> snapshotState(long checkpointId) throws IOException {
+            return writer.snapshotState(checkpointId);
+        }
+
+        @Override
+        public Collection<CommT> prepareCommit() throws IOException, InterruptedException {
+            return writer.prepareCommit(endOfInput);
+        }
+
+        @Override
+        public void close() throws Exception {
+            writer.close();
+        }
+
+        @Override
+        public void writeWatermark(Watermark watermark) throws IOException, InterruptedException {
+            writer.writeWatermark(watermark);
+        }
+    }
+
+    private static class WriterContextAdapter implements SinkWriter.Context {
+        private org.apache.flink.api.connector.sink2.SinkWriter.Context context;
+
+        public void setContext(org.apache.flink.api.connector.sink2.SinkWriter.Context context) {
+            this.context = context;
+        }
+
+        @Override
+        public long currentWatermark() {
+            return context.currentWatermark();
+        }
+
+        @Override
+        public Long timestamp() {
+            return context.timestamp();
+        }
+    }
+
+    private static class InitContextAdapter
+            implements org.apache.flink.api.connector.sink.Sink.InitContext {
+
+        private final InitContext context;
+
+        public InitContextAdapter(InitContext context) {
+            this.context = context;
+        }
+
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return context.getUserCodeClassLoader();
+        }
+
+        @Override
+        public MailboxExecutor getMailboxExecutor() {
+            return context.getMailboxExecutor();
+        }
+
+        @Override
+        public ProcessingTimeService getProcessingTimeService() {
+            return new ProcessingTimeServiceAdapter(context.getProcessingTimeService());
+        }
+
+        @Override
+        public int getSubtaskId() {
+            return context.getSubtaskId();
+        }
+
+        @Override
+        public int getNumberOfParallelSubtasks() {
+            return context.getNumberOfParallelSubtasks();
+        }
+
+        @Override
+        public SinkWriterMetricGroup metricGroup() {
+            return context.metricGroup();
+        }
+
+        @Override
+        public OptionalLong getRestoredCheckpointId() {
+            return context.getRestoredCheckpointId();
+        }
+
+        public InitializationContext asSerializationSchemaInitializationContext() {
+            return context.asSerializationSchemaInitializationContext();
+        }
+    }
+
+    private static class ProcessingTimeCallbackAdapter implements ProcessingTimeCallback {
+
+        private final ProcessingTimeService.ProcessingTimeCallback processingTimerCallback;
+
+        public ProcessingTimeCallbackAdapter(
+                ProcessingTimeService.ProcessingTimeCallback processingTimerCallback) {
+            this.processingTimerCallback = processingTimerCallback;
+        }
+
+        @Override
+        public void onProcessingTime(long time) throws IOException, InterruptedException {
+            processingTimerCallback.onProcessingTime(time);
+        }
+    }
+
+    private static class ProcessingTimeServiceAdapter implements ProcessingTimeService {
+
+        private final org.apache.flink.api.common.operators.ProcessingTimeService
+                processingTimeService;
+
+        public ProcessingTimeServiceAdapter(
+                org.apache.flink.api.common.operators.ProcessingTimeService processingTimeService) {
+            this.processingTimeService = processingTimeService;
+        }
+
+        @Override
+        public long getCurrentProcessingTime() {
+            return processingTimeService.getCurrentProcessingTime();
+        }
+
+        @Override
+        public void registerProcessingTimer(
+                long time, ProcessingTimeCallback processingTimerCallback) {
+            processingTimeService.registerTimer(
+                    time, new ProcessingTimeCallbackAdapter(processingTimerCallback));
+        }
+    }
+
+    private static class CommitterAdapter<CommT> implements Committer<CommT> {
+
+        private final org.apache.flink.api.connector.sink.Committer<CommT> committer;
+
+        public CommitterAdapter(org.apache.flink.api.connector.sink.Committer<CommT> committer) {
+            this.committer = committer;
+        }
+
+        @Override
+        public void commit(Collection<CommitRequest<CommT>> commitRequests)
+                throws IOException, InterruptedException {
+            List<CommT> failed =
+                    committer.commit(
+                            commitRequests.stream()
+                                    .map(CommitRequest::getCommittable)
+                                    .collect(Collectors.toList()));
+            if (!failed.isEmpty()) {
+                Set<CommT> indexed = Collections.newSetFromMap(new IdentityHashMap<>());
+                indexed.addAll(failed);
+                commitRequests.stream()
+                        .filter(request -> indexed.contains(request.getCommittable()))
+                        .forEach(CommitRequest::retryLater);
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            committer.close();
+        }
+    }
+
+    /** Main class to simulate SinkV1 with SinkV2. */
+    @VisibleForTesting
+    @Internal
+    public class PlainSinkAdapter implements Sink<InputT> {
+        @Override
+        public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(InitContext context)
+                throws IOException {
+            return SinkV1Adapter.this.createWriter(context);
+        }
+
+        @VisibleForTesting
+        public org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT>
+                getSink() {
+            return sink;
+        }
+    }
+
+    private class StatefulSinkAdapter extends PlainSinkAdapter
+            implements StatefulSink<InputT, WriterStateT> {
+        @Override
+        public StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+                InitContext context, Collection<WriterStateT> recoveredState) throws IOException {
+            org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> writer =
+                    sink.createWriter(
+                            new InitContextAdapter(context), new ArrayList<>(recoveredState));
+            return new SinkWriterV1Adapter<>(writer);
+        }
+
+        @Override
+        public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
+            return sink.getWriterStateSerializer()
+                    .orElseThrow(
+                            () ->
+                                    new IllegalStateException(
+                                            "This method should only be called after adapter established that the result is non-empty."));
+        }
+    }
+
+    private class TwoPhaseCommittingSinkAdapter extends PlainSinkAdapter
+            implements TwoPhaseCommittingSink<InputT, CommT>, WithCompatibleState {
+        @Override
+        public Committer<CommT> createCommitter() throws IOException {
+            return new CommitterAdapter<>(
+                    sink.createCommitter().orElse(new SinkV1Adapter.NoopCommitter<>()));
+        }
+
+        @Override
+        public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
+            return sink.getCommittableSerializer()
+                    .orElseThrow(
+                            () ->
+                                    new IllegalStateException(
+                                            "This method should only be called after adapter established that the result is non-empty."));
+        }
+
+        @Override
+        public Collection<String> getCompatibleWriterStateNames() {
+            return sink.getCompatibleStateNames();
+        }
+    }
+
+    private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
+            implements WithPostCommitTopology<InputT, CommT> {
+
+        @Override
+        public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
+            StandardSinkTopologies.addGlobalCommitter(committables, GlobalCommitterAdapter::new);
+        }
+    }
+
+    private class StatefulTwoPhaseCommittingSinkAdapter extends StatefulSinkAdapter

Review comment:
       Have you considered using one adapter class that implements all of these
interfaces? There would be fewer decorators, which would improve code readability,
and the code in `SinkV1Adapter#asSpecializedSink()` could also be reduced
accordingly; see the sketch below.
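   
   A minimal sketch of the idea (the class name is illustrative; the delegating
   methods are elided):
   
   ```java
   // Hypothetical single adapter combining all capability interfaces. The
   // serializer getters would still throw if the wrapped V1 sink lacks the
   // corresponding capability, exactly as the individual adapters above do.
   private class CombinedSinkAdapter extends PlainSinkAdapter
           implements StatefulSink<InputT, WriterStateT>,
                   TwoPhaseCommittingSink<InputT, CommT>,
                   WithPostCommitTopology<InputT, CommT>,
                   WithCompatibleState {
       // restoreWriter, createCommitter, getWriterStateSerializer,
       // getCommittableSerializer, getCompatibleWriterStateNames and
       // addPostCommitTopology as in the adapters above.
   }
   ```
   
   One caveat to weigh: with a single class, `instanceof` checks in the translation
   layer would report every capability, so the adapter would always look stateful
   and committing regardless of what the wrapped V1 sink supports.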

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
##########
@@ -17,95 +17,173 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
-import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittables;
+import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
+import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
+import org.apache.flink.streaming.runtime.operators.sink.committables.Committables;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.OptionalLong;
 
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An operator that processes committables of a {@link org.apache.flink.api.connector.sink.Sink}.
  *
- * <p>The operator may be part of a sink pipeline but usually is the last operator. There are
- * currently two ways this operator is used:
+ * <p>The operator may be part of a sink pipeline, and it always follows {@link WriterOperator},
+ * which initially outputs the committables.
  *
- * <ul>
- *   <li>In streaming mode, there is a {@link SinkOperator} with parallelism p containing {@link
- *       org.apache.flink.api.connector.sink.SinkWriter} and {@link
- *       org.apache.flink.api.connector.sink.Committer} and this operator containing the {@link
- *       org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1.
- *   <li>In batch mode, there is a {@link SinkOperator} with parallelism p containing {@link
- *       org.apache.flink.api.connector.sink.SinkWriter} and this operator containing the {@link
- *       org.apache.flink.api.connector.sink.Committer} and {@link
- *       org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 
1.
- * </ul>
- *
- * @param <InputT> the type of the committable
- * @param <OutputT> the type of the committable to send to downstream operators
+ * @param <CommT> the type of the committable
  */
-class CommitterOperator<InputT, OutputT> extends AbstractStreamOperator<byte[]>
-        implements OneInputStreamOperator<byte[], byte[]>, BoundedOneInput {
+class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
+        implements OneInputStreamOperator<CommittableMessage<CommT>, CommittableMessage<CommT>>,
+                BoundedOneInput {
+
+    private static final long RETRY_DELAY = 1000;
+    private final SimpleVersionedSerializer<CommT> committableSerializer;
+    private final Committer<CommT> committer;
+    private final boolean emitDownstream;
+    private CommittableCollector<CommT> committableCollector;
+    private long lastCompletedCheckpointId = -1;
 
-    private final SimpleVersionedSerializer<InputT> inputSerializer;
-    private final CommitterHandler<InputT, OutputT> committerHandler;
-    private final CommitRetrier commitRetrier;
+    /** The operator's state descriptor. */
+    private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
+
+    /** The operator's state. */
+    private ListState<CommittableCollector<CommT>> committableCollectorState;
 
     public CommitterOperator(
             ProcessingTimeService processingTimeService,
-            SimpleVersionedSerializer<InputT> inputSerializer,
-            CommitterHandler<InputT, OutputT> committerHandler) {
-        this.inputSerializer = checkNotNull(inputSerializer);
-        this.committerHandler = checkNotNull(committerHandler);
-        this.processingTimeService = processingTimeService;
-        this.commitRetrier = new CommitRetrier(processingTimeService, committerHandler);
+            SimpleVersionedSerializer<CommT> committableSerializer,
+            Committer<CommT> committer,
+            boolean emitDownstream) {
+        this.emitDownstream = emitDownstream;
+        this.processingTimeService = checkNotNull(processingTimeService);
+        this.committableSerializer = checkNotNull(committableSerializer);
+        this.committer = checkNotNull(committer);
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<CommittableMessage<CommT>>> output) {
+        super.setup(containingTask, config, output);
+        committableCollector = CommittableCollector.of(getRuntimeContext());
     }
 
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
-        committerHandler.initializeState(context);
-        // try to re-commit recovered transactions as quickly as possible
-        commitRetrier.retryWithDelay();
+        committableCollectorState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+                        new CommittableCollectorSerializer<>(committableSerializer));
+
+        if (context.isRestored()) {
+            committableCollectorState.get().forEach(cc -> committableCollector.merge(cc));
+            lastCompletedCheckpointId = context.getRestoredCheckpointId().getAsLong();
+            // try to re-commit recovered transactions as quickly as possible
+            commitAndEmitCheckpoints();
+        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
-        committerHandler.snapshotState(context);
+        // It is important to copy the collector to not mutate the state.
+        committableCollectorState.update(Collections.singletonList(committableCollector.copy()));
     }
 
     @Override
     public void endInput() throws Exception {
-        committerHandler.endOfInput();
-        commitRetrier.retryIndefinitely();
+        Collection<? extends Committables<CommT>> endOfInputCommittables =
+                committableCollector.getEndOfInputCommittables();
+        // indicates batch
+        if (endOfInputCommittables != null) {
+            do {
+                for (Committables<CommT> endOfInputCommittable : endOfInputCommittables) {
+                    commitAndEmit(endOfInputCommittable, false);
+                }
+            } while (!committableCollector.isFinished());
+        }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        committerHandler.notifyCheckpointCompleted(checkpointId);
-        commitRetrier.retryWithDelay();
+        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
+        commitAndEmitCheckpoints();
+    }
+
+    private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
+        for (CheckpointCommittables<CommT> checkpoint :
+                committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
+            // wait for all committables of the current checkpoint before submission

Review comment:
       The comment is a little confusing. How does a boolean check "wait" for anything?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
##########
@@ -131,6 +150,10 @@ protected DataStreamSink(DataStream<T> inputStream, Sink<T, ?, ?, ?> sink) {
      */
     @PublicEvolving
     public DataStreamSink<T> setUidHash(String uidHash) {
+        if (!(transformation instanceof LegacySinkTransformation)) {

Review comment:
       Handling this exception should be the transformation's responsibility, i.e.
in this case `SinkTransformation`'s; see the sketch below.
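   
   A minimal sketch of the idea, assuming `Transformation#setUidHash` can simply
   be overridden (the error message is illustrative):
   
   ```java
   // Inside SinkTransformation: reject the unsupported operation at its source
   // instead of having DataStreamSink inspect the transformation's type.
   @Override
   public void setUidHash(String uidHash) {
       throw new UnsupportedOperationException(
               "Setting a UID hash is not supported for Sink V2 transformations yet.");
   }
   ```
   
   `DataStreamSink#setUidHash` could then delegate unconditionally.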

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -924,5 +924,10 @@ public long getDefaultBufferTimeout() {
         public ReadableConfig getGraphGeneratorConfig() {
             return config;
         }
+
+        @Override
+        public Collection<Integer> transform(Transformation<?> transformation) {

Review comment:
       Normally a Context, as the name suggests, does not perform actions; see the sketch below for an alternative.
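   
   For illustration, one shape that keeps the Context passive (the collaborator
   name is hypothetical):
   
   ```java
   // Hypothetical: move the action onto a dedicated collaborator that the
   // Context merely exposes, so the Context itself stays a passive holder.
   interface TransformationExpander {
       Collection<Integer> transform(Transformation<?> transformation);
   }
   ```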




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

