This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2cf213ec9e4 [FLINK-34208] Migrate SinkV1Adapter to the new SinkV2 API
2cf213ec9e4 is described below
commit 2cf213ec9e4db81815a9b37013904aa47a94aa01
Author: Peter Vary <[email protected]>
AuthorDate: Tue Jan 23 10:59:32 2024 +0100
[FLINK-34208] Migrate SinkV1Adapter to the new SinkV2 API
---
.../api/transformations/SinkV1Adapter.java | 51 ++++++++++++++--------
.../api/transformations/SinkV1AdapterTest.java | 24 +++++-----
.../sink/WithAdapterCommitterOperatorTest.java | 8 ++--
3 files changed, 49 insertions(+), 34 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
index 478898ff19b..5b250fd43c2 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
@@ -28,17 +28,17 @@ import
org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
-import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.UserCodeClassLoader;
@@ -72,7 +72,13 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT,
GlobalCommT> implements
}
@Override
- public SinkWriterV1Adapter<InputT, CommT, WriterStateT>
createWriter(InitContext context)
+ public org.apache.flink.api.connector.sink2.SinkWriter<InputT>
createWriter(InitContext context)
+ throws IOException {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public SinkWriterV1Adapter<InputT, CommT, WriterStateT>
createWriter(WriterInitContext context)
throws IOException {
org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT,
WriterStateT> writer =
sink.createWriter(new InitContextAdapter(context),
Collections.emptyList());
@@ -117,7 +123,7 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT,
GlobalCommT> implements
private static class SinkWriterV1Adapter<InputT, CommT, WriterStateT>
implements StatefulSinkWriter<InputT, WriterStateT>,
- PrecommittingSinkWriter<InputT, CommT> {
+ CommittingSinkWriter<InputT, CommT> {
private final org.apache.flink.api.connector.sink.SinkWriter<InputT,
CommT, WriterStateT>
writer;
@@ -184,9 +190,9 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT,
GlobalCommT> implements
private static class InitContextAdapter
implements org.apache.flink.api.connector.sink.Sink.InitContext {
- private final InitContext context;
+ private final WriterInitContext context;
- public InitContextAdapter(InitContext context) {
+ public InitContextAdapter(WriterInitContext context) {
this.context = context;
}
@@ -302,8 +308,14 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT,
GlobalCommT> implements
/** Main class to simulate SinkV1 with SinkV2. */
class PlainSinkAdapter implements Sink<InputT> {
@Override
- public SinkWriterV1Adapter<InputT, CommT, WriterStateT>
createWriter(InitContext context)
- throws IOException {
+ public org.apache.flink.api.connector.sink2.SinkWriter<InputT>
createWriter(
+ InitContext context) throws IOException {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public SinkWriterV1Adapter<InputT, CommT, WriterStateT> createWriter(
+ WriterInitContext context) throws IOException {
return SinkV1Adapter.this.createWriter(context);
}
@@ -314,10 +326,11 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT,
GlobalCommT> implements
}
private class StatefulSinkAdapter extends PlainSinkAdapter
- implements StatefulSink<InputT, WriterStateT> {
+ implements SupportsWriterState<InputT, WriterStateT> {
@Override
public StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
- InitContext context, Collection<WriterStateT> recoveredState)
throws IOException {
+ WriterInitContext context, Collection<WriterStateT>
recoveredState)
+ throws IOException {
org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT,
WriterStateT> writer =
sink.createWriter(
new InitContextAdapter(context), new
ArrayList<>(recoveredState));
@@ -335,7 +348,7 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT,
GlobalCommT> implements
}
private class TwoPhaseCommittingSinkAdapter extends PlainSinkAdapter
- implements TwoPhaseCommittingSink<InputT, CommT>,
WithCompatibleState {
+ implements SupportsCommitter<CommT>,
SupportsWriterState.WithCompatibleState {
@Override
public Committer<CommT> createCommitter(CommitterInitContext context)
throws IOException {
return new CommitterAdapter<>(
@@ -358,7 +371,7 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT,
GlobalCommT> implements
}
private class GlobalCommittingSinkAdapter extends
TwoPhaseCommittingSinkAdapter
- implements WithPostCommitTopology<InputT, CommT> {
+ implements SupportsPostCommitTopology<CommT> {
@Override
public void
addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
@@ -371,7 +384,9 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT,
GlobalCommT> implements
}
private class StatefulTwoPhaseCommittingSinkAdapter extends
StatefulSinkAdapter
- implements TwoPhaseCommittingSink<InputT, CommT>,
WithCompatibleState {
+ implements Sink<InputT>,
+ SupportsCommitter<CommT>,
+ SupportsWriterState.WithCompatibleState {
TwoPhaseCommittingSinkAdapter adapter = new
TwoPhaseCommittingSinkAdapter();
@Override
@@ -392,7 +407,7 @@ public class SinkV1Adapter<InputT, CommT, WriterStateT,
GlobalCommT> implements
private class StatefulGlobalTwoPhaseCommittingSinkAdapter
extends StatefulTwoPhaseCommittingSinkAdapter
- implements WithPostCommitTopology<InputT, CommT> {
+ implements SupportsPostCommitTopology<CommT> {
GlobalCommittingSinkAdapter globalCommittingSinkAdapter = new
GlobalCommittingSinkAdapter();
@Override
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java
index 35f68f2495e..edb2f5d9829 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java
@@ -22,10 +22,10 @@ import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -56,27 +56,27 @@ class SinkV1AdapterTest {
private static List<Arguments> provideSinkCombinations() {
return Arrays.asList(
Arguments.of(new DefaultSinkV1(),
Collections.singletonList(Sink.class)),
- Arguments.of(new StateFulSinkV1(), Arrays.asList(Sink.class,
StatefulSink.class)),
Arguments.of(
- new CommittingSinkV1(),
- Arrays.asList(Sink.class,
TwoPhaseCommittingSink.class)),
+ new StateFulSinkV1(), Arrays.asList(Sink.class,
SupportsWriterState.class)),
+ Arguments.of(
+ new CommittingSinkV1(), Arrays.asList(Sink.class,
SupportsCommitter.class)),
Arguments.of(
new StatefulCommittingSinkV1(),
Arrays.asList(
- Sink.class, StatefulSink.class,
TwoPhaseCommittingSink.class)),
+ Sink.class, SupportsWriterState.class,
SupportsCommitter.class)),
Arguments.of(
new GlobalCommittingSinkV1(),
Arrays.asList(
Sink.class,
- TwoPhaseCommittingSink.class,
- WithPostCommitTopology.class)),
+ SupportsCommitter.class,
+ SupportsPostCommitTopology.class)),
Arguments.of(
new StatefulGlobalCommittingSinkV1(),
Arrays.asList(
Sink.class,
- StatefulSink.class,
- TwoPhaseCommittingSink.class,
- WithPostCommitTopology.class)));
+ SupportsWriterState.class,
+ SupportsCommitter.class,
+ SupportsPostCommitTopology.class)));
}
private static class DefaultSinkV1
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
index c516db87467..6d4c3586f38 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.runtime.operators.sink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
import java.util.Collections;
import java.util.List;
@@ -29,7 +29,7 @@ class WithAdapterCommitterOperatorTest extends
CommitterOperatorTestBase {
SinkAndCounters sinkWithPostCommit() {
ForwardingCommitter committer = new ForwardingCommitter();
return new SinkAndCounters(
- (TwoPhaseCommittingSink<?, String>)
+ (SupportsCommitter<String>)
TestSink.newBuilder()
.setCommitter(committer)
.setDefaultGlobalCommitter()
@@ -43,7 +43,7 @@ class WithAdapterCommitterOperatorTest extends
CommitterOperatorTestBase {
@Override
SinkAndCounters sinkWithPostCommitWithRetry() {
return new SinkAndCounters(
- (TwoPhaseCommittingSink<?, String>)
+ (SupportsCommitter<String>)
TestSink.newBuilder()
.setCommitter(new
TestSink.RetryOnceCommitter())
.setDefaultGlobalCommitter()
@@ -58,7 +58,7 @@ class WithAdapterCommitterOperatorTest extends
CommitterOperatorTestBase {
SinkAndCounters sinkWithoutPostCommit() {
ForwardingCommitter committer = new ForwardingCommitter();
return new SinkAndCounters(
- (TwoPhaseCommittingSink<?, String>)
+ (SupportsCommitter<String>)
TestSink.newBuilder()
.setCommitter(committer)
.setCommittableSerializer(