JingGe commented on a change in pull request #18428: URL: https://github.com/apache/flink/pull/18428#discussion_r792946618
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.transformations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter; +import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies; +import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.util.UserCodeClassLoader; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.OptionalLong; +import java.util.Set; +import java.util.stream.Collectors; + +/** Translates Sink V1 into Sink V2. 
*/ +@Internal +public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT> { + + private final org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT> + sink; + + private SinkV1Adapter( + org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT> + sink) { + this.sink = sink; + } + + public static <InputT> Sink<InputT> wrap( + org.apache.flink.api.connector.sink.Sink<InputT, ?, ?, ?> sink) { + return new SinkV1Adapter<>(sink).asSpecializedSink(); + } + + @Override + public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(InitContext context) + throws IOException { + org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> writer = + sink.createWriter(new InitContextAdapter(context), Collections.emptyList()); + return new SinkWriterV1Adapter<>(writer); + } + + public Sink<InputT> asSpecializedSink() { + boolean stateful = false; + boolean globalCommitter = false; + boolean committer = false; + if (sink.getWriterStateSerializer().isPresent()) { + stateful = true; + } + if (sink.getGlobalCommittableSerializer().isPresent()) { + globalCommitter = true; + } + if (sink.getCommittableSerializer().isPresent()) { + committer = true; + } + + if (globalCommitter && committer && stateful) { + return new StatefulGlobalTwoPhaseCommittingSinkAdapter(); + } + if (globalCommitter) { + return new GlobalCommittingSinkAdapter(); + } + if (committer && stateful) { + return new StatefulTwoPhaseCommittingSinkAdapter(); + } + if (committer) { + return new TwoPhaseCommittingSinkAdapter(); + } + if (stateful) { + return new StatefulSinkAdapter(); + } + return this; + } + + private static class SinkWriterV1Adapter<InputT, CommT, WriterStateT> + implements StatefulSinkWriter<InputT, WriterStateT>, + PrecommittingSinkWriter<InputT, CommT> { + + private final org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> + writer; + private boolean endOfInput = false; + private final WriterContextAdapter contextAdapter = new WriterContextAdapter(); + + public SinkWriterV1Adapter( + org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> + writer) { + this.writer = writer; + } + + @Override + public void write(InputT element, Context context) + throws IOException, InterruptedException { + contextAdapter.setContext(context); + this.writer.write(element, contextAdapter); + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + this.endOfInput = endOfInput; + } + + @Override + public List<WriterStateT> snapshotState(long checkpointId) throws IOException { + return writer.snapshotState(checkpointId); + } + + @Override + public Collection<CommT> prepareCommit() throws IOException, InterruptedException { + return writer.prepareCommit(endOfInput); + } + + @Override + public void close() throws Exception { + writer.close(); + } + + @Override + public void writeWatermark(Watermark watermark) throws IOException, InterruptedException { + writer.writeWatermark(watermark); + } + } + + private static class WriterContextAdapter implements SinkWriter.Context { + private org.apache.flink.api.connector.sink2.SinkWriter.Context context; + + public void setContext(org.apache.flink.api.connector.sink2.SinkWriter.Context context) { + this.context = context; + } + + @Override + public long currentWatermark() { + return context.currentWatermark(); + } + + @Override + public Long timestamp() { + return context.timestamp(); + } + } + + private static 
class InitContextAdapter + implements org.apache.flink.api.connector.sink.Sink.InitContext { + + private final InitContext context; + + public InitContextAdapter(InitContext context) { + this.context = context; + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + return context.getUserCodeClassLoader(); + } + + @Override + public MailboxExecutor getMailboxExecutor() { + return context.getMailboxExecutor(); + } + + @Override + public ProcessingTimeService getProcessingTimeService() { + return new ProcessingTimeServiceAdapter(context.getProcessingTimeService()); + } + + @Override + public int getSubtaskId() { + return context.getSubtaskId(); + } + + @Override + public int getNumberOfParallelSubtasks() { + return context.getNumberOfParallelSubtasks(); + } + + @Override + public SinkWriterMetricGroup metricGroup() { + return context.metricGroup(); + } + + @Override + public OptionalLong getRestoredCheckpointId() { + return context.getRestoredCheckpointId(); + } + + public InitializationContext asSerializationSchemaInitializationContext() { + return context.asSerializationSchemaInitializationContext(); + } + } + + private static class ProcessingTimeCallbackAdapter implements ProcessingTimeCallback { + + private final ProcessingTimeService.ProcessingTimeCallback processingTimerCallback; + + public ProcessingTimeCallbackAdapter( + ProcessingTimeService.ProcessingTimeCallback processingTimerCallback) { + this.processingTimerCallback = processingTimerCallback; + } + + @Override + public void onProcessingTime(long time) throws IOException, InterruptedException { + processingTimerCallback.onProcessingTime(time); + } + } + + private static class ProcessingTimeServiceAdapter implements ProcessingTimeService { + + private final org.apache.flink.api.common.operators.ProcessingTimeService + processingTimeService; + + public ProcessingTimeServiceAdapter( + org.apache.flink.api.common.operators.ProcessingTimeService processingTimeService) { + this.processingTimeService = processingTimeService; + } + + @Override + public long getCurrentProcessingTime() { + return processingTimeService.getCurrentProcessingTime(); + } + + @Override + public void registerProcessingTimer( + long time, ProcessingTimeCallback processingTimerCallback) { + processingTimeService.registerTimer( + time, new ProcessingTimeCallbackAdapter(processingTimerCallback)); + } + } + + private static class CommitterAdapter<CommT> implements Committer<CommT> { + + private final org.apache.flink.api.connector.sink.Committer<CommT> committer; + + public CommitterAdapter(org.apache.flink.api.connector.sink.Committer<CommT> committer) { + this.committer = committer; + } + + @Override + public void commit(Collection<CommitRequest<CommT>> commitRequests) + throws IOException, InterruptedException { + List<CommT> failed = + committer.commit( + commitRequests.stream() + .map(CommitRequest::getCommittable) + .collect(Collectors.toList())); + if (!failed.isEmpty()) { + Set<CommT> indexed = Collections.newSetFromMap(new IdentityHashMap<>()); + indexed.addAll(failed); + commitRequests.stream() + .filter(request -> indexed.contains(request.getCommittable())) + .forEach(CommitRequest::retryLater); + } + } + + @Override + public void close() throws Exception { + committer.close(); + } + } + + /** Main class to simulate SinkV1 with SinkV2. 
*/ + @VisibleForTesting + @Internal + public class PlainSinkAdapter implements Sink<InputT> { + @Override + public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(InitContext context) + throws IOException { + return SinkV1Adapter.this.createWriter(context); + } + + @VisibleForTesting

Review comment: Redundant? There is already one at the class level.

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java ########## @@ -40,32 +45,46 @@ private final PhysicalTransformation<T> transformation; - @SuppressWarnings("unchecked") - protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) { - this.transformation = - (PhysicalTransformation<T>) - new LegacySinkTransformation<>( - inputStream.getTransformation(), - "Unnamed", - operator, - inputStream.getExecutionEnvironment().getParallelism()); - } - - @SuppressWarnings("unchecked") - protected DataStreamSink(DataStream<T> inputStream, Sink<T, ?, ?, ?> sink) { - transformation = - (PhysicalTransformation<T>) - new SinkTransformation<>( - inputStream.getTransformation(), - sink, - "Unnamed", - inputStream.getExecutionEnvironment().getParallelism()); + protected DataStreamSink(PhysicalTransformation<T> transformation) { + this.transformation = checkNotNull(transformation); + } + + static <T> DataStreamSink<T> forSinkFunction( + DataStream<T> inputStream, SinkFunction<T> sinkFunction) {

Review comment: Why replace the operator with the sinkFunction, which is planned to be deprecated? This means the modified code will have to be changed again after two releases, once the sinkFunction is deleted. Will `StreamSink` be deprecated together with `SinkFunction`?

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/WriterOperator.java ########## @@ -56,105 +59,64 @@ * same parallelism or send them downstream to a {@link CommitterOperator} with a different * parallelism. * - * <p>The operator may be part of a sink pipeline and is the first operator. There are currently two - * ways this operator is used: - * - * <ul> - * <li>In streaming mode, there is this operator with parallelism p containing {@link - * org.apache.flink.api.connector.sink.SinkWriter} and {@link - * org.apache.flink.api.connector.sink.Committer} and a {@link CommitterOperator} containing - * the {@link org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1. - * <li>In batch mode, there is this operator with parallelism p containing {@link - * org.apache.flink.api.connector.sink.SinkWriter} and a {@link CommitterOperator} containing - * the {@link org.apache.flink.api.connector.sink.Committer} and {@link - * org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1. - * </ul> + * <p>The operator may be part of a sink pipeline and is the first operator.

Review comment: I think it still makes sense to add more info about the relationship between these operators, i.e. WriterOperator and CommitterOperator, and the SinkWriter, Committer, and GlobalCommitter. "may be" - does this mean there could be a sink pipeline without a `WriterOperator`? Could we describe such a scenario here so that the user understands it better?
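For instance, the removed list could be condensed into a short paragraph along these lines. This is only a rough sketch assembled from the Javadoc of the surrounding classes (WriterOperator and CommitterOperator), not a verified description of the final design:

```java
/**
 * <p>This operator is the first operator of a sink pipeline. It runs the
 * {@link org.apache.flink.api.connector.sink2.SinkWriter} with parallelism p and
 * emits the produced committables as {@link CommittableMessage}s. For a
 * {@link org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}, a
 * {@link CommitterOperator} running the
 * {@link org.apache.flink.api.connector.sink2.Committer} follows with the same
 * or a different parallelism; a global committer, if present, runs in the
 * post-commit topology (see StandardSinkTopologies#addGlobalCommitter).
 */
```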
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/PartitionTransformationTranslator.java ########## @@ -70,13 +73,15 @@ List<Integer> resultIds = new ArrayList<>(); + StreamExchangeMode exchangeMode = transformation.getExchangeMode(); + if (!supportsBatchExchange && exchangeMode == StreamExchangeMode.BATCH) { + exchangeMode = StreamExchangeMode.UNDEFINED;

Review comment: Hmm, it would be better to add some description to explain: 1. Why can `translateForStreamingInternal(...)` be called with a transformation whose exchangeMode is `StreamExchangeMode.BATCH`? 2. After this modification, a transformation with the BATCH exchangeMode could effectively become PIPELINED. Will there be any potential issues?

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java ########## @@ -40,32 +45,46 @@ private final PhysicalTransformation<T> transformation; - @SuppressWarnings("unchecked") - protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) { - this.transformation = - (PhysicalTransformation<T>) - new LegacySinkTransformation<>( - inputStream.getTransformation(), - "Unnamed", - operator, - inputStream.getExecutionEnvironment().getParallelism()); - } - - @SuppressWarnings("unchecked") - protected DataStreamSink(DataStream<T> inputStream, Sink<T, ?, ?, ?> sink) { - transformation = - (PhysicalTransformation<T>) - new SinkTransformation<>( - inputStream.getTransformation(), - sink, - "Unnamed", - inputStream.getExecutionEnvironment().getParallelism()); + protected DataStreamSink(PhysicalTransformation<T> transformation) { + this.transformation = checkNotNull(transformation); + } + + static <T> DataStreamSink<T> forSinkFunction( + DataStream<T> inputStream, SinkFunction<T> sinkFunction) { + StreamSink<T> sinkOperator = new StreamSink<>(sinkFunction); + PhysicalTransformation<T> transformation = + new LegacySinkTransformation<>( + inputStream.getTransformation(), + "Unnamed", + sinkOperator, + inputStream.getExecutionEnvironment().getParallelism()); + inputStream.getExecutionEnvironment().addOperator(transformation); + return new DataStreamSink<>(transformation); + } + + static <T> DataStreamSink<T> forSink(DataStream<T> inputStream, Sink<T> sink) { + StreamExecutionEnvironment executionEnvironment = inputStream.getExecutionEnvironment(); + SinkTransformation<T, T> transformation = + new SinkTransformation<>( + inputStream, + sink, + inputStream.getTransformation(), + inputStream.getType(), + "Sink", + executionEnvironment.getParallelism()); inputStream.getExecutionEnvironment().addOperator(transformation); + return new DataStreamSink<>(transformation); + } + + @Internal + public static <T> DataStreamSink<T> forSinkV1( + DataStream<T> inputStream, org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sink) { + return forSink(inputStream, SinkV1Adapter.wrap(sink)); } /** Returns the transformation that contains the actual sink operator of this sink.
*/ @Internal - public Transformation<T> getTransformation() { + public Transformation<?> getTransformation() {

Review comment: Please add Javadoc explaining why the type restriction has been reduced here.

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java ########## @@ -17,34 +17,27 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.util.function.FunctionWithException; - -import java.io.Serializable; -import java.util.List; /** * Manages the state of a {@link org.apache.flink.api.connector.sink.SinkWriter}. There are only two * flavors: stateless handled by {@link StatelessSinkWriterStateHandler} and stateful handled with * {@link StatefulSinkWriterStateHandler}. * - * @param <WriterStateT> + * @param <InputT> the input type */ -interface SinkWriterStateHandler<WriterStateT> extends Serializable { - /** - * Extracts the writer state from the {@link StateInitializationContext}. The state will be used - * to create the writer. - */ - List<WriterStateT> initializeState(StateInitializationContext context) throws Exception; +interface SinkWriterStateHandler<InputT> { /** - * Stores the state of the supplier. The supplier should only be queried once. + * Stores the state of the writer. * - * @param stateExtractor - * @param checkpointId + * @param checkpointId the checkpointId */ - void snapshotState( - FunctionWithException<Long, List<WriterStateT>, Exception> stateExtractor, - long checkpointId) + void snapshotState(long checkpointId) throws Exception; + + /** Creates a writer, potentially using state from {@link StateInitializationContext}. */ + SinkWriter<InputT> createWriter(InitContext initContext, StateInitializationContext context)

Review comment: This is hidden magic that neither the class name nor the Javadoc describes. I would suggest using a more suitable class name that covers this functionality.

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/WriterOperator.java ########## @@ -56,105 +59,64 @@ * same parallelism or send them downstream to a {@link CommitterOperator} with a different * parallelism. * - * <p>The operator may be part of a sink pipeline and is the first operator. There are currently two - * ways this operator is used: - * - * <ul> - * <li>In streaming mode, there is this operator with parallelism p containing {@link - * org.apache.flink.api.connector.sink.SinkWriter} and {@link - * org.apache.flink.api.connector.sink.Committer} and a {@link CommitterOperator} containing - * the {@link org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1. - * <li>In batch mode, there is this operator with parallelism p containing {@link - * org.apache.flink.api.connector.sink.SinkWriter} and a {@link CommitterOperator} containing - * the {@link org.apache.flink.api.connector.sink.Committer} and {@link - * org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1. - * </ul> + * <p>The operator may be part of a sink pipeline and is the first operator.
* * @param <InputT> the type of the committable * @param <CommT> the type of the committable (to send to downstream operators) - * @param <WriterStateT> the type of the writer state for stateful sinks */ -class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<byte[]> - implements OneInputStreamOperator<InputT, byte[]>, BoundedOneInput { +class WriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>

Review comment: Might `SinkWriterOperator` be the better naming choice? Using `WriterOperator` to replace `SinkOperator` is a big change from the domain-design perspective. We would then have to explain what a writer is, what a sink is, what the relationship between them is, etc. Another fact is that both `Sink` and `SinkWriter` are used internally.

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.transformations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter; +import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies; +import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.util.UserCodeClassLoader; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import
java.util.IdentityHashMap; +import java.util.List; +import java.util.OptionalLong; +import java.util.Set; +import java.util.stream.Collectors; + +/** Translates Sink V1 into Sink V2. */ +@Internal +public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT> { + + private final org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT> + sink; + + private SinkV1Adapter( + org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT> + sink) { + this.sink = sink; + } + + public static <InputT> Sink<InputT> wrap( + org.apache.flink.api.connector.sink.Sink<InputT, ?, ?, ?> sink) { + return new SinkV1Adapter<>(sink).asSpecializedSink(); + } + + @Override + public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(InitContext context) + throws IOException { + org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> writer = + sink.createWriter(new InitContextAdapter(context), Collections.emptyList()); + return new SinkWriterV1Adapter<>(writer); + } + + public Sink<InputT> asSpecializedSink() { + boolean stateful = false; + boolean globalCommitter = false; + boolean committer = false; + if (sink.getWriterStateSerializer().isPresent()) { + stateful = true; + } + if (sink.getGlobalCommittableSerializer().isPresent()) { + globalCommitter = true; + } + if (sink.getCommittableSerializer().isPresent()) { + committer = true; + } + + if (globalCommitter && committer && stateful) { + return new StatefulGlobalTwoPhaseCommittingSinkAdapter(); + } + if (globalCommitter) { + return new GlobalCommittingSinkAdapter(); + } + if (committer && stateful) { + return new StatefulTwoPhaseCommittingSinkAdapter(); + } + if (committer) { + return new TwoPhaseCommittingSinkAdapter(); + } + if (stateful) { + return new StatefulSinkAdapter(); + } + return this; + } + + private static class SinkWriterV1Adapter<InputT, CommT, WriterStateT> + implements StatefulSinkWriter<InputT, WriterStateT>, + PrecommittingSinkWriter<InputT, CommT> { + + private final org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> + writer; + private boolean endOfInput = false; + private final WriterContextAdapter contextAdapter = new WriterContextAdapter(); + + public SinkWriterV1Adapter( + org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> + writer) { + this.writer = writer; + } + + @Override + public void write(InputT element, Context context) + throws IOException, InterruptedException { + contextAdapter.setContext(context); + this.writer.write(element, contextAdapter); + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + this.endOfInput = endOfInput; + } + + @Override + public List<WriterStateT> snapshotState(long checkpointId) throws IOException { + return writer.snapshotState(checkpointId); + } + + @Override + public Collection<CommT> prepareCommit() throws IOException, InterruptedException { + return writer.prepareCommit(endOfInput); + } + + @Override + public void close() throws Exception { + writer.close(); + } + + @Override + public void writeWatermark(Watermark watermark) throws IOException, InterruptedException { + writer.writeWatermark(watermark); + } + } + + private static class WriterContextAdapter implements SinkWriter.Context { + private org.apache.flink.api.connector.sink2.SinkWriter.Context context; + + public void setContext(org.apache.flink.api.connector.sink2.SinkWriter.Context context) { + this.context = context; + } + + 
@Override + public long currentWatermark() { + return context.currentWatermark(); + } + + @Override + public Long timestamp() { + return context.timestamp(); + } + } + + private static class InitContextAdapter + implements org.apache.flink.api.connector.sink.Sink.InitContext { + + private final InitContext context; + + public InitContextAdapter(InitContext context) { + this.context = context; + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + return context.getUserCodeClassLoader(); + } + + @Override + public MailboxExecutor getMailboxExecutor() { + return context.getMailboxExecutor(); + } + + @Override + public ProcessingTimeService getProcessingTimeService() { + return new ProcessingTimeServiceAdapter(context.getProcessingTimeService()); + } + + @Override + public int getSubtaskId() { + return context.getSubtaskId(); + } + + @Override + public int getNumberOfParallelSubtasks() { + return context.getNumberOfParallelSubtasks(); + } + + @Override + public SinkWriterMetricGroup metricGroup() { + return context.metricGroup(); + } + + @Override + public OptionalLong getRestoredCheckpointId() { + return context.getRestoredCheckpointId(); + } + + public InitializationContext asSerializationSchemaInitializationContext() { + return context.asSerializationSchemaInitializationContext(); + } + } + + private static class ProcessingTimeCallbackAdapter implements ProcessingTimeCallback { + + private final ProcessingTimeService.ProcessingTimeCallback processingTimerCallback; + + public ProcessingTimeCallbackAdapter( + ProcessingTimeService.ProcessingTimeCallback processingTimerCallback) { + this.processingTimerCallback = processingTimerCallback; + } + + @Override + public void onProcessingTime(long time) throws IOException, InterruptedException { + processingTimerCallback.onProcessingTime(time); + } + } + + private static class ProcessingTimeServiceAdapter implements ProcessingTimeService { + + private final org.apache.flink.api.common.operators.ProcessingTimeService + processingTimeService; + + public ProcessingTimeServiceAdapter( + org.apache.flink.api.common.operators.ProcessingTimeService processingTimeService) { + this.processingTimeService = processingTimeService; + } + + @Override + public long getCurrentProcessingTime() { + return processingTimeService.getCurrentProcessingTime(); + } + + @Override + public void registerProcessingTimer( + long time, ProcessingTimeCallback processingTimerCallback) { + processingTimeService.registerTimer( + time, new ProcessingTimeCallbackAdapter(processingTimerCallback)); + } + } + + private static class CommitterAdapter<CommT> implements Committer<CommT> { + + private final org.apache.flink.api.connector.sink.Committer<CommT> committer; + + public CommitterAdapter(org.apache.flink.api.connector.sink.Committer<CommT> committer) { + this.committer = committer; + } + + @Override + public void commit(Collection<CommitRequest<CommT>> commitRequests) + throws IOException, InterruptedException { + List<CommT> failed = + committer.commit( + commitRequests.stream() + .map(CommitRequest::getCommittable) + .collect(Collectors.toList())); + if (!failed.isEmpty()) { + Set<CommT> indexed = Collections.newSetFromMap(new IdentityHashMap<>()); + indexed.addAll(failed); + commitRequests.stream() + .filter(request -> indexed.contains(request.getCommittable())) + .forEach(CommitRequest::retryLater); + } + } + + @Override + public void close() throws Exception { + committer.close(); + } + } + + /** Main class to simulate SinkV1 with SinkV2. 
*/ + @VisibleForTesting + @Internal + public class PlainSinkAdapter implements Sink<InputT> { + @Override + public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(InitContext context) + throws IOException { + return SinkV1Adapter.this.createWriter(context); + } + + @VisibleForTesting + public org.apache.flink.api.connector.sink.Sink<InputT, CommT, WriterStateT, GlobalCommT> + getSink() { + return sink; + } + } + + private class StatefulSinkAdapter extends PlainSinkAdapter + implements StatefulSink<InputT, WriterStateT> { + @Override + public StatefulSinkWriter<InputT, WriterStateT> restoreWriter( + InitContext context, Collection<WriterStateT> recoveredState) throws IOException { + org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> writer = + sink.createWriter( + new InitContextAdapter(context), new ArrayList<>(recoveredState)); + return new SinkWriterV1Adapter<>(writer); + } + + @Override + public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() { + return sink.getWriterStateSerializer() + .orElseThrow( + () -> + new IllegalStateException( + "This method should only be called after adapter established that the result is non-empty.")); + } + } + + private class TwoPhaseCommittingSinkAdapter extends PlainSinkAdapter + implements TwoPhaseCommittingSink<InputT, CommT>, WithCompatibleState { + @Override + public Committer<CommT> createCommitter() throws IOException { + return new CommitterAdapter<>( + sink.createCommitter().orElse(new SinkV1Adapter.NoopCommitter<>())); + } + + @Override + public SimpleVersionedSerializer<CommT> getCommittableSerializer() { + return sink.getCommittableSerializer() + .orElseThrow( + () -> + new IllegalStateException( + "This method should only be called after adapter established that the result is non-empty.")); + } + + @Override + public Collection<String> getCompatibleWriterStateNames() { + return sink.getCompatibleStateNames(); + } + } + + private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter + implements WithPostCommitTopology<InputT, CommT> { + + @Override + public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) { + StandardSinkTopologies.addGlobalCommitter(committables, GlobalCommitterAdapter::new); + } + } + + private class StatefulTwoPhaseCommittingSinkAdapter extends StatefulSinkAdapter

Review comment: Have you considered using one adapter class to implement all of these interfaces, so that there are fewer decorators and the code readability improves? The code in `SinkV1Adapter#asSpecializedSink()` could also be reduced accordingly.
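For illustration, a single adapter could look roughly like the following. This is a minimal sketch only; `UnifiedSinkV1Adapter` is a hypothetical name, and it ignores that the runtime discovers sink capabilities via instanceof checks, so unconditionally implementing every interface may change behavior for sinks that lack the corresponding serializers:

```java
// Hypothetical inner class of SinkV1Adapter that combines the specialized
// adapters above into one. Caveat: instanceof-based capability detection
// would always report this sink as stateful and two-phase committing.
private class UnifiedSinkV1Adapter extends PlainSinkAdapter
        implements StatefulSink<InputT, WriterStateT>,
                TwoPhaseCommittingSink<InputT, CommT>,
                WithPostCommitTopology<InputT, CommT>,
                WithCompatibleState {

    @Override
    public StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            InitContext context, Collection<WriterStateT> recoveredState) throws IOException {
        return new SinkWriterV1Adapter<>(
                sink.createWriter(
                        new InitContextAdapter(context), new ArrayList<>(recoveredState)));
    }

    @Override
    public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
        // Fails at runtime for stateless V1 sinks, whereas asSpecializedSink()
        // simply never advertises the capability for them.
        return sink.getWriterStateSerializer().orElseThrow(IllegalStateException::new);
    }

    @Override
    public Committer<CommT> createCommitter() throws IOException {
        return new CommitterAdapter<>(
                sink.createCommitter().orElse(new SinkV1Adapter.NoopCommitter<>()));
    }

    @Override
    public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
        return sink.getCommittableSerializer().orElseThrow(IllegalStateException::new);
    }

    @Override
    public Collection<String> getCompatibleWriterStateNames() {
        return sink.getCompatibleStateNames();
    }

    @Override
    public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
        StandardSinkTopologies.addGlobalCommitter(committables, GlobalCommitterAdapter::new);
    }
}
```

If the instanceof checks are indeed the reason for the per-capability decorators, this may not be feasible as-is, but it would shrink `asSpecializedSink()` to a single branch.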
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java ########## @@ -17,95 +17,173 @@ package org.apache.flink.streaming.runtime.operators.sink; -import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittables; +import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector; +import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer; +import org.apache.flink.streaming.runtime.operators.sink.committables.Committables; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import java.io.IOException; +import java.util.Collection; import java.util.Collections; +import java.util.OptionalLong; import static org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; /** * An operator that processes committables of a {@link org.apache.flink.api.connector.sink.Sink}. * - * <p>The operator may be part of a sink pipeline but usually is the last operator. There are - * currently two ways this operator is used: + * <p>The operator may be part of a sink pipeline, and it always follows {@link WriterOperator}, + * which initially outputs the committables. * - * <ul> - * <li>In streaming mode, there is a {@link SinkOperator} with parallelism p containing {@link - * org.apache.flink.api.connector.sink.SinkWriter} and {@link - * org.apache.flink.api.connector.sink.Committer} and this operator containing the {@link - * org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1. - * <li>In batch mode, there is a {@link SinkOperator} with parallelism p containing {@link - * org.apache.flink.api.connector.sink.SinkWriter} and this operator containing the {@link - * org.apache.flink.api.connector.sink.Committer} and {@link - * org.apache.flink.api.connector.sink.GlobalCommitter} with parallelism 1. 
- * </ul> - * - * @param <InputT> the type of the committable - * @param <OutputT> the type of the committable to send to downstream operators + * @param <CommT> the type of the committable */ -class CommitterOperator<InputT, OutputT> extends AbstractStreamOperator<byte[]> - implements OneInputStreamOperator<byte[], byte[]>, BoundedOneInput { +class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage<CommT>> + implements OneInputStreamOperator<CommittableMessage<CommT>, CommittableMessage<CommT>>, + BoundedOneInput { + + private static final long RETRY_DELAY = 1000; + private final SimpleVersionedSerializer<CommT> committableSerializer; + private final Committer<CommT> committer; + private final boolean emitDownstream; + private CommittableCollector<CommT> committableCollector; + private long lastCompletedCheckpointId = -1; - private final SimpleVersionedSerializer<InputT> inputSerializer; - private final CommitterHandler<InputT, OutputT> committerHandler; - private final CommitRetrier commitRetrier; + /** The operator's state descriptor. */ + private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = + new ListStateDescriptor<>( + "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE); + + /** The operator's state. */ + private ListState<CommittableCollector<CommT>> committableCollectorState; public CommitterOperator( ProcessingTimeService processingTimeService, - SimpleVersionedSerializer<InputT> inputSerializer, - CommitterHandler<InputT, OutputT> committerHandler) { - this.inputSerializer = checkNotNull(inputSerializer); - this.committerHandler = checkNotNull(committerHandler); - this.processingTimeService = processingTimeService; - this.commitRetrier = new CommitRetrier(processingTimeService, committerHandler); + SimpleVersionedSerializer<CommT> committableSerializer, + Committer<CommT> committer, + boolean emitDownstream) { + this.emitDownstream = emitDownstream; + this.processingTimeService = checkNotNull(processingTimeService); + this.committableSerializer = checkNotNull(committableSerializer); + this.committer = checkNotNull(committer); + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<CommittableMessage<CommT>>> output) { + super.setup(containingTask, config, output); + committableCollector = CommittableCollector.of(getRuntimeContext()); } @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - committerHandler.initializeState(context); - // try to re-commit recovered transactions as quickly as possible - commitRetrier.retryWithDelay(); + committableCollectorState = + new SimpleVersionedListState<>( + context.getOperatorStateStore() + .getListState(STREAMING_COMMITTER_RAW_STATES_DESC), + new CommittableCollectorSerializer<>(committableSerializer)); + + if (context.isRestored()) { + committableCollectorState.get().forEach(cc -> committableCollector.merge(cc)); + lastCompletedCheckpointId = context.getRestoredCheckpointId().getAsLong(); + // try to re-commit recovered transactions as quickly as possible + commitAndEmitCheckpoints(); + } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); - committerHandler.snapshotState(context); + // It is important to copy the collector to not mutate the state. 
+ committableCollectorState.update(Collections.singletonList(committableCollector.copy())); } @Override public void endInput() throws Exception { - committerHandler.endOfInput(); - commitRetrier.retryIndefinitely(); + Collection<? extends Committables<CommT>> endOfInputCommittables = + committableCollector.getEndOfInputCommittables(); + // indicates batch + if (endOfInputCommittables != null) { + do { + for (Committables<CommT> endOfInputCommittable : endOfInputCommittables) { + commitAndEmit(endOfInputCommittable, false); + } + } while (!committableCollector.isFinished()); + } } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); - committerHandler.notifyCheckpointCompleted(checkpointId); - commitRetrier.retryWithDelay(); + lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId); + commitAndEmitCheckpoints(); + } + + private void commitAndEmitCheckpoints() throws IOException, InterruptedException { + for (CheckpointCommittables<CommT> checkpoint : + committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) { + // wait for all committables of the current checkpoint before submission

Review comment: The comment is a bit confusing. How can "wait" be expressed by a boolean check?

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java ########## @@ -131,6 +150,10 @@ protected DataStreamSink(DataStream<T> inputStream, Sink<T, ?, ?, ?> sink) { */ @PublicEvolving public DataStreamSink<T> setUidHash(String uidHash) { + if (!(transformation instanceof LegacySinkTransformation)) {

Review comment: It is the transformation's responsibility to handle this exception, i.e. in this case `SinkTransformation`.

########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ########## @@ -924,5 +924,10 @@ public long getDefaultBufferTimeout() { public ReadableConfig getGraphGeneratorConfig() { return config; } + + @Override + public Collection<Integer> transform(Transformation<?> transformation) {

Review comment: Normally, a Context, as the name suggests, does not perform actions.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
