This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cf213ec9e4 [FLINK-34208] Migrate SinkV1Adapter to the new SinkV2 API
2cf213ec9e4 is described below

commit 2cf213ec9e4db81815a9b37013904aa47a94aa01
Author: Peter Vary <[email protected]>
AuthorDate: Tue Jan 23 10:59:32 2024 +0100

    [FLINK-34208] Migrate SinkV1Adapter to the new SinkV2 API
---
 .../api/transformations/SinkV1Adapter.java         | 51 ++++++++++++++--------
 .../api/transformations/SinkV1AdapterTest.java     | 24 +++++-----
 .../sink/WithAdapterCommitterOperatorTest.java     |  8 ++--
 3 files changed, 49 insertions(+), 34 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
index 478898ff19b..5b250fd43c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
@@ -28,17 +28,17 @@ import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
-import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.util.UserCodeClassLoader;
 
@@ -72,7 +72,13 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements
     }
 
     @Override
-    public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(InitContext context)
+    public org.apache.flink.api.connector.sink2.SinkWriter<InputT> createWriter(InitContext context)
+            throws IOException {
+        throw new UnsupportedOperationException("Not supported");
+    }
+
+    @Override
+    public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(WriterInitContext context)
             throws IOException {
         org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> writer =
                 sink.createWriter(new InitContextAdapter(context), Collections.emptyList());
@@ -117,7 +123,7 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements
 
     private static class SinkWriterV1Adapter<InputT, CommT, WriterStateT>
             implements StatefulSinkWriter<InputT, WriterStateT>,
-                    PrecommittingSinkWriter<InputT, CommT> {
+                    CommittingSinkWriter<InputT, CommT> {
 
         private final org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT>
                 writer;
@@ -184,9 +190,9 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements
     private static class InitContextAdapter
             implements org.apache.flink.api.connector.sink.Sink.InitContext {
 
-        private final InitContext context;
+        private final WriterInitContext context;
 
-        public InitContextAdapter(InitContext context) {
+        public InitContextAdapter(WriterInitContext context) {
             this.context = context;
         }
 
@@ -302,8 +308,14 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements
     /** Main class to simulate SinkV1 with SinkV2. */
     class PlainSinkAdapter implements Sink<InputT> {
         @Override
-        public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(InitContext context)
-                throws IOException {
+        public org.apache.flink.api.connector.sink2.SinkWriter<InputT> createWriter(
+                InitContext context) throws IOException {
+            throw new UnsupportedOperationException("Not supported");
+        }
+
+        @Override
+        public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(
+                WriterInitContext context) throws IOException {
             return SinkV1Adapter.this.createWriter(context);
         }
 
@@ -314,10 +326,11 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements
     }
 
     private class StatefulSinkAdapter extends PlainSinkAdapter
-            implements StatefulSink<InputT, WriterStateT> {
+            implements SupportsWriterState<InputT, WriterStateT> {
         @Override
         public StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
-                InitContext context, Collection<WriterStateT> recoveredState) throws IOException {
+                WriterInitContext context, Collection<WriterStateT> recoveredState)
+                throws IOException {
             org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> writer =
                     sink.createWriter(
                             new InitContextAdapter(context), new ArrayList<>(recoveredState));
@@ -335,7 +348,7 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements
     }
 
     private class TwoPhaseCommittingSinkAdapter extends PlainSinkAdapter
-            implements TwoPhaseCommittingSink<InputT, CommT>, WithCompatibleState {
+            implements SupportsCommitter<CommT>, SupportsWriterState.WithCompatibleState {
         @Override
         public Committer<CommT> createCommitter(CommitterInitContext context) throws IOException {
             return new CommitterAdapter<>(
@@ -358,7 +371,7 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements
     }
 
     private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
-            implements WithPostCommitTopology<InputT, CommT> {
+            implements SupportsPostCommitTopology<CommT> {
 
         @Override
         public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
@@ -371,7 +384,9 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements
     }
 
     private class StatefulTwoPhaseCommittingSinkAdapter extends StatefulSinkAdapter
-            implements TwoPhaseCommittingSink<InputT, CommT>, WithCompatibleState {
+            implements Sink<InputT>,
+                    SupportsCommitter<CommT>,
+                    SupportsWriterState.WithCompatibleState {
         TwoPhaseCommittingSinkAdapter adapter = new TwoPhaseCommittingSinkAdapter();
 
         @Override
@@ -392,7 +407,7 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT, GlobalCommT> implements
 
     private class StatefulGlobalTwoPhaseCommittingSinkAdapter
             extends StatefulTwoPhaseCommittingSinkAdapter
-            implements WithPostCommitTopology<InputT, CommT> {
+            implements SupportsPostCommitTopology<CommT> {
         GlobalCommittingSinkAdapter globalCommittingSinkAdapter = new GlobalCommittingSinkAdapter();
 
         @Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java
index 35f68f2495e..edb2f5d9829 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java
@@ -22,10 +22,10 @@ import org.apache.flink.api.connector.sink.Committer;
 import org.apache.flink.api.connector.sink.GlobalCommitter;
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
 
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -56,27 +56,27 @@ class SinkV1AdapterTest {
     private static List<Arguments> provideSinkCombinations() {
         return Arrays.asList(
                 Arguments.of(new DefaultSinkV1(), Collections.singletonList(Sink.class)),
-                Arguments.of(new StateFulSinkV1(), Arrays.asList(Sink.class, StatefulSink.class)),
                 Arguments.of(
-                        new CommittingSinkV1(),
-                        Arrays.asList(Sink.class, TwoPhaseCommittingSink.class)),
+                        new StateFulSinkV1(), Arrays.asList(Sink.class, SupportsWriterState.class)),
+                Arguments.of(
+                        new CommittingSinkV1(), Arrays.asList(Sink.class, SupportsCommitter.class)),
                 Arguments.of(
                         new StatefulCommittingSinkV1(),
                         Arrays.asList(
-                                Sink.class, StatefulSink.class, TwoPhaseCommittingSink.class)),
+                                Sink.class, SupportsWriterState.class, SupportsCommitter.class)),
                 Arguments.of(
                         new GlobalCommittingSinkV1(),
                         Arrays.asList(
                                 Sink.class,
-                                TwoPhaseCommittingSink.class,
-                                WithPostCommitTopology.class)),
+                                SupportsCommitter.class,
+                                SupportsPostCommitTopology.class)),
                 Arguments.of(
                         new StatefulGlobalCommittingSinkV1(),
                         Arrays.asList(
                                 Sink.class,
-                                StatefulSink.class,
-                                TwoPhaseCommittingSink.class,
-                                WithPostCommitTopology.class)));
+                                SupportsWriterState.class,
+                                SupportsCommitter.class,
+                                SupportsPostCommitTopology.class)));
     }
 
     private static class DefaultSinkV1
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
index c516db87467..6d4c3586f38 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
 
 import java.util.Collections;
 import java.util.List;
@@ -29,7 +29,7 @@ class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase {
     SinkAndCounters sinkWithPostCommit() {
         ForwardingCommitter committer = new ForwardingCommitter();
         return new SinkAndCounters(
-                (TwoPhaseCommittingSink<?, String>)
+                (SupportsCommitter<String>)
                         TestSink.newBuilder()
                                 .setCommitter(committer)
                                 .setDefaultGlobalCommitter()
@@ -43,7 +43,7 @@ class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase {
     @Override
     SinkAndCounters sinkWithPostCommitWithRetry() {
         return new SinkAndCounters(
-                (TwoPhaseCommittingSink<?, String>)
+                (SupportsCommitter<String>)
                         TestSink.newBuilder()
                                 .setCommitter(new TestSink.RetryOnceCommitter())
                                 .setDefaultGlobalCommitter()
@@ -58,7 +58,7 @@ class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase {
     SinkAndCounters sinkWithoutPostCommit() {
         ForwardingCommitter committer = new ForwardingCommitter();
         return new SinkAndCounters(
-                (TwoPhaseCommittingSink<?, String>)
+                (SupportsCommitter<String>)
                         TestSink.newBuilder()
                                 .setCommitter(committer)
                                 .setCommittableSerializer(

Reply via email to