gaoyunhaii commented on a change in pull request #18428:
URL: https://github.com/apache/flink/pull/18428#discussion_r796878192



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
##########
@@ -20,249 +20,258 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.TransformationTranslator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
-import org.apache.flink.streaming.runtime.operators.sink.SinkOperatorFactory;
-import org.apache.flink.streaming.util.graph.StreamGraphUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/** A {@link TransformationTranslator} for the {@link SinkTransformation}. */
+import java.util.List;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} for 
the {@link
+ * org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ */
 @Internal
-public class SinkTransformationTranslator<InputT, CommT, WriterStateT, 
GlobalCommT>
-        implements TransformationTranslator<
-                Object, SinkTransformation<InputT, CommT, WriterStateT, 
GlobalCommT>> {
+public class SinkTransformationTranslator<Input, Output>
+        implements TransformationTranslator<Output, SinkTransformation<Input, 
Output>> {
 
-    protected static final Logger LOG = 
LoggerFactory.getLogger(SinkTransformationTranslator.class);
+    private static final String COMMITTER_NAME = "Committer";
+    private static final String WRITER_NAME = "Writer";
 
     @Override
     public Collection<Integer> translateForBatch(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> 
transformation,
-            Context context) {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), 
transformation);
-        final int parallelism = getParallelism(transformation, context);
-
-        try {
-            internalTranslate(transformation, parallelism, true, context);
-        } catch (IOException e) {
-            throw new FlinkRuntimeException(
-                    "Could not add the Committer or GlobalCommitter to the 
stream graph.", e);
-        }
-        return Collections.emptyList();
+            SinkTransformation<Input, Output> transformation, Context context) 
{
+        return translateForStreaming(transformation, context);
     }
 
     @Override
     public Collection<Integer> translateForStreaming(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> 
transformation,
-            Context context) {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), 
transformation);
-
-        final int parallelism = getParallelism(transformation, context);
-
-        try {
-            internalTranslate(transformation, parallelism, false, context);
-        } catch (IOException e) {
-            throw new FlinkRuntimeException(
-                    "Could not add the Committer or GlobalCommitter to the 
stream graph.", e);
-        }
+            SinkTransformation<Input, Output> transformation, Context context) 
{
 
+        SinkExpander<Input> expander =
+                new SinkExpander<>(
+                        transformation.getInputStream(),
+                        transformation.getSink(),
+                        transformation,
+                        context);
+        expander.expand();
+        // Remove the SinkTransformation to not translate it again
+        transformation
+                .getInputStream()
+                .getExecutionEnvironment()
+                .getTransformations()
+                .remove(transformation);
         return Collections.emptyList();
     }
 
     /**
-     * Add the sink operators to the stream graph.
-     *
-     * @param sinkTransformation The sink transformation that committer and 
global committer belongs
-     *     to.
-     * @param writerParallelism The parallelism of the writer operator.
-     * @param batch Specifies if this sink is executed in batch mode.
+     * Expands the FLIP-143 Sink to a subtopology. Each part of the topology 
is created after the
+     * previous part of the topology has been completely configured by the 
user. For example, if a
+     * user explicitly sets the parallelism of the sink, each part of the 
subtopology can rely on
+     * the input having that parallelism.
      */
-    private void internalTranslate(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> 
sinkTransformation,
-            int writerParallelism,
-            boolean batch,
-            Context context)
-            throws IOException {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), 
sinkTransformation);
-
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = 
sinkTransformation.getSink();
-        boolean needsCommitterOperator =
-                batch && sink.getCommittableSerializer().isPresent()
-                        || sink.getGlobalCommittableSerializer().isPresent();
-        final int writerId =
-                addWriterAndCommitter(
-                        sinkTransformation,
-                        writerParallelism,
-                        batch,
-                        needsCommitterOperator,
-                        context);
-
-        if (needsCommitterOperator) {
-            addGlobalCommitter(writerId, sinkTransformation, batch, context);
+    private static class SinkExpander<T> {
+        private final SinkTransformation<T, ?> transformation;
+        private final Sink<T> sink;
+        private final Context context;
+        private final DataStream<T> inputStream;
+        private final StreamExecutionEnvironment executionEnvironment;
+        private boolean expanded;
+
+        public SinkExpander(
+                DataStream<T> inputStream,
+                Sink<T> sink,
+                SinkTransformation<T, ?> transformation,
+                Context context) {
+            this.inputStream = inputStream;
+            this.executionEnvironment = inputStream.getExecutionEnvironment();
+            this.transformation = transformation;
+            this.sink = sink;
+            this.context = context;
         }
-    }
-
-    /**
-     * Add a sink writer node to the stream graph.
-     *
-     * @param sinkTransformation The transformation that the writer belongs to
-     * @param parallelism The parallelism of the writer
-     * @param batch Specifies if this sink is executed in batch mode.
-     * @param shouldEmit Specifies whether the write should emit committables.
-     * @return The stream node id of the writer
-     */
-    private int addWriterAndCommitter(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> 
sinkTransformation,
-            int parallelism,
-            boolean batch,
-            boolean shouldEmit,
-            Context context) {
 
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = 
sinkTransformation.getSink();
-        checkState(sinkTransformation.getInputs().size() == 1);
-        @SuppressWarnings("unchecked")
-        final Transformation<InputT> input =
-                (Transformation<InputT>) sinkTransformation.getInputs().get(0);
-        final TypeInformation<InputT> inputTypeInfo = input.getOutputType();
-
-        final StreamOperatorFactory<byte[]> factory =
-                new SinkOperatorFactory<>(sink, batch, shouldEmit);
-
-        final ChainingStrategy chainingStrategy = 
sinkTransformation.getChainingStrategy();
-
-        if (chainingStrategy != null) {
-            factory.setChainingStrategy(chainingStrategy);
+        private void expand() {
+            if (expanded) {
+                // may be called twice for multi-staged application, make sure 
to expand only once
+                return;
+            }
+            final int sizeBefore = 
executionEnvironment.getTransformations().size();
+            expanded = true;
+
+            DataStream<T> prewritten = inputStream;
+            if (sink instanceof WithPreWriteTopology) {
+                prewritten =
+                        adjustTransformations(
+                                prewritten, ((WithPreWriteTopology<T>) 
sink)::addPreWriteTopology);
+            }
+
+            if (sink instanceof TwoPhaseCommittingSink) {
+                addCommittingTopology(sink, prewritten);
+            } else {
+                adjustTransformations(
+                        prewritten,
+                        input ->
+                                input.transform(
+                                        WRITER_NAME,
+                                        CommittableMessageTypeInfo.noOutput(),
+                                        new 
SinkWriterOperatorFactory<>(sink)));
+            }
+            final List<Transformation<?>> sinkTransformations =
+                    executionEnvironment
+                            .getTransformations()
+                            .subList(sizeBefore, 
executionEnvironment.getTransformations().size());
+            sinkTransformations.forEach(context::transform);
         }
 
-        final String format = batch && shouldEmit ? "Sink %s Writer" : "Sink 
%s";
-
-        return addOperatorToStreamGraph(
-                factory,
-                context.getStreamNodeIds(input),
-                inputTypeInfo,
-                TypeInformation.of(byte[].class),
-                String.format(format, sinkTransformation.getName()),
-                sinkTransformation.getUid(),
-                parallelism,
-                sinkTransformation.getMaxParallelism(),
-                sinkTransformation,
-                context);
-    }
-
-    /**
-     * Try to add a sink global committer to the stream graph.
-     *
-     * @param inputId The global committer's input stream node id.
-     * @param sinkTransformation The transformation that the global committer 
belongs to.
-     * @param batch Specifies if this sink is executed in batch mode.
-     */
-    private void addGlobalCommitter(
-            int inputId,
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> 
sinkTransformation,
-            boolean batch,
-            Context context) {
-
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = 
sinkTransformation.getSink();
-
-        final String format = batch ? "Sink %s Committer" : "Sink %s Global 
Committer";
-
-        addOperatorToStreamGraph(
-                new CommitterOperatorFactory<>(sink, batch),
-                Collections.singletonList(inputId),
-                TypeInformation.of(byte[].class),
-                null,
-                String.format(format, sinkTransformation.getName()),
-                sinkTransformation.getUid() == null
-                        ? null
-                        : String.format(format, sinkTransformation.getUid()),
-                1,
-                1,
-                sinkTransformation,
-                context);
-    }
-
-    private int getParallelism(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> 
sinkTransformation,
-            Context context) {
-        return sinkTransformation.getParallelism() != 
ExecutionConfig.PARALLELISM_DEFAULT
-                ? sinkTransformation.getParallelism()
-                : 
context.getStreamGraph().getExecutionConfig().getParallelism();
-    }
-
-    /**
-     * Add a operator to the {@link StreamGraph}.
-     *
-     * @param operatorFactory The operator factory
-     * @param inputs A collection of upstream stream node ids.
-     * @param inTypeInfo The input type information of the operator
-     * @param outTypInfo The output type information of the operator
-     * @param name The name of the operator.
-     * @param uid The uid of the operator.
-     * @param parallelism The parallelism of the operator
-     * @param maxParallelism The max parallelism of the operator
-     * @param sinkTransformation The sink transformation which the operator 
belongs to
-     * @return The stream node id of the operator
-     */
-    private <IN, OUT> int addOperatorToStreamGraph(
-            StreamOperatorFactory<OUT> operatorFactory,
-            Collection<Integer> inputs,
-            TypeInformation<IN> inTypeInfo,
-            TypeInformation<OUT> outTypInfo,
-            String name,
-            @Nullable String uid,
-            int parallelism,
-            int maxParallelism,
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> 
sinkTransformation,
-            Context context) {
-        final StreamGraph streamGraph = context.getStreamGraph();
-        final String slotSharingGroup = context.getSlotSharingGroup();
-        final int transformationId = Transformation.getNewNodeId();
-
-        streamGraph.addOperator(
-                transformationId,
-                slotSharingGroup,
-                sinkTransformation.getCoLocationGroupKey(),
-                operatorFactory,
-                inTypeInfo,
-                outTypInfo,
-                name);
+        private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> 
inputStream) {
+            TwoPhaseCommittingSink<T, CommT> committingSink =
+                    (TwoPhaseCommittingSink<T, CommT>) sink;
+            TypeInformation<CommittableMessage<CommT>> typeInformation =
+                    
CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
+
+            DataStream<CommittableMessage<CommT>> written =
+                    adjustTransformations(
+                            inputStream,
+                            input ->
+                                    input.transform(
+                                            WRITER_NAME,
+                                            typeInformation,
+                                            new 
SinkWriterOperatorFactory<>(sink)));
+
+            DataStream<CommittableMessage<CommT>> precommitted = 
addFailOverRegion(written);
+
+            if (sink instanceof WithPreCommitTopology) {
+                precommitted =
+                        adjustTransformations(
+                                precommitted,
+                                ((WithPreCommitTopology<T, CommT>) 
sink)::addPreCommitTopology);
+            }
+            DataStream<CommittableMessage<CommT>> committed =
+                    adjustTransformations(
+                            precommitted,
+                            pc ->
+                                    pc.transform(
+                                            COMMITTER_NAME,
+                                            typeInformation,
+                                            new 
CommitterOperatorFactory<>(committingSink)));
+            if (sink instanceof WithPostCommitTopology) {
+                DataStream<CommittableMessage<CommT>> postcommitted = 
addFailOverRegion(committed);
+                adjustTransformations(
+                        postcommitted,
+                        pc -> {
+                            ((WithPostCommitTopology<T, CommT>) 
sink).addPostCommitTopology(pc);
+                            return null;
+                        });
+            }
+        }
 
-        streamGraph.setParallelism(transformationId, parallelism);
-        streamGraph.setMaxParallelism(transformationId, maxParallelism);
+        /**
+         * Adds a batch exchange that materializes the output first. This is a 
no-op in STREAMING.
+         */
+        private <I> DataStream<I> addFailOverRegion(DataStream<I> input) {
+            return new DataStream<>(
+                    executionEnvironment,
+                    new PartitionTransformation<>(

Review comment:
       We might need to consider cases where the pre-commit operators start 
with an operator that has a different parallelism. Perhaps we need to add a new 
identity map here. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to