fapaul commented on a change in pull request #18428:
URL: https://github.com/apache/flink/pull/18428#discussion_r794485945



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
+import 
org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
+import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Translates Sink V1 into Sink V2. */
+@Internal
+public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> 
implements Sink<InputT> {
+
+    private final org.apache.flink.api.connector.sink.Sink<InputT, CommT, 
WriterStateT, GlobalCommT>
+            sink;
+
+    private SinkV1Adapter(
+            org.apache.flink.api.connector.sink.Sink<InputT, CommT, 
WriterStateT, GlobalCommT>
+                    sink) {
+        this.sink = sink;
+    }
+
+    public static <InputT> Sink<InputT> wrap(
+            org.apache.flink.api.connector.sink.Sink<InputT, ?, ?, ?> sink) {
+        return new SinkV1Adapter<>(sink).asSpecializedSink();
+    }
+
+    @Override
+    public SinkWriterV1Adapter<InputT, CommT, WriterStateT> 
createWriter(InitContext context)
+            throws IOException {
+        org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, 
WriterStateT> writer =
+                sink.createWriter(new InitContextAdapter(context), 
Collections.emptyList());
+        return new SinkWriterV1Adapter<>(writer);
+    }
+
+    public Sink<InputT> asSpecializedSink() {
+        boolean stateful = false;
+        boolean globalCommitter = false;
+        boolean committer = false;
+        if (sink.getWriterStateSerializer().isPresent()) {
+            stateful = true;
+        }
+        if (sink.getGlobalCommittableSerializer().isPresent()) {
+            globalCommitter = true;
+        }
+        if (sink.getCommittableSerializer().isPresent()) {
+            committer = true;
+        }
+
+        if (globalCommitter && committer && stateful) {
+            return new StatefulGlobalTwoPhaseCommittingSinkAdapter();
+        }
+        if (globalCommitter) {
+            return new GlobalCommittingSinkAdapter();
+        }
+        if (committer && stateful) {
+            return new StatefulTwoPhaseCommittingSinkAdapter();
+        }
+        if (committer) {
+            return new TwoPhaseCommittingSinkAdapter();
+        }
+        if (stateful) {
+            return new StatefulSinkAdapter();
+        }
+        return this;
+    }
+
+    private static class SinkWriterV1Adapter<InputT, CommT, WriterStateT>
+            implements StatefulSinkWriter<InputT, WriterStateT>,
+                    PrecommittingSinkWriter<InputT, CommT> {
+
+        private final org.apache.flink.api.connector.sink.SinkWriter<InputT, 
CommT, WriterStateT>
+                writer;
+        private boolean endOfInput = false;
+        private final WriterContextAdapter contextAdapter = new 
WriterContextAdapter();
+
+        public SinkWriterV1Adapter(
+                org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, 
WriterStateT>
+                        writer) {
+            this.writer = writer;
+        }
+
+        @Override
+        public void write(InputT element, Context context)
+                throws IOException, InterruptedException {
+            contextAdapter.setContext(context);
+            this.writer.write(element, contextAdapter);
+        }
+
+        @Override
+        public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+            this.endOfInput = endOfInput;
+        }
+
+        @Override
+        public List<WriterStateT> snapshotState(long checkpointId) throws 
IOException {
+            return writer.snapshotState(checkpointId);
+        }
+
+        @Override
+        public Collection<CommT> prepareCommit() throws IOException, 
InterruptedException {
+            return writer.prepareCommit(endOfInput);
+        }
+
+        @Override
+        public void close() throws Exception {
+            writer.close();
+        }
+
+        @Override
+        public void writeWatermark(Watermark watermark) throws IOException, 
InterruptedException {
+            writer.writeWatermark(watermark);
+        }
+    }
+
+    private static class WriterContextAdapter implements SinkWriter.Context {
+        private org.apache.flink.api.connector.sink2.SinkWriter.Context 
context;
+
+        public void 
setContext(org.apache.flink.api.connector.sink2.SinkWriter.Context context) {
+            this.context = context;
+        }
+
+        @Override
+        public long currentWatermark() {
+            return context.currentWatermark();
+        }
+
+        @Override
+        public Long timestamp() {
+            return context.timestamp();
+        }
+    }
+
+    private static class InitContextAdapter
+            implements org.apache.flink.api.connector.sink.Sink.InitContext {
+
+        private final InitContext context;
+
+        public InitContextAdapter(InitContext context) {
+            this.context = context;
+        }
+
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return context.getUserCodeClassLoader();
+        }
+
+        @Override
+        public MailboxExecutor getMailboxExecutor() {
+            return context.getMailboxExecutor();
+        }
+
+        @Override
+        public ProcessingTimeService getProcessingTimeService() {
+            return new 
ProcessingTimeServiceAdapter(context.getProcessingTimeService());
+        }
+
+        @Override
+        public int getSubtaskId() {
+            return context.getSubtaskId();
+        }
+
+        @Override
+        public int getNumberOfParallelSubtasks() {
+            return context.getNumberOfParallelSubtasks();
+        }
+
+        @Override
+        public SinkWriterMetricGroup metricGroup() {
+            return context.metricGroup();
+        }
+
+        @Override
+        public OptionalLong getRestoredCheckpointId() {
+            return context.getRestoredCheckpointId();
+        }
+
+        public InitializationContext 
asSerializationSchemaInitializationContext() {
+            return context.asSerializationSchemaInitializationContext();
+        }
+    }
+
+    private static class ProcessingTimeCallbackAdapter implements 
ProcessingTimeCallback {
+
+        private final ProcessingTimeService.ProcessingTimeCallback 
processingTimerCallback;
+
+        public ProcessingTimeCallbackAdapter(
+                ProcessingTimeService.ProcessingTimeCallback 
processingTimerCallback) {
+            this.processingTimerCallback = processingTimerCallback;
+        }
+
+        @Override
+        public void onProcessingTime(long time) throws IOException, 
InterruptedException {
+            processingTimerCallback.onProcessingTime(time);
+        }
+    }
+
+    private static class ProcessingTimeServiceAdapter implements 
ProcessingTimeService {
+
+        private final 
org.apache.flink.api.common.operators.ProcessingTimeService
+                processingTimeService;
+
+        public ProcessingTimeServiceAdapter(
+                org.apache.flink.api.common.operators.ProcessingTimeService 
processingTimeService) {
+            this.processingTimeService = processingTimeService;
+        }
+
+        @Override
+        public long getCurrentProcessingTime() {
+            return processingTimeService.getCurrentProcessingTime();
+        }
+
+        @Override
+        public void registerProcessingTimer(
+                long time, ProcessingTimeCallback processingTimerCallback) {
+            processingTimeService.registerTimer(
+                    time, new 
ProcessingTimeCallbackAdapter(processingTimerCallback));
+        }
+    }
+
+    private static class CommitterAdapter<CommT> implements Committer<CommT> {
+
+        private final org.apache.flink.api.connector.sink.Committer<CommT> 
committer;
+
+        public 
CommitterAdapter(org.apache.flink.api.connector.sink.Committer<CommT> 
committer) {
+            this.committer = committer;
+        }
+
+        @Override
+        public void commit(Collection<CommitRequest<CommT>> commitRequests)
+                throws IOException, InterruptedException {
+            List<CommT> failed =
+                    committer.commit(
+                            commitRequests.stream()
+                                    .map(CommitRequest::getCommittable)
+                                    .collect(Collectors.toList()));
+            if (!failed.isEmpty()) {
+                Set<CommT> indexed = Collections.newSetFromMap(new 
IdentityHashMap<>());
+                indexed.addAll(failed);
+                commitRequests.stream()
+                        .filter(request -> 
indexed.contains(request.getCommittable()))
+                        .forEach(CommitRequest::retryLater);
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            committer.close();
+        }
+    }
+
+    /** Main class to simulate SinkV1 with SinkV2. */
+    @VisibleForTesting
+    @Internal
+    public class PlainSinkAdapter implements Sink<InputT> {
+        @Override
+        public SinkWriterV1Adapter<InputT, CommT, WriterStateT> 
createWriter(InitContext context)
+                throws IOException {
+            return SinkV1Adapter.this.createWriter(context);
+        }
+
+        @VisibleForTesting
+        public org.apache.flink.api.connector.sink.Sink<InputT, CommT, 
WriterStateT, GlobalCommT>
+                getSink() {
+            return sink;
+        }
+    }
+
+    private class StatefulSinkAdapter extends PlainSinkAdapter
+            implements StatefulSink<InputT, WriterStateT> {
+        @Override
+        public StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+                InitContext context, Collection<WriterStateT> recoveredState) 
throws IOException {
+            org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, 
WriterStateT> writer =
+                    sink.createWriter(
+                            new InitContextAdapter(context), new 
ArrayList<>(recoveredState));
+            return new SinkWriterV1Adapter<>(writer);
+        }
+
+        @Override
+        public SimpleVersionedSerializer<WriterStateT> 
getWriterStateSerializer() {
+            return sink.getWriterStateSerializer()
+                    .orElseThrow(
+                            () ->
+                                    new IllegalStateException(
+                                            "This method should only be called 
after adapter established that the result is non-empty."));
+        }
+    }
+
+    private class TwoPhaseCommittingSinkAdapter extends PlainSinkAdapter
+            implements TwoPhaseCommittingSink<InputT, CommT>, 
WithCompatibleState {
+        @Override
+        public Committer<CommT> createCommitter() throws IOException {
+            return new CommitterAdapter<>(
+                    sink.createCommitter().orElse(new 
SinkV1Adapter.NoopCommitter<>()));
+        }
+
+        @Override
+        public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
+            return sink.getCommittableSerializer()
+                    .orElseThrow(
+                            () ->
+                                    new IllegalStateException(
+                                            "This method should only be called 
after adapter established that the result is non-empty."));
+        }
+
+        @Override
+        public Collection<String> getCompatibleWriterStateNames() {
+            return sink.getCompatibleStateNames();
+        }
+    }
+
+    private class GlobalCommittingSinkAdapter extends 
TwoPhaseCommittingSinkAdapter
+            implements WithPostCommitTopology<InputT, CommT> {
+
+        @Override
+        public void 
addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
+            StandardSinkTopologies.addGlobalCommitter(committables, 
GlobalCommitterAdapter::new);
+        }
+    }
+
+    private class StatefulTwoPhaseCommittingSinkAdapter extends 
StatefulSinkAdapter

Review comment:
       AFAICT I need multiple classes or it increases the complexity to 
implement the given use cases with one class i.e. converting a previously only 
SinkWriter -> GlobalCommitter topologies now becomes SinkWriter -> 
ForwardCommitter -> GlobalCommitter. I think the conditional logic is then 
distributed across all creation methods (i.e. SinkWriter, Commiter, 
GlobalCommiter) which I find harder to understand instead of doing the checks 
once and returning different implementations.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to