This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fd673a2f462 [FLINK-33974] Implement the Sink transformation depending on the new SinkV2 interfaces
fd673a2f462 is described below
commit fd673a2f46206ff65978f05fcb96b525696aead2
Author: pvary <[email protected]>
AuthorDate: Fri Jan 19 14:04:53 2024 +0100
[FLINK-33974] Implement the Sink transformation depending on the new SinkV2 interfaces
---
.../operators/sink/CommitterOperatorFactory.java | 12 ++-
.../runtime/operators/sink/SinkWriterOperator.java | 23 +++---
.../sink/StatefulSinkWriterStateHandler.java | 13 ++--
.../translators/SinkTransformationTranslator.java | 85 ++++++++++++++--------
.../SinkV2TransformationTranslatorITCase.java | 9 +--
.../api/graph/StreamGraphGeneratorTest.java | 23 ++++++
.../streaming/util/TestExpandingSinkWithMixin.java | 85 ++++++++++++++++++++++
7 files changed, 186 insertions(+), 64 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
index 1f013ca0580..162f8d04181 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
@@ -19,9 +19,9 @@
package org.apache.flink.streaming.runtime.operators.sink;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -41,14 +41,12 @@ public final class CommitterOperatorFactory<CommT>
implements OneInputStreamOperatorFactory<
CommittableMessage<CommT>, CommittableMessage<CommT>> {
- private final TwoPhaseCommittingSink<?, CommT> sink;
+ private final SupportsCommitter<CommT> sink;
private final boolean isBatchMode;
private final boolean isCheckpointingEnabled;
public CommitterOperatorFactory(
- TwoPhaseCommittingSink<?, CommT> sink,
- boolean isBatchMode,
- boolean isCheckpointingEnabled) {
+ SupportsCommitter<CommT> sink, boolean isBatchMode, boolean
isCheckpointingEnabled) {
this.sink = checkNotNull(sink);
this.isBatchMode = isBatchMode;
this.isCheckpointingEnabled = isCheckpointingEnabled;
@@ -65,7 +63,7 @@ public final class CommitterOperatorFactory<CommT>
processingTimeService,
sink.getCommittableSerializer(),
context -> sink.createCommitter(context),
- sink instanceof WithPostCommitTopology,
+ sink instanceof SupportsPostCommitTopology,
isBatchMode,
isCheckpointingEnabled);
committerOperator.setup(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index c0a9892d5ee..a9d0f682a78 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -24,12 +24,12 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -113,18 +113,17 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
this.processingTimeService = checkNotNull(processingTimeService);
this.mailboxExecutor = checkNotNull(mailboxExecutor);
this.context = new Context<>();
- this.emitDownstream = sink instanceof TwoPhaseCommittingSink;
+ this.emitDownstream = sink instanceof SupportsCommitter;
- if (sink instanceof StatefulSink) {
+ if (sink instanceof SupportsWriterState) {
writerStateHandler =
- new StatefulSinkWriterStateHandler<>((StatefulSink<InputT,
?>) sink);
+ new
StatefulSinkWriterStateHandler<>((SupportsWriterState<InputT, ?>) sink);
} else {
writerStateHandler = new StatelessSinkWriterStateHandler<>(sink);
}
- if (sink instanceof TwoPhaseCommittingSink) {
- committableSerializer =
- ((TwoPhaseCommittingSink<InputT, CommT>)
sink).getCommittableSerializer();
+ if (sink instanceof SupportsCommitter) {
+ committableSerializer = ((SupportsCommitter<CommT>)
sink).getCommittableSerializer();
} else {
committableSerializer = null;
}
@@ -188,13 +187,13 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
if (!emitDownstream) {
// To support SinkV1 topologies with only a writer we have to call prepareCommit
// although no committables are forwarded
- if (sinkWriter instanceof PrecommittingSinkWriter) {
- ((PrecommittingSinkWriter<?, ?>) sinkWriter).prepareCommit();
+ if (sinkWriter instanceof CommittingSinkWriter) {
+ ((CommittingSinkWriter<?, ?>) sinkWriter).prepareCommit();
}
return;
}
Collection<CommT> committables =
- ((PrecommittingSinkWriter<?, CommT>)
sinkWriter).prepareCommit();
+ ((CommittingSinkWriter<?, CommT>) sinkWriter).prepareCommit();
StreamingRuntimeContext runtimeContext = getRuntimeContext();
final int indexOfThisSubtask =
runtimeContext.getTaskInfo().getIndexOfThisSubtask();
final int numberOfParallelSubtasks =
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
index 2e83a3b8a6c..3c70627a50e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;
import
org.apache.flink.api.connector.sink2.SupportsWriterState.WithCompatibleState;
@@ -79,10 +78,12 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT>
private StatefulSinkWriter<InputT, WriterStateT> sinkWriter;
- public StatefulSinkWriterStateHandler(StatefulSink<InputT, WriterStateT>
sink) {
- this.sink = sink;
+ public StatefulSinkWriterStateHandler(SupportsWriterState<InputT,
WriterStateT> sink) {
+ Preconditions.checkArgument(
+ sink instanceof Sink, "Should be an instance of " +
Sink.class.getName());
+ this.sink = (Sink<InputT>) sink;
Collection<String> previousSinkStateNames =
- sink instanceof StatefulSink.WithCompatibleState
+ sink instanceof SupportsWriterState.WithCompatibleState
? ((WithCompatibleState)
sink).getCompatibleWriterStateNames()
: Collections.emptyList();
this.writerStateSimpleVersionedSerializer =
sink.getWriterStateSerializer();
@@ -116,10 +117,6 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT>
Iterables.addAll(states, previousSinkState.get());
}
- if (!(sink instanceof SupportsWriterState)) {
- throw new IllegalArgumentException("Sink should implement
SupportsWriterState");
- }
-
sinkWriter = ((SupportsWriterState)
sink).restoreWriter(initContext, states);
} else {
sinkWriter = cast(sink.createWriter(initContext));
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index ce009320659..e924086a1d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -24,15 +24,15 @@ import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
-import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
-import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
-import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import
org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
@@ -135,16 +136,27 @@ public class SinkTransformationTranslator<Input, Output>
DataStream<T> prewritten = inputStream;
- if (sink instanceof WithPreWriteTopology) {
+ if (sink instanceof SupportsPreWriteTopology) {
prewritten =
adjustTransformations(
prewritten,
- ((WithPreWriteTopology<T>)
sink)::addPreWriteTopology,
+ ((SupportsPreWriteTopology<T>)
sink)::addPreWriteTopology,
true,
sink instanceof
SupportsConcurrentExecutionAttempts);
}
- if (sink instanceof TwoPhaseCommittingSink) {
+ if (sink instanceof SupportsPreCommitTopology) {
+ Preconditions.checkArgument(
+ sink instanceof SupportsCommitter,
+ "Sink with SupportsPreCommitTopology should implement
SupportsCommitter");
+ }
+ if (sink instanceof SupportsPostCommitTopology) {
+ Preconditions.checkArgument(
+ sink instanceof SupportsCommitter,
+ "Sink with SupportsPostCommitTopology should implement
SupportsCommitter");
+ }
+
+ if (sink instanceof SupportsCommitter) {
addCommittingTopology(sink, prewritten);
} else {
adjustTransformations(
@@ -173,32 +185,27 @@ public class SinkTransformationTranslator<Input, Output>
}
}
- private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T>
inputStream) {
- TwoPhaseCommittingSink<T, CommT> committingSink =
- (TwoPhaseCommittingSink<T, CommT>) sink;
- TypeInformation<CommittableMessage<CommT>> typeInformation =
+ private <CommT, WriteResultT> void addCommittingTopology(
+ Sink<T> sink, DataStream<T> inputStream) {
+ SupportsCommitter<CommT> committingSink =
(SupportsCommitter<CommT>) sink;
+ TypeInformation<CommittableMessage<CommT>>
committableTypeInformation =
CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
- DataStream<CommittableMessage<CommT>> written =
- adjustTransformations(
- inputStream,
- input ->
- input.transform(
- WRITER_NAME,
- typeInformation,
- new
SinkWriterOperatorFactory<>(sink)),
- false,
- sink instanceof
SupportsConcurrentExecutionAttempts);
+ DataStream<CommittableMessage<CommT>> precommitted;
+ if (sink instanceof SupportsPreCommitTopology) {
+ SupportsPreCommitTopology<WriteResultT, CommT>
preCommittingSink =
+ (SupportsPreCommitTopology<WriteResultT, CommT>) sink;
+ TypeInformation<CommittableMessage<WriteResultT>>
writeResultTypeInformation =
+
CommittableMessageTypeInfo.of(preCommittingSink::getWriteResultSerializer);
- DataStream<CommittableMessage<CommT>> precommitted =
addFailOverRegion(written);
+ DataStream<CommittableMessage<WriteResultT>> writerResult =
+ addWriter(sink, inputStream,
writeResultTypeInformation);
- if (sink instanceof WithPreCommitTopology) {
precommitted =
adjustTransformations(
- precommitted,
- ((WithPreCommitTopology<T, CommT>)
sink)::addPreCommitTopology,
- true,
- false);
+ writerResult,
preCommittingSink::addPreCommitTopology, true, false);
+ } else {
+ precommitted = addWriter(sink, inputStream,
committableTypeInformation);
}
DataStream<CommittableMessage<CommT>> committed =
@@ -207,7 +214,7 @@ public class SinkTransformationTranslator<Input, Output>
pc ->
pc.transform(
COMMITTER_NAME,
- typeInformation,
+ committableTypeInformation,
new CommitterOperatorFactory<>(
committingSink,
isBatchMode,
@@ -215,12 +222,12 @@ public class SinkTransformationTranslator<Input, Output>
false,
false);
- if (sink instanceof WithPostCommitTopology) {
+ if (sink instanceof SupportsPostCommitTopology) {
DataStream<CommittableMessage<CommT>> postcommitted =
addFailOverRegion(committed);
adjustTransformations(
postcommitted,
pc -> {
- ((WithPostCommitTopology<T, CommT>)
sink).addPostCommitTopology(pc);
+ ((SupportsPostCommitTopology<CommT>)
sink).addPostCommitTopology(pc);
return null;
},
true,
@@ -228,6 +235,24 @@ public class SinkTransformationTranslator<Input, Output>
}
}
+ private <WriteResultT> DataStream<CommittableMessage<WriteResultT>>
addWriter(
+ Sink<T> sink,
+ DataStream<T> inputStream,
+ TypeInformation<CommittableMessage<WriteResultT>>
typeInformation) {
+ DataStream<CommittableMessage<WriteResultT>> written =
+ adjustTransformations(
+ inputStream,
+ input ->
+ input.transform(
+ WRITER_NAME,
+ typeInformation,
+ new
SinkWriterOperatorFactory<>(sink)),
+ false,
+ sink instanceof
SupportsConcurrentExecutionAttempts);
+
+ return addFailOverRegion(written);
+ }
+
/**
* Adds a batch exchange that materializes the output first. This is a no-op in STREAMING.
*/
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
index 612cb9d7808..97b23ababac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
@@ -67,10 +67,7 @@ public class SinkV2TransformationTranslatorITCase
.setWriterUidHash(writerHash)
.setCommitterUidHash(committerHash)
.build();
- src.sinkTo(
-
TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build(),
- operatorsUidHashes)
- .name(NAME);
+ src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
final StreamGraph streamGraph = env.getStreamGraph();
@@ -87,9 +84,7 @@ public class SinkV2TransformationTranslatorITCase
final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource<Integer> src = env.fromElements(1, 2);
-
src.sinkTo(TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build())
- .name(NAME)
- .uid(sinkUid);
+ src.sinkTo(sinkWithCommitter()).name(NAME).uid(sinkUid);
final StreamGraph streamGraph = env.getStreamGraph();
assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index f2a47b2a712..c9e56cad319 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -74,6 +74,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
import
org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.streaming.util.TestExpandingSink;
+import org.apache.flink.streaming.util.TestExpandingSinkWithMixin;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
@@ -908,6 +909,28 @@ public class StreamGraphGeneratorTest extends TestLogger {
});
}
+ @Test
+ public void testAutoParallelismForExpandedTransformationsWithMixin() {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setParallelism(2);
+
+ DataStream<Integer> sourceDataStream = env.fromData(1, 2, 3);
+ // Parallelism is set to -1 (default parallelism identifier) to imitate the behavior of
+ // the table planner. Parallelism should be set automatically after translating.
+ sourceDataStream.sinkTo(new
TestExpandingSinkWithMixin()).setParallelism(-1);
+
+ StreamGraph graph = env.getStreamGraph();
+
+ graph.getStreamNodes()
+ .forEach(
+ node -> {
+ if (!node.getOperatorName().startsWith("Source")) {
+
Assertions.assertThat(node.getParallelism()).isEqualTo(2);
+ }
+ });
+ }
+
@Test
public void testCacheTransformation() {
final TestingStreamExecutionEnvironment env = new
TestingStreamExecutionEnvironment();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSinkWithMixin.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSinkWithMixin.java
new file mode 100644
index 00000000000..b3c7ba7bc92
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSinkWithMixin.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import
org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+
+import java.io.IOException;
+
+/** A test sink that expands into a simple subgraph. Do not use in runtime. */
+public class TestExpandingSinkWithMixin
+ implements Sink<Integer>,
+ SupportsCommitter<Integer>,
+ SupportsPreWriteTopology<Integer>,
+ SupportsPreCommitTopology<Integer, Integer>,
+ SupportsPostCommitTopology<Integer> {
+
+ @Override
+ public void addPostCommitTopology(DataStream<CommittableMessage<Integer>>
committables) {
+ committables.sinkTo(new DiscardingSink<>());
+ }
+
+ @Override
+ public DataStream<CommittableMessage<Integer>> addPreCommitTopology(
+ DataStream<CommittableMessage<Integer>> committables) {
+ return committables.map(value ->
value).returns(committables.getType());
+ }
+
+ @Override
+ public DataStream<Integer> addPreWriteTopology(DataStream<Integer>
inputDataStream) {
+ return inputDataStream.map(new NoOpIntMap());
+ }
+
+ @Override
+ public SinkWriter<Integer> createWriter(WriterInitContext context) throws
IOException {
+ return null;
+ }
+
+ @Override
+ public SinkWriter<Integer> createWriter(InitContext context) throws
IOException {
+ return null;
+ }
+
+ @Override
+ public Committer<Integer> createCommitter(CommitterInitContext context) {
+ return null;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Integer> getCommittableSerializer() {
+ return null;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Integer> getWriteResultSerializer() {
+ return null;
+ }
+}