This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 92951a05127 [FLINK-33295] Separate SinkV2 and SinkV1Adapter tests
92951a05127 is described below
commit 92951a05127f1e0e2ab0ea04ae022659fc5276ab
Author: pvary <[email protected]>
AuthorDate: Wed Nov 8 17:55:43 2023 +0100
[FLINK-33295] Separate SinkV2 and SinkV1Adapter tests
Co-authored-by: Peter Vary <[email protected]>
---
.../base/sink/writer/TestSinkInitContext.java | 4 +-
.../connector/file/sink/writer/FileWriterTest.java | 8 +-
.../groups/InternalSinkWriterMetricGroup.java | 16 +-
.../metrics/groups/MetricsGroupTestUtils.java | 47 +++
.../api/datastream/DataStreamSinkTest.java | 8 +-
.../streaming/api/functions/PrintSinkTest.java | 5 +-
.../SinkTransformationTranslatorITCaseBase.java | 225 +++++++++++
...a => SinkV1TransformationTranslatorITCase.java} | 190 +--------
.../SinkV2TransformationTranslatorITCase.java | 100 +++++
...torTest.java => CommitterOperatorTestBase.java} | 122 ++----
.../runtime/operators/sink/SinkTestUtil.java | 4 +-
.../sink/SinkV2CommitterOperatorTest.java | 75 ++++
.../sink/SinkV2SinkWriterOperatorTest.java | 149 +++++++
...orTest.java => SinkWriterOperatorTestBase.java} | 212 ++++------
.../streaming/runtime/operators/sink/TestSink.java | 50 +--
.../runtime/operators/sink/TestSinkV2.java | 434 +++++++++++++++++++++
.../sink/WithAdapterCommitterOperatorTest.java | 83 ++++
.../sink/WithAdapterSinkWriterOperatorTest.java | 132 +++++++
.../flink/test/streaming/runtime/SinkV2ITCase.java | 138 +++++++
.../streaming/runtime/SinkV2MetricsITCase.java | 183 +++++++++
20 files changed, 1725 insertions(+), 460 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index 601d4f9d427..1f70b03413c 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -28,7 +28,7 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -51,7 +51,7 @@ public class TestSinkInitContext implements Sink.InitContext {
private final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
private final SinkWriterMetricGroup metricGroup =
- InternalSinkWriterMetricGroup.mock(
+ MetricsGroupTestUtils.mockWriterMetricGroup(
metricListener.getMetricGroup(), operatorIOMetricGroup);
private final MailboxExecutor mailboxExecutor;
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index 0149bf3e6da..cd0dda1d978 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
@@ -292,7 +292,7 @@ class FileWriterTest {
final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
final SinkWriterMetricGroup sinkWriterMetricGroup =
- InternalSinkWriterMetricGroup.mock(
+ MetricsGroupTestUtils.mockWriterMetricGroup(
metricListener.getMetricGroup(),
operatorIOMetricGroup);
Counter recordsCounter =
sinkWriterMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
@@ -471,7 +471,7 @@ class FileWriterTest {
basePath,
rollingPolicy,
outputFileConfig,
- InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
+ MetricsGroupTestUtils.mockWriterMetricGroup(metricListener.getMetricGroup()));
}
private FileWriter<String> createWriter(
@@ -484,7 +484,7 @@ class FileWriterTest {
throws IOException {
return new FileWriter<>(
basePath,
- InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+ MetricsGroupTestUtils.mockWriterMetricGroup(metricListener.getMetricGroup()),
bucketAssigner,
new DefaultFileWriterBucketFactory<>(),
new RowWiseBucketWriter<>(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
index 81aa8d78ce3..27d0c72ed5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
@@ -26,7 +26,6 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.metrics.MetricNames;
/** Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator. */
@@ -40,7 +39,8 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
private final Counter numBytesWritten;
private final OperatorIOMetricGroup operatorIOMetricGroup;
- private InternalSinkWriterMetricGroup(
+ @VisibleForTesting
+ InternalSinkWriterMetricGroup(
MetricGroup parentMetricGroup, OperatorIOMetricGroup operatorIOMetricGroup) {
super(parentMetricGroup);
numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
@@ -61,18 +61,6 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
operatorMetricGroup, operatorMetricGroup.getIOMetricGroup());
}
- @VisibleForTesting
- public static InternalSinkWriterMetricGroup mock(MetricGroup metricGroup) {
- return new InternalSinkWriterMetricGroup(
- metricGroup, UnregisteredMetricsGroup.createOperatorIOMetricGroup());
- }
-
- @VisibleForTesting
- public static InternalSinkWriterMetricGroup mock(
- MetricGroup metricGroup, OperatorIOMetricGroup operatorIOMetricGroup) {
- return new InternalSinkWriterMetricGroup(metricGroup, operatorIOMetricGroup);
- }
-
@Override
public OperatorIOMetricGroup getIOMetricGroup() {
return operatorIOMetricGroup;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricsGroupTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricsGroupTestUtils.java
new file mode 100644
index 00000000000..fae09a39e9b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricsGroupTestUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+/** Util class to create metric groups for SinkV2 tests. */
+public class MetricsGroupTestUtils {
+
+ @VisibleForTesting
+ public static InternalSinkWriterMetricGroup mockWriterMetricGroup() {
+ return new InternalSinkWriterMetricGroup(
+ new UnregisteredMetricsGroup(),
+ UnregisteredMetricsGroup.createOperatorIOMetricGroup());
+ }
+
+ @VisibleForTesting
+ public static InternalSinkWriterMetricGroup mockWriterMetricGroup(MetricGroup metricGroup) {
+ return new InternalSinkWriterMetricGroup(
+ metricGroup, UnregisteredMetricsGroup.createOperatorIOMetricGroup());
+ }
+
+ @VisibleForTesting
+ public static InternalSinkWriterMetricGroup mockWriterMetricGroup(
+ MetricGroup metricGroup, OperatorIOMetricGroup operatorIOMetricGroup) {
+ return new InternalSinkWriterMetricGroup(metricGroup, operatorIOMetricGroup);
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
index fb2e9d4cceb..401ed55215c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
-import org.apache.flink.streaming.runtime.operators.sink.TestSink;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.junit.Test;
@@ -33,13 +33,15 @@ public class DataStreamSinkTest {
public void testGettingTransformationWithNewSinkAPI() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final Transformation<?> transformation =
- env.fromElements(1, 2).sinkTo(TestSink.newBuilder().build()).getTransformation();
+ env.fromElements(1, 2)
+ .sinkTo(TestSinkV2.<Integer>newBuilder().build())
+ .getTransformation();
assertTrue(transformation instanceof SinkTransformation);
}
@Test(expected = UnsupportedOperationException.class)
public void throwExceptionWhenSetUidWithNewSinkAPI() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.fromElements(1, 2).sinkTo(TestSink.newBuilder().build()).setUidHash("Test");
+ env.fromElements(1, 2).sinkTo(TestSinkV2.<Integer>newBuilder().build()).setUidHash("Test");
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
index e8760d6d5de..352118db5d4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
@@ -26,8 +26,7 @@ import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.FlinkRuntimeException;
@@ -200,7 +199,7 @@ class PrintSinkTest {
@Override
public SinkWriterMetricGroup metricGroup() {
- return InternalSinkWriterMetricGroup.mock(new UnregisteredMetricsGroup());
+ return MetricsGroupTestUtils.mockWriterMetricGroup();
}
@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
new file mode 100644
index 00000000000..3c93b178b51
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ *
+ * <p>ATTENTION: This test is extremely brittle. Do NOT remove, add or re-order test cases.
+ */
+@RunWith(Parameterized.class)
+public abstract class SinkTransformationTranslatorITCaseBase<SinkT> extends TestLogger {
+
+ @Parameterized.Parameters(name = "Execution Mode: {0}")
+ public static Collection<Object> data() {
+ return Arrays.asList(RuntimeExecutionMode.STREAMING, RuntimeExecutionMode.BATCH);
+ }
+
+ @Parameterized.Parameter() public RuntimeExecutionMode runtimeExecutionMode;
+
+ static final String NAME = "FileSink";
+ static final String SLOT_SHARE_GROUP = "FileGroup";
+ static final String UID = "FileUid";
+ static final int PARALLELISM = 2;
+
+ abstract SinkT simpleSink();
+
+ abstract SinkT sinkWithCommitter();
+
+ abstract DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, SinkT sink);
+
+ @Test
+ public void generateWriterTopology() {
+ final StreamGraph streamGraph = buildGraph(simpleSink(), runtimeExecutionMode);
+
+ final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source"));
+ final StreamNode writerNode = findWriter(streamGraph);
+
+ assertThat(streamGraph.getStreamNodes().size(), equalTo(2));
+
+ validateTopology(
+ sourceNode,
+ IntSerializer.class,
+ writerNode,
+ SinkWriterOperatorFactory.class,
+ PARALLELISM,
+ -1);
+ }
+
+ @Test
+ public void generateWriterCommitterTopology() {
+
+ final StreamGraph streamGraph = buildGraph(sinkWithCommitter(), runtimeExecutionMode);
+
+ final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source"));
+ final StreamNode writerNode = findWriter(streamGraph);
+
+ validateTopology(
+ sourceNode,
+ IntSerializer.class,
+ writerNode,
+ SinkWriterOperatorFactory.class,
+ PARALLELISM,
+ -1);
+
+ final StreamNode committerNode =
+ findNodeName(streamGraph, name -> name.contains("Committer"));
+
+ assertThat(streamGraph.getStreamNodes().size(), equalTo(3));
+
+ validateTopology(
+ writerNode,
+ SimpleVersionedSerializerTypeSerializerProxy.class,
+ committerNode,
+ CommitterOperatorFactory.class,
+ PARALLELISM,
+ -1);
+ }
+
+ StreamNode findWriter(StreamGraph streamGraph) {
+ return findNodeName(
+ streamGraph, name -> name.contains("Writer") && !name.contains("Committer"));
+ }
+
+ StreamNode findCommitter(StreamGraph streamGraph) {
+ return findNodeName(
+ streamGraph,
+ name -> name.contains("Committer") && !name.contains("Global Committer"));
+ }
+
+ StreamNode findGlobalCommitter(StreamGraph streamGraph) {
+ return findNodeName(streamGraph, name -> name.contains("Global Committer"));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void throwExceptionWithoutSettingUid() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final Configuration config = new Configuration();
+ config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
+ env.configure(config, getClass().getClassLoader());
+ // disable auto generating uid
+ env.getConfig().disableAutoGeneratedUIDs();
+ sinkTo(env.fromElements(1, 2), simpleSink());
+ env.getStreamGraph();
+ }
+
+ @Test
+ public void disableOperatorChain() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final DataStreamSource<Integer> src = env.fromElements(1, 2);
+ final DataStreamSink<Integer> dataStreamSink = sinkTo(src, sinkWithCommitter()).name(NAME);
+ dataStreamSink.disableChaining();
+
+ final StreamGraph streamGraph = env.getStreamGraph();
+ final StreamNode writer = findWriter(streamGraph);
+ final StreamNode committer = findCommitter(streamGraph);
+
+ assertThat(writer.getOperatorFactory().getChainingStrategy(), is(ChainingStrategy.NEVER));
+ assertThat(
+ committer.getOperatorFactory().getChainingStrategy(), is(ChainingStrategy.NEVER));
+ }
+
+ void validateTopology(
+ StreamNode src,
+ Class<?> srcOutTypeInfo,
+ StreamNode dest,
+ Class<? extends StreamOperatorFactory> operatorFactoryClass,
+ int expectedParallelism,
+ int expectedMaxParallelism) {
+
+ // verify src node
+ final StreamEdge srcOutEdge = src.getOutEdges().get(0);
+ assertThat(srcOutEdge.getTargetId(), equalTo(dest.getId()));
+ assertThat(src.getTypeSerializerOut(), instanceOf(srcOutTypeInfo));
+
+ // verify dest node input
+ final StreamEdge destInputEdge = dest.getInEdges().get(0);
+ assertThat(destInputEdge.getSourceId(), equalTo(src.getId()));
+ assertThat(dest.getTypeSerializersIn()[0], instanceOf(srcOutTypeInfo));
+
+ // make sure 2 sink operators have different names/uid
+ assertThat(dest.getOperatorName(), not(equalTo(src.getOperatorName())));
+ assertThat(dest.getTransformationUID(), not(equalTo(src.getTransformationUID())));
+
+ assertThat(dest.getOperatorFactory(), instanceOf(operatorFactoryClass));
+ assertThat(dest.getParallelism(), equalTo(expectedParallelism));
+ assertThat(dest.getMaxParallelism(), equalTo(expectedMaxParallelism));
+ assertThat(dest.getOperatorFactory().getChainingStrategy(), is(ChainingStrategy.ALWAYS));
+ assertThat(dest.getSlotSharingGroup(), equalTo(SLOT_SHARE_GROUP));
+ }
+
+ StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode runtimeExecutionMode) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final Configuration config = new Configuration();
+ config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
+ env.configure(config, getClass().getClassLoader());
+ final DataStreamSource<Integer> src = env.fromElements(1, 2);
+ final DataStreamSink<Integer> dataStreamSink = sinkTo(src.rebalance(), sink);
+ setSinkProperty(dataStreamSink);
+ // Trigger the plan generation but do not clear the transformations
+ env.getExecutionPlan();
+ return env.getStreamGraph();
+ }
+
+ private void setSinkProperty(DataStreamSink<Integer> dataStreamSink) {
+ dataStreamSink.name(NAME);
+ dataStreamSink.uid(UID);
+ dataStreamSink.setParallelism(SinkTransformationTranslatorITCaseBase.PARALLELISM);
+ dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP);
+ }
+
+ StreamNode findNodeName(StreamGraph streamGraph, Predicate<String> predicate) {
+ return streamGraph.getStreamNodes().stream()
+ .filter(node -> predicate.test(node.getOperatorName()))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("Can not find the node"));
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java
similarity index 52%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java
index 4d0b2323c78..1b5bd22142f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java
@@ -20,33 +20,23 @@ package org.apache.flink.streaming.api.graph;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
-import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.function.Predicate;
-
import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -56,69 +46,22 @@ import static org.junit.Assert.assertEquals;
* <p>ATTENTION: This test is extremely brittle. Do NOT remove, add or re-order test cases.
*/
@RunWith(Parameterized.class)
-public class SinkTransformationTranslatorITCase extends TestLogger {
+public class SinkV1TransformationTranslatorITCase
+ extends SinkTransformationTranslatorITCaseBase<Sink<Integer, ?, ?, ?>> {
- @Parameterized.Parameters(name = "Execution Mode: {0}")
- public static Collection<Object> data() {
- return Arrays.asList(RuntimeExecutionMode.STREAMING, RuntimeExecutionMode.BATCH);
+ @Override
+ Sink<Integer, ?, ?, ?> simpleSink() {
+ return TestSink.newBuilder().build();
}
- @Parameterized.Parameter() public RuntimeExecutionMode runtimeExecutionMode;
-
- static final String NAME = "FileSink";
- static final String SLOT_SHARE_GROUP = "FileGroup";
- static final String UID = "FileUid";
- static final int PARALLELISM = 2;
-
- @Test
- public void generateWriterTopology() {
- final StreamGraph streamGraph =
- buildGraph(TestSink.newBuilder().build(), runtimeExecutionMode);
-
- final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source"));
- final StreamNode writerNode = findWriter(streamGraph);
-
- assertThat(streamGraph.getStreamNodes().size(), equalTo(2));
-
- validateTopology(
- sourceNode,
- IntSerializer.class,
- writerNode,
- SinkWriterOperatorFactory.class,
- PARALLELISM,
- -1);
+ @Override
+ Sink<Integer, ?, ?, ?> sinkWithCommitter() {
+ return TestSink.newBuilder().setDefaultCommitter().build();
}
- @Test
- public void generateWriterCommitterTopology() {
-
- final StreamGraph streamGraph =
- buildGraph(
- TestSink.newBuilder().setDefaultCommitter().build(), runtimeExecutionMode);
-
- final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source"));
- final StreamNode writerNode = findWriter(streamGraph);
-
- validateTopology(
- sourceNode,
- IntSerializer.class,
- writerNode,
- SinkWriterOperatorFactory.class,
- PARALLELISM,
- -1);
-
- final StreamNode committerNode =
- findNodeName(streamGraph, name -> name.contains("Committer"));
-
- assertThat(streamGraph.getStreamNodes().size(), equalTo(3));
-
- validateTopology(
- writerNode,
- SimpleVersionedSerializerTypeSerializerProxy.class,
- committerNode,
- CommitterOperatorFactory.class,
- PARALLELISM,
- -1);
+ @Override
+ DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer, ?, ?, ?> sink) {
+ return stream.sinkTo(sink);
}
@Test
@@ -219,56 +162,6 @@ public class SinkTransformationTranslatorITCase extends TestLogger {
1);
}
- private StreamNode findWriter(StreamGraph streamGraph) {
- return findNodeName(
- streamGraph, name -> name.contains("Writer") && !name.contains("Committer"));
- }
-
- private StreamNode findCommitter(StreamGraph streamGraph) {
- return findNodeName(streamGraph, name -> name.contains("Committer"));
- }
-
- private StreamNode findGlobalCommitter(StreamGraph streamGraph) {
- return findNodeName(streamGraph, name -> name.contains("Global Committer"));
- }
-
- @Test(expected = IllegalStateException.class)
- public void throwExceptionWithoutSettingUid() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- final Configuration config = new Configuration();
- config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
- env.configure(config, getClass().getClassLoader());
- // disable auto generating uid
- env.getConfig().disableAutoGeneratedUIDs();
- env.fromElements(1, 2).sinkTo(TestSink.newBuilder().build());
- env.getStreamGraph();
- }
-
- @Test
- public void disableOperatorChain() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- final DataStreamSource<Integer> src = env.fromElements(1, 2);
- final DataStreamSink<Integer> dataStreamSink =
- src.sinkTo(
- TestSink.newBuilder()
- .setDefaultCommitter()
- .setDefaultGlobalCommitter()
- .build())
- .name(NAME);
- dataStreamSink.disableChaining();
-
- final StreamGraph streamGraph = env.getStreamGraph();
- final StreamNode writer = findWriter(streamGraph);
- final StreamNode globalCommitter = findCommitter(streamGraph);
-
- assertThat(writer.getOperatorFactory().getChainingStrategy(), is(ChainingStrategy.NEVER));
- assertThat(
- globalCommitter.getOperatorFactory().getChainingStrategy(),
- is(ChainingStrategy.NEVER));
- }
-
@Test
public void testSettingOperatorUidHash() {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -319,61 +212,4 @@ public class SinkTransformationTranslatorITCase extends TestLogger {
findGlobalCommitter(streamGraph).getTransformationUID(),
String.format("Sink %s Global Committer", sinkUid));
}
-
- private void validateTopology(
- StreamNode src,
- Class<?> srcOutTypeInfo,
- StreamNode dest,
- Class<? extends StreamOperatorFactory> operatorFactoryClass,
- int expectedParallelism,
- int expectedMaxParallelism) {
-
- // verify src node
- final StreamEdge srcOutEdge = src.getOutEdges().get(0);
- assertThat(srcOutEdge.getTargetId(), equalTo(dest.getId()));
- assertThat(src.getTypeSerializerOut(), instanceOf(srcOutTypeInfo));
-
- // verify dest node input
- final StreamEdge destInputEdge = dest.getInEdges().get(0);
- assertThat(destInputEdge.getSourceId(), equalTo(src.getId()));
- assertThat(dest.getTypeSerializersIn()[0], instanceOf(srcOutTypeInfo));
-
- // make sure 2 sink operators have different names/uid
- assertThat(dest.getOperatorName(), not(equalTo(src.getOperatorName())));
- assertThat(dest.getTransformationUID(), not(equalTo(src.getTransformationUID())));
-
- assertThat(dest.getOperatorFactory(), instanceOf(operatorFactoryClass));
- assertThat(dest.getParallelism(), equalTo(expectedParallelism));
- assertThat(dest.getMaxParallelism(), equalTo(expectedMaxParallelism));
- assertThat(dest.getOperatorFactory().getChainingStrategy(), is(ChainingStrategy.ALWAYS));
- assertThat(dest.getSlotSharingGroup(), equalTo(SLOT_SHARE_GROUP));
- }
-
- private StreamGraph buildGraph(TestSink sink, RuntimeExecutionMode runtimeExecutionMode) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- final Configuration config = new Configuration();
- config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
- env.configure(config, getClass().getClassLoader());
- final DataStreamSource<Integer> src = env.fromElements(1, 2);
- final DataStreamSink<Integer> dataStreamSink = src.rebalance().sinkTo(sink);
- setSinkProperty(dataStreamSink);
- // Trigger the plan generation but do not clear the transformations
- env.getExecutionPlan();
- return env.getStreamGraph();
- }
-
- private void setSinkProperty(DataStreamSink<Integer> dataStreamSink) {
- dataStreamSink.name(NAME);
- dataStreamSink.uid(UID);
- dataStreamSink.setParallelism(SinkTransformationTranslatorITCase.PARALLELISM);
- dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP);
- }
-
- private StreamNode findNodeName(StreamGraph streamGraph, Predicate<String> predicate) {
- return streamGraph.getStreamNodes().stream()
- .filter(node -> predicate.test(node.getOperatorName()))
- .findFirst()
- .orElseThrow(() -> new IllegalStateException("Can not find the node"));
- }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
new file mode 100644
index 00000000000..612cb9d7808
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link
org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ *
+ * <p>ATTENTION: This test is extremely brittle. Do NOT remove, add or
re-order test cases.
+ */
+@RunWith(Parameterized.class)
+public class SinkV2TransformationTranslatorITCase
+ extends SinkTransformationTranslatorITCaseBase<Sink<Integer>> {
+
+ @Override
+ Sink<Integer> simpleSink() {
+ return TestSinkV2.<Integer>newBuilder().build();
+ }
+
+ @Override
+ Sink<Integer> sinkWithCommitter() {
+ return TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build();
+ }
+
+ @Override
+ DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer>
sink) {
+ return stream.sinkTo(sink);
+ }
+
+ @Test
+ public void testSettingOperatorUidHash() {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ final DataStreamSource<Integer> src = env.fromElements(1, 2);
+ final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
+ final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
+ final CustomSinkOperatorUidHashes operatorsUidHashes =
+ CustomSinkOperatorUidHashes.builder()
+ .setWriterUidHash(writerHash)
+ .setCommitterUidHash(committerHash)
+ .build();
+ src.sinkTo(
+
TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build(),
+ operatorsUidHashes)
+ .name(NAME);
+
+ final StreamGraph streamGraph = env.getStreamGraph();
+
+ assertEquals(findWriter(streamGraph).getUserHash(), writerHash);
+ assertEquals(findCommitter(streamGraph).getUserHash(), committerHash);
+ }
+
+ /**
+ * Whenever you need to change something in this test case, please think
about possible state
+ * upgrade problems introduced by your changes.
+ */
+ @Test
+ public void testSettingOperatorUids() {
+ final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ final DataStreamSource<Integer> src = env.fromElements(1, 2);
+
src.sinkTo(TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build())
+ .name(NAME)
+ .uid(sinkUid);
+
+ final StreamGraph streamGraph = env.getStreamGraph();
+ assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid);
+ assertEquals(
+ findCommitter(streamGraph).getTransformationUID(),
+ String.format("Sink Committer: %s", sinkUid));
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
similarity index 77%
rename from
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
rename to
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index 7ae860c54e0..fe3625ec8ce 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -18,8 +18,6 @@
package org.apache.flink.streaming.runtime.operators.sink;
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -34,45 +32,31 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import java.util.Collections;
import java.util.List;
+import java.util.function.IntSupplier;
import static
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput;
import static
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableSummary;
import static
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableWithLinage;
import static org.assertj.core.api.Assertions.assertThat;
-class CommitterOperatorTest {
+abstract class CommitterOperatorTestBase {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testEmitCommittables(boolean withPostCommitTopology) throws Exception
{
- final ForwardingCommitter committer = new ForwardingCommitter();
-
- Sink<Integer> sink;
+ SinkAndCounters sinkAndCounters;
if (withPostCommitTopology) {
// Insert global committer to simulate post commit topology
- sink =
- TestSink.newBuilder()
- .setCommitter(committer)
- .setDefaultGlobalCommitter()
-
.setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
- .build()
- .asV2();
+ sinkAndCounters = sinkWithPostCommit();
} else {
- sink =
- TestSink.newBuilder()
- .setCommitter(committer)
-
.setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
- .build()
- .asV2();
+ sinkAndCounters = sinkWithoutPostCommit();
}
final OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
testHarness =
new OneInputStreamOperatorTestHarness<>(
- new CommitterOperatorFactory<>(
- (TwoPhaseCommittingSink<?, String>)
sink, false, true));
+ new
CommitterOperatorFactory<>(sinkAndCounters.sink, false, true));
testHarness.open();
final CommittableSummary<String> committableSummary =
@@ -85,7 +69,7 @@ class CommitterOperatorTest {
// Trigger commit
testHarness.notifyOfCompletedCheckpoint(1);
- assertThat(committer.getSuccessfulCommits()).isEqualTo(1);
+ assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
if (withPostCommitTopology) {
final List<StreamElement> output =
fromOutput(testHarness.getOutput());
SinkV2Assertions.assertThat(toCommittableSummary(output.get(0)))
@@ -102,10 +86,10 @@ class CommitterOperatorTest {
@Test
void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws
Exception {
- final ForwardingCommitter committer = new ForwardingCommitter();
+ SinkAndCounters sinkAndCounters = sinkWithPostCommit();
final OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
- testHarness = createTestHarness(committer, false, true);
+ testHarness = createTestHarness(sinkAndCounters.sink, false,
true);
testHarness.open();
testHarness.setProcessingTime(0);
@@ -119,7 +103,7 @@ class CommitterOperatorTest {
testHarness.notifyOfCompletedCheckpoint(1);
assertThat(testHarness.getOutput()).isEmpty();
- assertThat(committer.getSuccessfulCommits()).isEqualTo(0);
+ assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero();
final CommittableWithLineage<String> second = new
CommittableWithLineage<>("2", 1L, 1);
testHarness.processElement(new StreamRecord<>(second));
@@ -129,7 +113,7 @@ class CommitterOperatorTest {
final List<StreamElement> output = fromOutput(testHarness.getOutput());
assertThat(output).hasSize(3);
- assertThat(committer.getSuccessfulCommits()).isEqualTo(2);
+ assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
SinkV2Assertions.assertThat(toCommittableSummary(output.get(0)))
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
.hasOverallCommittables(committableSummary.getNumberOfCommittables())
@@ -143,10 +127,11 @@ class CommitterOperatorTest {
@Test
void testImmediatelyCommitLateCommittables() throws Exception {
- final ForwardingCommitter committer = new ForwardingCommitter();
+ SinkAndCounters sinkAndCounters = sinkWithPostCommit();
+
final OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
- testHarness = createTestHarness(committer, false, true);
+ testHarness = createTestHarness(sinkAndCounters.sink, false,
true);
testHarness.open();
final CommittableSummary<String> committableSummary =
@@ -166,7 +151,7 @@ class CommitterOperatorTest {
final List<StreamElement> output = fromOutput(testHarness.getOutput());
assertThat(output).hasSize(2);
- assertThat(committer.getSuccessfulCommits()).isEqualTo(1);
+ assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
SinkV2Assertions.assertThat(toCommittableSummary(output.get(0)))
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
.hasOverallCommittables(committableSummary.getNumberOfCommittables())
@@ -179,10 +164,10 @@ class CommitterOperatorTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws
Exception {
- final ForwardingCommitter committer = new ForwardingCommitter();
+ SinkAndCounters sinkAndCounters = sinkWithPostCommit();
final OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
- testHarness = createTestHarness(committer, isBatchMode,
!isBatchMode);
+ testHarness = createTestHarness(sinkAndCounters.sink,
isBatchMode, !isBatchMode);
testHarness.open();
final CommittableSummary<String> committableSummary =
@@ -199,7 +184,7 @@ class CommitterOperatorTest {
testHarness.endInput();
if (!isBatchMode) {
- assertThat(testHarness.getOutput()).hasSize(0);
+ assertThat(testHarness.getOutput()).isEmpty();
// notify final checkpoint complete
testHarness.notifyOfCompletedCheckpoint(1);
}
@@ -227,7 +212,7 @@ class CommitterOperatorTest {
CommittableMessage<String>, CommittableMessage<String>>
testHarness =
createTestHarness(
- new TestSink.RetryOnceCommitter(),
+ sinkWithPostCommitWithRetry().sink,
false,
true,
1,
@@ -262,15 +247,15 @@ class CommitterOperatorTest {
assertThat(testHarness.getOutput()).isEmpty();
testHarness.close();
- final ForwardingCommitter committer = new ForwardingCommitter();
-
// create new testHarness but with different parallelism level and
subtaskId that original
// one.
// we will make sure that new subtaskId was used during committable
recovery.
+ SinkAndCounters sinkAndCounters = sinkWithPostCommit();
final OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
restored =
- createTestHarness(committer, false, true, 10, 10,
subtaskIdAfterRecovery);
+ createTestHarness(
+ sinkAndCounters.sink, false, true, 10, 10,
subtaskIdAfterRecovery);
restored.initializeState(snapshot);
restored.open();
@@ -278,7 +263,7 @@ class CommitterOperatorTest {
// Previous committables are immediately committed if possible
final List<StreamElement> output = fromOutput(restored.getOutput());
assertThat(output).hasSize(3);
- assertThat(committer.getSuccessfulCommits()).isEqualTo(2);
+ assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
SinkV2Assertions.assertThat(toCommittableSummary(output.get(0)))
.hasCheckpointId(checkpointId)
.hasFailedCommittables(0)
@@ -300,22 +285,14 @@ class CommitterOperatorTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled)
throws Exception {
- final Sink<Integer> sink =
- TestSink.newBuilder()
- .setDefaultCommitter()
- .setDefaultGlobalCommitter()
-
.setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
- .build()
- .asV2();
+ final SinkAndCounters sinkAndCounters = sinkWithPostCommit();
final OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
testHarness =
new OneInputStreamOperatorTestHarness<>(
new CommitterOperatorFactory<>(
- (TwoPhaseCommittingSink<?, String>)
sink,
- false,
- isCheckpointingEnabled));
+ sinkAndCounters.sink, false,
isCheckpointingEnabled));
testHarness.open();
final CommittableSummary<String> committableSummary =
@@ -363,28 +340,18 @@ class CommitterOperatorTest {
private OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
createTestHarness(
- Committer<String> committer,
+ TwoPhaseCommittingSink<?, String> sink,
boolean isBatchMode,
boolean isCheckpointingEnabled)
throws Exception {
return new OneInputStreamOperatorTestHarness<>(
- new CommitterOperatorFactory<>(
- (TwoPhaseCommittingSink<?, String>)
- TestSink.newBuilder()
- .setCommitter(committer)
- .setDefaultGlobalCommitter()
- .setCommittableSerializer(
-
TestSink.StringCommittableSerializer.INSTANCE)
- .build()
- .asV2(),
- isBatchMode,
- isCheckpointingEnabled));
+ new CommitterOperatorFactory<>(sink, isBatchMode,
isCheckpointingEnabled));
}
private OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
createTestHarness(
- Committer<String> committer,
+ TwoPhaseCommittingSink<?, String> sink,
boolean isBatchMode,
boolean isCheckpointingEnabled,
int maxParallelism,
@@ -392,36 +359,25 @@ class CommitterOperatorTest {
int subtaskId)
throws Exception {
return new OneInputStreamOperatorTestHarness<>(
- new CommitterOperatorFactory<>(
- (TwoPhaseCommittingSink<?, String>)
- TestSink.newBuilder()
- .setCommitter(committer)
- .setDefaultGlobalCommitter()
- .setCommittableSerializer(
-
TestSink.StringCommittableSerializer.INSTANCE)
- .build()
- .asV2(),
- isBatchMode,
- isCheckpointingEnabled),
+ new CommitterOperatorFactory<>(sink, isBatchMode,
isCheckpointingEnabled),
maxParallelism,
parallelism,
subtaskId);
}
- private static class ForwardingCommitter extends TestSink.DefaultCommitter
{
- private int successfulCommits = 0;
+ abstract SinkAndCounters sinkWithPostCommit();
- @Override
- public List<String> commit(List<String> committables) {
- successfulCommits += committables.size();
- return Collections.emptyList();
- }
+ abstract SinkAndCounters sinkWithPostCommitWithRetry();
+
+ abstract SinkAndCounters sinkWithoutPostCommit();
- @Override
- public void close() throws Exception {}
+ static class SinkAndCounters {
+ TwoPhaseCommittingSink<?, String> sink;
+ IntSupplier commitCounter;
- public int getSuccessfulCommits() {
- return successfulCommits;
+ public SinkAndCounters(TwoPhaseCommittingSink<?, String> sink,
IntSupplier commitCounter) {
+ this.sink = sink;
+ this.commitCounter = commitCounter;
}
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
index c309835cc75..b457b5a0b78 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
@@ -52,7 +52,7 @@ class SinkTestUtil {
static byte[] toBytes(String obj) {
try {
return SimpleVersionedSerialization.writeVersionAndSerialize(
- TestSink.StringCommittableSerializer.INSTANCE, obj);
+ TestSinkV2.StringSerializer.INSTANCE, obj);
} catch (IOException e) {
throw new IllegalStateException(e);
}
@@ -83,7 +83,7 @@ class SinkTestUtil {
static String fromBytes(byte[] obj) {
try {
return SimpleVersionedSerialization.readVersionAndDeSerialize(
- TestSink.StringCommittableSerializer.INSTANCE, obj);
+ TestSinkV2.StringSerializer.INSTANCE, obj);
} catch (IOException e) {
throw new IllegalStateException(e);
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
new file mode 100644
index 00000000000..ed8e53ff342
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+import java.util.Collection;
+
+class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase {
+ @Override
+ SinkAndCounters sinkWithPostCommit() {
+ ForwardingCommitter committer = new ForwardingCommitter();
+ return new SinkAndCounters(
+ (TwoPhaseCommittingSink<?, String>)
+ TestSinkV2.newBuilder()
+ .setCommitter(committer)
+
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
+ .setWithPostCommitTopology(true)
+ .build(),
+ () -> committer.successfulCommits);
+ }
+
+ @Override
+ SinkAndCounters sinkWithPostCommitWithRetry() {
+ return new SinkAndCounters(
+ (TwoPhaseCommittingSink<?, String>)
+ TestSinkV2.newBuilder()
+ .setCommitter(new
TestSinkV2.RetryOnceCommitter())
+
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
+ .setWithPostCommitTopology(true)
+ .build(),
+ () -> 0);
+ }
+
+ @Override
+ SinkAndCounters sinkWithoutPostCommit() {
+ ForwardingCommitter committer = new ForwardingCommitter();
+ return new SinkAndCounters(
+ (TwoPhaseCommittingSink<?, String>)
+ TestSinkV2.newBuilder()
+ .setCommitter(committer)
+
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
+ .setWithPostCommitTopology(false)
+ .build(),
+ () -> committer.successfulCommits);
+ }
+
+ private static class ForwardingCommitter extends
TestSinkV2.DefaultCommitter {
+ private int successfulCommits = 0;
+
+ @Override
+ public void commit(Collection<CommitRequest<String>> committables) {
+ successfulCommits += committables.size();
+ }
+
+ @Override
+ public void close() throws Exception {}
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
new file mode 100644
index 00000000000..5bb0135d69b
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase {
+
+ @Override
+ SinkAndSuppliers sinkWithoutCommitter() {
+ TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new
TestSinkV2.DefaultSinkWriter<>();
+ return new SinkAndSuppliers(
+ TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build(),
+ () -> sinkWriter.elements,
+ () -> sinkWriter.watermarks,
+ () -> -1,
+ TestSinkV2.StringSerializer::new);
+ }
+
+ @Override
+ SinkAndSuppliers sinkWithCommitter() {
+ TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
+ new TestSinkV2.DefaultCommittingSinkWriter<>();
+ return new SinkAndSuppliers(
+ TestSinkV2.<Integer>newBuilder()
+ .setWriter(sinkWriter)
+ .setDefaultCommitter()
+ .build(),
+ () -> sinkWriter.elements,
+ () -> sinkWriter.watermarks,
+ () -> -1,
+ TestSinkV2.StringSerializer::new);
+ }
+
+ @Override
+ SinkAndSuppliers sinkWithTimeBasedWriter() {
+ TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new
TimeBasedBufferingSinkWriter();
+ return new SinkAndSuppliers(
+ TestSinkV2.<Integer>newBuilder()
+ .setWriter(sinkWriter)
+ .setDefaultCommitter()
+ .build(),
+ () -> sinkWriter.elements,
+ () -> sinkWriter.watermarks,
+ () -> -1,
+ TestSinkV2.StringSerializer::new);
+ }
+
+ @Override
+ SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String
stateName) {
+ SnapshottingBufferingSinkWriter sinkWriter = new
SnapshottingBufferingSinkWriter();
+ TestSinkV2.Builder<Integer> builder =
+ TestSinkV2.newBuilder()
+ .setWriter(sinkWriter)
+ .setDefaultCommitter()
+ .setWithPostCommitTopology(true);
+ if (withState) {
+ builder.setWriterState(true);
+ }
+ if (stateName != null) {
+ builder.setCompatibleStateNames(stateName);
+ }
+ return new SinkAndSuppliers(
+ builder.build(),
+ () -> sinkWriter.elements,
+ () -> sinkWriter.watermarks,
+ () -> sinkWriter.lastCheckpointId,
+ () -> new TestSinkV2.StringSerializer());
+ }
+
+ private static class TimeBasedBufferingSinkWriter
+ extends TestSinkV2.DefaultCommittingSinkWriter<Integer>
+ implements ProcessingTimeService.ProcessingTimeCallback {
+
+ private final List<String> cachedCommittables = new ArrayList<>();
+ private ProcessingTimeService processingTimeService;
+
+ @Override
+ public void write(Integer element, Context context) {
+ cachedCommittables.add(
+ Tuple3.of(element, context.timestamp(),
context.currentWatermark()).toString());
+ }
+
+ @Override
+ public void onProcessingTime(long time) {
+ elements.addAll(cachedCommittables);
+ cachedCommittables.clear();
+ this.processingTimeService.registerTimer(time + 1000, this);
+ }
+
+ @Override
+ public void init(org.apache.flink.api.connector.sink2.Sink.InitContext
context) {
+ this.processingTimeService = context.getProcessingTimeService();
+ this.processingTimeService.registerTimer(1000, this);
+ }
+ }
+
+ private static class SnapshottingBufferingSinkWriter
+ extends TestSinkV2.DefaultStatefulSinkWriter {
+ public static final int NOT_SNAPSHOTTED = -1;
+ long lastCheckpointId = NOT_SNAPSHOTTED;
+ boolean endOfInput = false;
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException,
InterruptedException {
+ this.endOfInput = endOfInput;
+ }
+
+ @Override
+ public List<String> snapshotState(long checkpointId) throws
IOException {
+ lastCheckpointId = checkpointId;
+ return super.snapshotState(checkpointId);
+ }
+
+ @Override
+ public Collection<String> prepareCommit() {
+ if (!endOfInput) {
+ return ImmutableList.of();
+ }
+ List<String> result = elements;
+ elements = new ArrayList<>();
+ return result;
+ }
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
similarity index 77%
rename from
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
rename to
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
index 86cc85c3155..464b08c8f67 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -62,29 +62,31 @@ import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
import static
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput;
import static org.assertj.core.api.Assertions.assertThat;
-class SinkWriterOperatorTest {
+abstract class SinkWriterOperatorTestBase {
@Test
void testNotEmitCommittablesWithoutCommitter() throws Exception {
- final TestSink.DefaultSinkWriter<Integer> sinkWriter = new
TestSink.DefaultSinkWriter<>();
+ SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter();
final OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(
- new SinkWriterOperatorFactory<>(
-
TestSink.newBuilder().setWriter(sinkWriter).build().asV2()));
+ new
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
testHarness.open();
testHarness.processElement(1, 1);
assertThat(testHarness.getOutput()).isEmpty();
- assertThat(sinkWriter.elements).containsOnly("(1,1," + Long.MIN_VALUE
+ ")");
+ assertThat(sinkAndSuppliers.elementSupplier.get())
+ .containsOnly("(1,1," + Long.MIN_VALUE + ")");
testHarness.prepareSnapshotPreBarrier(1);
assertThat(testHarness.getOutput()).isEmpty();
// Elements are flushed
- assertThat(sinkWriter.elements).isEmpty();
+ assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty();
testHarness.close();
}
@@ -92,11 +94,10 @@ class SinkWriterOperatorTest {
void testWatermarkPropagatedToSinkWriter() throws Exception {
final long initialTime = 0;
- final TestSink.DefaultSinkWriter<Integer> writer = new
TestSink.DefaultSinkWriter<>();
+ SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter();
final OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(
- new SinkWriterOperatorFactory<>(
-
TestSink.newBuilder().setWriter(writer).build().asV2()));
+ new
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
testHarness.open();
testHarness.processWatermark(initialTime);
@@ -104,7 +105,7 @@ class SinkWriterOperatorTest {
assertThat(testHarness.getOutput())
.containsExactly(new Watermark(initialTime), new
Watermark(initialTime + 1));
- assertThat(writer.watermarks)
+ assertThat(sinkAndSuppliers.watermarkSupplier.get())
.containsExactly(
new
org.apache.flink.api.common.eventtime.Watermark(initialTime),
new
org.apache.flink.api.common.eventtime.Watermark(initialTime + 1));
@@ -117,12 +118,7 @@ class SinkWriterOperatorTest {
final OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(
- new SinkWriterOperatorFactory<>(
- TestSink.newBuilder()
- .setDefaultCommitter()
- .setWriter(new
TimeBasedBufferingSinkWriter())
- .build()
- .asV2()));
+ new
SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().sink));
testHarness.open();
@@ -149,8 +145,7 @@ class SinkWriterOperatorTest {
void testEmitOnFlushWithCommitter() throws Exception {
final OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(
- new SinkWriterOperatorFactory<>(
-
TestSink.newBuilder().setDefaultCommitter().build().asV2()));
+ new
SinkWriterOperatorFactory<>(sinkWithCommitter().sink));
testHarness.open();
assertThat(testHarness.getOutput()).isEmpty();
@@ -168,8 +163,7 @@ class SinkWriterOperatorTest {
@Test
void testEmitOnEndOfInputInBatchMode() throws Exception {
final SinkWriterOperatorFactory<Integer, Integer>
writerOperatorFactory =
- new SinkWriterOperatorFactory<>(
-
TestSink.newBuilder().setDefaultCommitter().build().asV2());
+ new SinkWriterOperatorFactory<>(sinkWithCommitter().sink);
final OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
@@ -187,10 +181,9 @@ class SinkWriterOperatorTest {
final long initialTime = 0;
- final SnapshottingBufferingSinkWriter snapshottingWriter =
- new SnapshottingBufferingSinkWriter();
+ final SinkAndSuppliers sinkAndSuppliers =
sinkWithSnapshottingWriter(stateful, null);
final OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>> testHarness =
- createTestHarnessWithBufferingSinkWriter(snapshottingWriter,
stateful);
+
createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink);
testHarness.open();
@@ -204,15 +197,14 @@ class SinkWriterOperatorTest {
// we see the watermark and the committable summary, so the
committables must be stored in
// state
assertThat(testHarness.getOutput()).hasSize(2).contains(new
Watermark(initialTime));
- assertThat(snapshottingWriter.lastCheckpointId)
- .isEqualTo(stateful ? 1L :
SnapshottingBufferingSinkWriter.NOT_SNAPSHOTTED);
+ assertThat(sinkAndSuppliers.lastCheckpointSupplier.getAsLong())
+ .isEqualTo(stateful ? 1L : -1L);
testHarness.close();
+ final SinkAndSuppliers restoredSink =
sinkWithSnapshottingWriter(stateful, null);
final OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>>
- restoredTestHarness =
- createTestHarnessWithBufferingSinkWriter(
- new SnapshottingBufferingSinkWriter(),
stateful);
+ restoredTestHarness =
createTestHarnessWithBufferingSinkWriter(restoredSink.sink);
restoredTestHarness.initializeState(snapshot);
restoredTestHarness.open();
@@ -221,6 +213,7 @@ class SinkWriterOperatorTest {
restoredTestHarness.endInput();
final long checkpointId = 2;
restoredTestHarness.prepareSnapshotPreBarrier(checkpointId);
+ restoredTestHarness.notifyOfCompletedCheckpoint(checkpointId);
if (stateful) {
assertBasicOutput(restoredTestHarness.getOutput(), 2,
Long.MAX_VALUE);
@@ -246,16 +239,20 @@ class SinkWriterOperatorTest {
"bit", "mention", "thick", "stick", "stir", "easy",
"sleep", "forth",
"cost", "prompt");
+ SinkAndSuppliers sinkAndSuppliers =
+ sinkWithSnapshottingWriter(stateful,
DummySinkOperator.DUMMY_SINK_STATE_NAME);
final OneInputStreamOperatorTestHarness<String, String> previousSink =
new OneInputStreamOperatorTestHarness<>(
- new DummySinkOperator(), StringSerializer.INSTANCE);
+ new
DummySinkOperator(sinkAndSuppliers.serializerSupplier.get()),
+ StringSerializer.INSTANCE);
OperatorSubtaskState previousSinkState =
TestHarnessUtil.buildSubtaskState(previousSink,
previousSinkInputs);
// 2. Load previous sink state and verify the output
final OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>>
- compatibleWriterOperator =
createCompatibleStateTestHarness(stateful);
+ compatibleWriterOperator =
+
createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink);
final List<String> expectedOutput1 =
stateful ? new ArrayList<>(previousSinkInputs) : new
ArrayList<>();
@@ -280,8 +277,11 @@ class SinkWriterOperatorTest {
assertEmitted(expectedOutput1, compatibleWriterOperator.getOutput());
// 3. Restore the sink without previous sink's state
+ SinkAndSuppliers sinkAndSuppliers2 =
+ sinkWithSnapshottingWriter(stateful,
DummySinkOperator.DUMMY_SINK_STATE_NAME);
final OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>>
- restoredSinkOperator =
createCompatibleStateTestHarness(stateful);
+ restoredSinkOperator =
+
createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers2.sink);
final List<String> expectedOutput2 =
Arrays.asList(
Tuple3.of(2, 2, Long.MIN_VALUE).toString(),
@@ -306,22 +306,18 @@ class SinkWriterOperatorTest {
void testRestoreCommitterState() throws Exception {
final List<String> committables = Arrays.asList("state1", "state2");
+ SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter();
final OneInputStreamOperatorTestHarness<String, String> committer =
new OneInputStreamOperatorTestHarness<>(
- new TestCommitterOperator(),
StringSerializer.INSTANCE);
+ new
TestCommitterOperator(sinkAndSuppliers.serializerSupplier.get()),
+ StringSerializer.INSTANCE);
final OperatorSubtaskState committerState =
TestHarnessUtil.buildSubtaskState(committer, committables);
- final TestSink.DefaultSinkWriter<Integer> sinkWriter = new
TestSink.DefaultSinkWriter<>();
final OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(
- new SinkWriterOperatorFactory<>(
- TestSink.newBuilder()
- .setDefaultCommitter()
- .setWriter(sinkWriter)
- .build()
- .asV2()));
+ new
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
testHarness.initializeState(committerState);
@@ -361,21 +357,16 @@ class SinkWriterOperatorTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled)
throws Exception {
- final TestSink.DefaultSinkWriter<Integer> sinkWriter = new
TestSink.DefaultSinkWriter<>();
+ SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter();
final OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(
- new SinkWriterOperatorFactory<>(
- TestSink.newBuilder()
- .setWriter(sinkWriter)
- .setDefaultCommitter()
- .build()
- .asV2()));
+ new
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
testHarness.open();
testHarness.processElement(1, 1);
assertThat(testHarness.getOutput()).isEmpty();
final String record = "(1,1," + Long.MIN_VALUE + ")";
- assertThat(sinkWriter.elements).containsOnly(record);
+
assertThat(sinkAndSuppliers.elementSupplier.get()).containsOnly(record);
testHarness.endInput();
@@ -384,7 +375,7 @@ class SinkWriterOperatorTest {
}
assertEmitted(Collections.singletonList(record),
testHarness.getOutput());
- assertThat(sinkWriter.elements).isEmpty();
+ assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty();
testHarness.close();
}
@@ -472,33 +463,9 @@ class SinkWriterOperatorTest {
}
private static OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>>
- createTestHarnessWithBufferingSinkWriter(
- SnapshottingBufferingSinkWriter snapshottingWriter,
boolean stateful)
- throws Exception {
- final TestSink.Builder<Integer> builder =
-
TestSink.newBuilder().setDefaultCommitter().setWriter(snapshottingWriter);
- if (stateful) {
- builder.withWriterState();
- }
+ createTestHarnessWithBufferingSinkWriter(Sink sink) throws
Exception {
final SinkWriterOperatorFactory<Integer, Integer>
writerOperatorFactory =
- new SinkWriterOperatorFactory<>(builder.build().asV2());
- return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
- }
-
- private static OneInputStreamOperatorTestHarness<Integer,
CommittableMessage<Integer>>
- createCompatibleStateTestHarness(boolean stateful) throws
Exception {
- final SnapshottingBufferingSinkWriter snapshottingWriter =
- new SnapshottingBufferingSinkWriter();
- final TestSink.Builder<Integer> builder =
- TestSink.newBuilder()
- .setDefaultCommitter()
-
.setCompatibleStateNames(DummySinkOperator.DUMMY_SINK_STATE_NAME)
- .setWriter(snapshottingWriter);
- if (stateful) {
- builder.withWriterState();
- }
- final SinkWriterOperatorFactory<Integer, Integer>
writerOperatorFactory =
- new SinkWriterOperatorFactory<>(builder.build().asV2());
+ new SinkWriterOperatorFactory<>(sink);
return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
}
@@ -527,30 +494,6 @@ class SinkWriterOperatorTest {
}
}
- private static class TimeBasedBufferingSinkWriter extends
TestSink.DefaultSinkWriter<Integer>
- implements Sink.ProcessingTimeService.ProcessingTimeCallback {
-
- private final List<String> cachedCommittables = new ArrayList<>();
-
- @Override
- public void write(Integer element, Context context) {
- cachedCommittables.add(
- Tuple3.of(element, context.timestamp(),
context.currentWatermark()).toString());
- }
-
- void setProcessingTimerService(Sink.ProcessingTimeService
processingTimerService) {
- super.setProcessingTimerService(processingTimerService);
- this.processingTimerService.registerProcessingTimer(1000, this);
- }
-
- @Override
- public void onProcessingTime(long time) {
- elements.addAll(cachedCommittables);
- cachedCommittables.clear();
- this.processingTimerService.registerProcessingTimer(time + 1000,
this);
- }
- }
-
private static class TestCommitterOperator extends
AbstractStreamOperator<String>
implements OneInputStreamOperator<String, String> {
@@ -559,6 +502,11 @@ class SinkWriterOperatorTest {
"streaming_committer_raw_states",
BytePrimitiveArraySerializer.INSTANCE);
private ListState<List<String>> committerState;
private final List<String> buffer = new ArrayList<>();
+ private final SimpleVersionedSerializer<String> serializer;
+
+ public TestCommitterOperator(SimpleVersionedSerializer<String>
serializer) {
+ this.serializer = serializer;
+ }
@Override
public void initializeState(StateInitializationContext context) throws
Exception {
@@ -567,8 +515,7 @@ class SinkWriterOperatorTest {
new SimpleVersionedListState<>(
context.getOperatorStateStore()
.getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
- new TestingCommittableSerializer(
-
TestSink.StringCommittableSerializer.INSTANCE));
+ new TestingCommittableSerializer(serializer));
}
@Override
@@ -592,13 +539,18 @@ class SinkWriterOperatorTest {
new ListStateDescriptor<>(
DUMMY_SINK_STATE_NAME,
BytePrimitiveArraySerializer.INSTANCE);
ListState<String> sinkState;
+ private final SimpleVersionedSerializer<String> serializer;
+
+ public DummySinkOperator(SimpleVersionedSerializer<String> serializer)
{
+ this.serializer = serializer;
+ }
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
sinkState =
new SimpleVersionedListState<>(
context.getOperatorStateStore().getListState(SINK_STATE_DESC),
- TestSink.StringCommittableSerializer.INSTANCE);
+ serializer);
}
@Override
@@ -607,33 +559,6 @@ class SinkWriterOperatorTest {
}
}
- private static class SnapshottingBufferingSinkWriter
- extends TestSink.DefaultSinkWriter<Integer> {
- public static final int NOT_SNAPSHOTTED = -1;
- long lastCheckpointId = NOT_SNAPSHOTTED;
-
- @Override
- public List<String> snapshotState(long checkpointId) {
- lastCheckpointId = checkpointId;
- return elements;
- }
-
- @Override
- void restoredFrom(List<String> states) {
- this.elements = new ArrayList<>(states);
- }
-
- @Override
- public List<String> prepareCommit(boolean flush) {
- if (!flush) {
- return Collections.emptyList();
- }
- List<String> result = elements;
- elements = new ArrayList<>();
- return result;
- }
- }
-
private static class TestingCommittableSerializer
extends SinkV1WriterCommittableSerializer<String> {
@@ -654,4 +579,33 @@ class SinkWriterOperatorTest {
return out.getCopyOfBuffer();
}
}
+
+ abstract SinkAndSuppliers sinkWithoutCommitter();
+
+ abstract SinkAndSuppliers sinkWithTimeBasedWriter();
+
+ abstract SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState,
String stateName);
+
+ abstract SinkAndSuppliers sinkWithCommitter();
+
+ static class SinkAndSuppliers {
+ org.apache.flink.api.connector.sink2.Sink<Integer> sink;
+ Supplier<List<String>> elementSupplier;
+ Supplier<List<org.apache.flink.api.common.eventtime.Watermark>>
watermarkSupplier;
+ LongSupplier lastCheckpointSupplier;
+ Supplier<SimpleVersionedSerializer<String>> serializerSupplier;
+
+ public SinkAndSuppliers(
+ org.apache.flink.api.connector.sink2.Sink<Integer> sink,
+ Supplier<List<String>> elementSupplier,
+
Supplier<List<org.apache.flink.api.common.eventtime.Watermark>>
watermarkSupplier,
+ LongSupplier lastCheckpointSupplier,
+ Supplier<SimpleVersionedSerializer<String>>
serializerSupplier) {
+ this.sink = sink;
+ this.elementSupplier = elementSupplier;
+ this.watermarkSupplier = watermarkSupplier;
+ this.lastCheckpointSupplier = lastCheckpointSupplier;
+ this.serializerSupplier = serializerSupplier;
+ }
+ }
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
index ce00362f6a0..742e4438b5d 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
@@ -49,7 +49,13 @@ import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertNotNull;
-/** A {@link Sink TestSink} for all the sink related tests. */
+/**
+ * A {@link Sink TestSink} for all the sink related tests. Use only for tests
where {@link
+ * SinkV1Adapter} should be tested.
+ *
+ * @deprecated Use {@link TestSinkV2} instead.
+ */
+@Deprecated
public class TestSink<T> implements Sink<T, String, String, String> {
public static final String END_OF_INPUT_STR = "end of input";
@@ -181,11 +187,6 @@ public class TestSink<T> implements Sink<T, String,
String, String> {
return this;
}
- public Builder<T> setGlobalCommitter(GlobalCommitter<String, String>
globalCommitter) {
- this.globalCommitter = globalCommitter;
- return this;
- }
-
public Builder<T> setGlobalCommittableSerializer(
SimpleVersionedSerializer<String> globalCommittableSerializer)
{
this.globalCommittableSerializer = globalCommittableSerializer;
@@ -363,10 +364,6 @@ public class TestSink<T> implements Sink<T, String,
String, String> {
private final String committedSuccessData;
- DefaultGlobalCommitter() {
- this("");
- }
-
DefaultGlobalCommitter(String committedSuccessData) {
this.committedSuccessData = committedSuccessData;
}
@@ -397,39 +394,6 @@ public class TestSink<T> implements Sink<T, String,
String, String> {
}
}
- /** A {@link GlobalCommitter} that always re-commits global committables
it received. */
- static class RetryOnceGlobalCommitter extends DefaultGlobalCommitter {
-
- private final Set<String> seen = new LinkedHashSet<>();
-
- @Override
- public List<String> filterRecoveredCommittables(List<String>
globalCommittables) {
- return globalCommittables;
- }
-
- @Override
- public String combine(List<String> committables) {
- return String.join("|", committables);
- }
-
- @Override
- public void endOfInput() {}
-
- @Override
- public List<String> commit(List<String> committables) {
- committables.forEach(
- c -> {
- if (seen.remove(c)) {
- checkNotNull(committedData);
- committedData.add(c);
- } else {
- seen.add(c);
- }
- });
- return new ArrayList<>(seen);
- }
- }
-
/**
* We introduce this {@link StringCommittableSerializer} is because that
all the fields of
* {@link TestSink} should be serializable.
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
new file mode 100644
index 00000000000..bb89bf0fe7c
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertNotNull;
+
+/** A {@link org.apache.flink.api.connector.sink2.Sink} for all the sink
related tests. */
+public class TestSinkV2<InputT> implements Sink<InputT> {
+
+ private final DefaultSinkWriter<InputT> writer;
+
+ private TestSinkV2(DefaultSinkWriter<InputT> writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public SinkWriter<InputT> createWriter(InitContext context) {
+ writer.init(context);
+ return writer;
+ }
+
+ DefaultSinkWriter<InputT> getWriter() {
+ return writer;
+ }
+
+ public static <InputT> Builder<InputT> newBuilder() {
+ return new Builder<>();
+ }
+
+ /** A builder class for {@link TestSinkV2}. */
+ public static class Builder<InputT> {
+ private DefaultSinkWriter<InputT> writer = null;
+ private DefaultCommitter committer;
+ private SimpleVersionedSerializer<String> committableSerializer;
+ private boolean withPostCommitTopology = false;
+ private boolean withWriterState = false;
+ private String compatibleStateNames;
+
+ public Builder<InputT> setWriter(DefaultSinkWriter<InputT> writer) {
+ this.writer = checkNotNull(writer);
+ return this;
+ }
+
+ public Builder<InputT> setCommitter(DefaultCommitter committer) {
+ this.committer = committer;
+ return this;
+ }
+
+ public Builder<InputT> setCommittableSerializer(
+ SimpleVersionedSerializer<String> committableSerializer) {
+ this.committableSerializer = committableSerializer;
+ return this;
+ }
+
+ public Builder<InputT> setDefaultCommitter() {
+ this.committer = new DefaultCommitter();
+ this.committableSerializer = StringSerializer.INSTANCE;
+ return this;
+ }
+
+ public Builder<InputT> setDefaultCommitter(
+ Supplier<Queue<Committer.CommitRequest<String>>>
queueSupplier) {
+ this.committer = new DefaultCommitter(queueSupplier);
+ this.committableSerializer = StringSerializer.INSTANCE;
+ return this;
+ }
+
+ public Builder<InputT> setWithPostCommitTopology(boolean
withPostCommitTopology) {
+ this.withPostCommitTopology = withPostCommitTopology;
+ return this;
+ }
+
+ public Builder<InputT> setWriterState(boolean withWriterState) {
+ this.withWriterState = withWriterState;
+ return this;
+ }
+
+ public Builder<InputT> setCompatibleStateNames(String
compatibleStateNames) {
+ this.compatibleStateNames = compatibleStateNames;
+ return this;
+ }
+
+ public TestSinkV2<InputT> build() {
+ if (committer == null) {
+ if (writer == null) {
+ writer = new DefaultSinkWriter<>();
+ }
+ // SinkV2 with a simple writer
+ return new TestSinkV2<>(writer);
+ } else {
+ if (writer == null) {
+ writer = new DefaultCommittingSinkWriter<>();
+ }
+ if (!withPostCommitTopology) {
+ // TwoPhaseCommittingSink with a stateless writer and a
committer
+ return new TestSinkV2TwoPhaseCommittingSink<>(
+ writer, committableSerializer, committer);
+ } else {
+ if (withWriterState) {
+ // TwoPhaseCommittingSink with a stateful writer and a
committer and post
+ // commit topology
+ Preconditions.checkArgument(
+ writer instanceof DefaultStatefulSinkWriter,
+ "Please provide a DefaultStatefulSinkWriter
instance");
+ return new TestStatefulSinkV2(
+ (DefaultStatefulSinkWriter) writer,
+ committableSerializer,
+ committer,
+ compatibleStateNames);
+ } else {
+ // TwoPhaseCommittingSink with a stateless writer and
a committer and post
+ // commit topology
+ Preconditions.checkArgument(
+ writer instanceof DefaultCommittingSinkWriter,
+ "Please provide a DefaultCommittingSinkWriter
instance");
+ return new TestSinkV2WithPostCommitTopology<>(
+ (DefaultCommittingSinkWriter) writer,
+ committableSerializer,
+ committer);
+ }
+ }
+ }
+ }
+ }
+
+ private static class TestSinkV2TwoPhaseCommittingSink<InputT> extends
TestSinkV2<InputT>
+ implements TwoPhaseCommittingSink<InputT, String> {
+ private final DefaultCommitter committer;
+ private final SimpleVersionedSerializer<String> committableSerializer;
+
+ public TestSinkV2TwoPhaseCommittingSink(
+ DefaultSinkWriter<InputT> writer,
+ SimpleVersionedSerializer<String> committableSerializer,
+ DefaultCommitter committer) {
+ super(writer);
+ this.committer = committer;
+ this.committableSerializer = committableSerializer;
+ }
+
+ @Override
+ public Committer<String> createCommitter() {
+ committer.init();
+ return committer;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<String> getCommittableSerializer() {
+ return committableSerializer;
+ }
+
+ @Override
+ public PrecommittingSinkWriter<InputT, String>
createWriter(InitContext context) {
+ return (PrecommittingSinkWriter<InputT, String>)
super.createWriter(context);
+ }
+ }
+
+ // -------------------------------------- Sink With PostCommitTopology
-------------------------
+
+ private static class TestSinkV2WithPostCommitTopology<InputT>
+ extends TestSinkV2TwoPhaseCommittingSink<InputT>
+ implements WithPostCommitTopology<InputT, String> {
+ public TestSinkV2WithPostCommitTopology(
+ DefaultSinkWriter<InputT> writer,
+ SimpleVersionedSerializer<String> committableSerializer,
+ DefaultCommitter committer) {
+ super(writer, committableSerializer, committer);
+ }
+
+ @Override
+ public void
addPostCommitTopology(DataStream<CommittableMessage<String>> committables) {
+ // We do not need to do anything for tests
+ }
+ }
+
+ private static class TestStatefulSinkV2<InputT> extends
TestSinkV2WithPostCommitTopology<InputT>
+ implements StatefulSink<InputT, String>,
StatefulSink.WithCompatibleState {
+ private String compatibleState;
+
+ public TestStatefulSinkV2(
+ DefaultStatefulSinkWriter<InputT> writer,
+ SimpleVersionedSerializer<String> committableSerializer,
+ DefaultCommitter committer,
+ String compatibleState) {
+ super(writer, committableSerializer, committer);
+ this.compatibleState = compatibleState;
+ }
+
+ @Override
+ public DefaultStatefulSinkWriter<InputT> createWriter(InitContext
context) {
+ return (DefaultStatefulSinkWriter<InputT>)
super.createWriter(context);
+ }
+
+ @Override
+ public StatefulSinkWriter<InputT, String> restoreWriter(
+ InitContext context, Collection<String> recoveredState) {
+ DefaultStatefulSinkWriter<InputT> statefulWriter =
+ (DefaultStatefulSinkWriter) getWriter();
+
+ statefulWriter.restore(recoveredState);
+ return statefulWriter;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<String> getWriterStateSerializer() {
+ return new StringSerializer();
+ }
+
+ @Override
+ public Collection<String> getCompatibleWriterStateNames() {
+ return compatibleState == null ? ImmutableSet.of() :
ImmutableSet.of(compatibleState);
+ }
+ }
+
+ // -------------------------------------- Sink Writer
------------------------------------------
+
+ /** Base class for our testing {@link SinkWriter}. */
+ public static class DefaultSinkWriter<InputT> implements
SinkWriter<InputT>, Serializable {
+
+ protected List<String> elements;
+
+ protected List<Watermark> watermarks;
+
+ protected DefaultSinkWriter() {
+ this.elements = new ArrayList<>();
+ this.watermarks = new ArrayList<>();
+ }
+
+ @Override
+ public void write(InputT element, Context context) {
+ elements.add(
+ Tuple3.of(element, context.timestamp(),
context.currentWatermark()).toString());
+ }
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException,
InterruptedException {
+ elements = new ArrayList<>();
+ }
+
+ @Override
+ public void writeWatermark(Watermark watermark) {
+ watermarks.add(watermark);
+ }
+
+ @Override
+ public void close() throws Exception {
+ // nothing to do here
+ }
+
+ public void init(InitContext context) {
+ // context is not used in default case
+ }
+ }
+
+ /** Base class for our testing {@link
TwoPhaseCommittingSink.PrecommittingSinkWriter}. */
+ protected static class DefaultCommittingSinkWriter<InputT> extends
DefaultSinkWriter<InputT>
+ implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT,
String>,
+ Serializable {
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException,
InterruptedException {
+ // We empty the elements on prepareCommit
+ }
+
+ @Override
+ public Collection<String> prepareCommit() {
+ List<String> result = elements;
+ elements = new ArrayList<>();
+ return result;
+ }
+ }
+
+ /**
+ * Base class for our testing {@link StatefulSink.StatefulSinkWriter}.
Extends the {@link
+ * DefaultCommittingSinkWriter} for simplicity.
+ */
+ protected static class DefaultStatefulSinkWriter<InputT>
+ extends DefaultCommittingSinkWriter<InputT>
+ implements StatefulSink.StatefulSinkWriter<InputT, String> {
+
+ @Override
+ public List<String> snapshotState(long checkpointId) throws
IOException {
+ return elements;
+ }
+
+ protected void restore(Collection<String> recoveredState) {
+ this.elements = new ArrayList<>(recoveredState);
+ }
+ }
+
+ // -------------------------------------- Sink Committer
---------------------------------------
+
+ /** Base class for testing {@link Committer}. */
+ static class DefaultCommitter implements Committer<String>, Serializable {
+
+ @Nullable protected Queue<CommitRequest<String>> committedData;
+
+ private boolean isClosed;
+
+ @Nullable private final Supplier<Queue<CommitRequest<String>>>
queueSupplier;
+
+ public DefaultCommitter() {
+ this.committedData = new ConcurrentLinkedQueue<>();
+ this.isClosed = false;
+ this.queueSupplier = null;
+ }
+
+ public DefaultCommitter(@Nullable
Supplier<Queue<CommitRequest<String>>> queueSupplier) {
+ this.queueSupplier = queueSupplier;
+ this.isClosed = false;
+ this.committedData = null;
+ }
+
+ public List<CommitRequest<String>> getCommittedData() {
+ if (committedData != null) {
+ return new ArrayList<>(committedData);
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+ public void commit(Collection<CommitRequest<String>> committables) {
+ if (committedData == null) {
+ assertNotNull(queueSupplier);
+ committedData = queueSupplier.get();
+ }
+ committedData.addAll(committables);
+ }
+
+ public void close() throws Exception {
+ isClosed = true;
+ }
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ public void init() {
+ // no initialization is needed for this implementation
+ }
+ }
+
+ /** A {@link Committer} that always re-commits the committable data it
received. */
+ static class RetryOnceCommitter extends DefaultCommitter {
+
+ private final Set<CommitRequest<String>> seen = new LinkedHashSet<>();
+
+ @Override
+ public void commit(Collection<CommitRequest<String>> committables) {
+ committables.forEach(
+ c -> {
+ if (seen.remove(c)) {
+ checkNotNull(committedData);
+ committedData.add(c);
+ } else {
+ seen.add(c);
+ c.retryLater();
+ }
+ });
+ }
+ }
+
+ /**
+ * We introduce this {@link StringSerializer} because all the
fields of {@link
+ * TestSinkV2} should be serializable.
+ */
+ public static class StringSerializer
+ implements SimpleVersionedSerializer<String>, Serializable {
+
+ public static final StringSerializer INSTANCE = new StringSerializer();
+
+ @Override
+ public int getVersion() {
+ return SimpleVersionedStringSerializer.INSTANCE.getVersion();
+ }
+
+ @Override
+ public byte[] serialize(String obj) {
+ return SimpleVersionedStringSerializer.INSTANCE.serialize(obj);
+ }
+
+ @Override
+ public String deserialize(int version, byte[] serialized) throws
IOException {
+ return
SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized);
+ }
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
new file mode 100644
index 00000000000..c516db87467
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+import java.util.Collections;
+import java.util.List;
+
+class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase {
+
+ @Override
+ SinkAndCounters sinkWithPostCommit() {
+ ForwardingCommitter committer = new ForwardingCommitter();
+ return new SinkAndCounters(
+ (TwoPhaseCommittingSink<?, String>)
+ TestSink.newBuilder()
+ .setCommitter(committer)
+ .setDefaultGlobalCommitter()
+ .setCommittableSerializer(
+
TestSink.StringCommittableSerializer.INSTANCE)
+ .build()
+ .asV2(),
+ () -> committer.successfulCommits);
+ }
+
+ @Override
+ SinkAndCounters sinkWithPostCommitWithRetry() {
+ return new SinkAndCounters(
+ (TwoPhaseCommittingSink<?, String>)
+ TestSink.newBuilder()
+ .setCommitter(new
TestSink.RetryOnceCommitter())
+ .setDefaultGlobalCommitter()
+ .setCommittableSerializer(
+
TestSink.StringCommittableSerializer.INSTANCE)
+ .build()
+ .asV2(),
+ () -> 0);
+ }
+
+ @Override
+ SinkAndCounters sinkWithoutPostCommit() {
+ ForwardingCommitter committer = new ForwardingCommitter();
+ return new SinkAndCounters(
+ (TwoPhaseCommittingSink<?, String>)
+ TestSink.newBuilder()
+ .setCommitter(committer)
+ .setCommittableSerializer(
+
TestSink.StringCommittableSerializer.INSTANCE)
+ .build()
+ .asV2(),
+ () -> committer.successfulCommits);
+ }
+
+ private static class ForwardingCommitter extends TestSink.DefaultCommitter
{
+ private int successfulCommits = 0;
+
+ @Override
+ public List<String> commit(List<String> committables) {
+ successfulCommits += committables.size();
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void close() throws Exception {}
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
new file mode 100644
index 00000000000..5af5ac5a679
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase {
+
+    /** V1 {@link TestSink} with only a writer, wrapped via {@code asV2()}. */
+    @Override
+    SinkAndSuppliers sinkWithoutCommitter() {
+        TestSink.DefaultSinkWriter<Integer> sinkWriter = new TestSink.DefaultSinkWriter<>();
+        return new SinkAndSuppliers(
+                TestSink.newBuilder().setWriter(sinkWriter).build().asV2(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                // this writer records no checkpoint id; -1 equals NOT_SNAPSHOTTED below
+                () -> -1,
+                () -> new TestSink.StringCommittableSerializer());
+    }
+
+    /** V1 {@link TestSink} with a writer and the default committer, wrapped via {@code asV2()}. */
+    @Override
+    SinkAndSuppliers sinkWithCommitter() {
+        TestSink.DefaultSinkWriter<Integer> sinkWriter = new TestSink.DefaultSinkWriter<>();
+        return new SinkAndSuppliers(
+                TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> -1,
+                () -> new TestSink.StringCommittableSerializer());
+    }
+
+    /**
+     * Like {@link #sinkWithCommitter()}, but the writer is a {@link TimeBasedBufferingSinkWriter}
+     * that only exposes elements once its processing-time timer fires.
+     */
+    @Override
+    SinkAndSuppliers sinkWithTimeBasedWriter() {
+        TestSink.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter();
+        return new SinkAndSuppliers(
+                TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> -1,
+                () -> new TestSink.StringCommittableSerializer());
+    }
+
+    /**
+     * Sink whose writer is a {@link SnapshottingBufferingSinkWriter}.
+     *
+     * @param withState whether {@code withWriterState()} is applied to the builder
+     * @param stateName compatible state name to register on the builder; skipped when {@code null}
+     */
+    @Override
+    SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String stateName) {
+        SnapshottingBufferingSinkWriter sinkWriter = new SnapshottingBufferingSinkWriter();
+        TestSink.Builder<Integer> builder =
+                TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter();
+        if (withState) {
+            builder.withWriterState();
+        }
+        if (stateName != null) {
+            builder.setCompatibleStateNames(stateName);
+        }
+        return new SinkAndSuppliers(
+                builder.build().asV2(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> sinkWriter.lastCheckpointId,
+                () -> new TestSink.StringCommittableSerializer());
+    }
+
+    /**
+     * Writer that parks incoming records in a side buffer and moves them into {@code elements}
+     * only when its processing-time callback runs. The callback re-arms itself every 1000 time
+     * units.
+     */
+    private static class TimeBasedBufferingSinkWriter extends TestSink.DefaultSinkWriter<Integer>
+            implements Sink.ProcessingTimeService.ProcessingTimeCallback {
+
+        private final List<String> cachedCommittables = new ArrayList<>();
+
+        /** Buffers the record, rendered as a {@code Tuple3} string, without publishing it. */
+        @Override
+        public void write(Integer element, Context context) {
+            cachedCommittables.add(
+                    Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString());
+        }
+
+        /**
+         * Stores the timer service through the base class, then arms the first timer at 1000.
+         *
+         * <p>Marked {@code @Override} so the compiler rejects this method if the base signature
+         * ever changes, instead of silently leaving the first timer unregistered.
+         */
+        @Override
+        void setProcessingTimerService(Sink.ProcessingTimeService processingTimerService) {
+            super.setProcessingTimerService(processingTimerService);
+            this.processingTimerService.registerProcessingTimer(1000, this);
+        }
+
+        /** Publishes everything buffered so far and schedules the next firing. */
+        @Override
+        public void onProcessingTime(long time) {
+            elements.addAll(cachedCommittables);
+            cachedCommittables.clear();
+            this.processingTimerService.registerProcessingTimer(time + 1000, this);
+        }
+    }
+
+    /**
+     * Writer that remembers the id of the last snapshot it took and hands out its buffered
+     * elements both as writer state and, on flush, as committables.
+     */
+    private static class SnapshottingBufferingSinkWriter
+            extends TestSink.DefaultSinkWriter<Integer> {
+        public static final int NOT_SNAPSHOTTED = -1;
+        long lastCheckpointId = NOT_SNAPSHOTTED;
+
+        /** Records the checkpoint id and returns the current buffer as state. */
+        @Override
+        public List<String> snapshotState(long checkpointId) {
+            lastCheckpointId = checkpointId;
+            return elements;
+        }
+
+        /** Replaces the buffer with a copy of the restored state. */
+        @Override
+        void restoredFrom(List<String> states) {
+            this.elements = new ArrayList<>(states);
+        }
+
+        /**
+         * Emits nothing unless {@code flush} is set; on flush, hands over the buffer and starts a
+         * fresh one.
+         */
+        @Override
+        public List<String> prepareCommit(boolean flush) {
+            if (!flush) {
+                return Collections.emptyList();
+            }
+            List<String> result = elements;
+            elements = new ArrayList<>();
+            return result;
+        }
+    }
+}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
new file mode 100644
index 00000000000..70d94004748
--- /dev/null
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Integration test for the runtime implementation of {@link
+ * org.apache.flink.api.connector.sink2.Sink}, driven through {@link TestSinkV2}.
+ */
+public class SinkV2ITCase extends AbstractTestBase {
+    static final List<Integer> SOURCE_DATA =
+            Arrays.asList(
+                    895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970,
+                    714, 795, 288, 422);
+
+    // the streaming source emits SOURCE_DATA twice
+    static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = 2 * SOURCE_DATA.size();
+
+    // every element appears twice because the streaming source emits the data twice
+    static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
+            SOURCE_DATA.stream()
+                    .map(SinkV2ITCase::expectedCommittable)
+                    .flatMap(committable -> Collections.nCopies(2, committable).stream())
+                    .collect(Collectors.toList());
+
+    static final List<String> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE =
+            SOURCE_DATA.stream()
+                    .map(SinkV2ITCase::expectedCommittable)
+                    .collect(Collectors.toList());
+
+    static final Queue<Committer.CommitRequest<String>> COMMIT_QUEUE =
+            new ConcurrentLinkedQueue<>();
+
+    // exit condition handed to the streaming source: every emitted element has been committed
+    static final BooleanSupplier COMMIT_QUEUE_RECEIVE_ALL_DATA =
+            (BooleanSupplier & Serializable)
+                    () -> COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;
+
+    /** Empties the shared commit queue so each test starts from a clean slate. */
+    @Before
+    public void init() {
+        COMMIT_QUEUE.clear();
+    }
+
+    /** Runs source -> sink in STREAMING mode and checks that each element was committed twice. */
+    @Test
+    public void writerAndCommitterExecuteInStreamingMode() throws Exception {
+        final StreamExecutionEnvironment streamEnv = buildStreamEnv();
+        final FiniteTestSource<Integer> finiteSource =
+                new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA);
+
+        streamEnv
+                .addSource(finiteSource, IntegerTypeInfo.INT_TYPE_INFO)
+                .sinkTo(
+                        TestSinkV2.<Integer>newBuilder()
+                                .setDefaultCommitter(
+                                        (Supplier<Queue<Committer.CommitRequest<String>>>
+                                                        & Serializable)
+                                                () -> COMMIT_QUEUE)
+                                .build());
+        streamEnv.execute();
+
+        assertThat(
+                committedValues(),
+                containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
+    }
+
+    /** Runs collection -> sink in BATCH mode and checks that each element was committed once. */
+    @Test
+    public void writerAndCommitterExecuteInBatchMode() throws Exception {
+        final StreamExecutionEnvironment batchEnv = buildBatchEnv();
+
+        batchEnv.fromCollection(SOURCE_DATA)
+                .sinkTo(
+                        TestSinkV2.<Integer>newBuilder()
+                                .setDefaultCommitter(
+                                        (Supplier<Queue<Committer.CommitRequest<String>>>
+                                                        & Serializable)
+                                                () -> COMMIT_QUEUE)
+                                .build());
+        batchEnv.execute();
+
+        assertThat(
+                committedValues(),
+                containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
+    }
+
+    /** Streaming environment with checkpointing every 100 ms. */
+    private StreamExecutionEnvironment buildStreamEnv() {
+        final StreamExecutionEnvironment streamEnv =
+                newEnvironment(RuntimeExecutionMode.STREAMING);
+        streamEnv.enableCheckpointing(100);
+        return streamEnv;
+    }
+
+    /** Batch environment; no checkpointing is configured. */
+    private StreamExecutionEnvironment buildBatchEnv() {
+        return newEnvironment(RuntimeExecutionMode.BATCH);
+    }
+
+    /** Obtains the execution environment and switches it to the given runtime mode. */
+    private static StreamExecutionEnvironment newEnvironment(RuntimeExecutionMode mode) {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(mode);
+        return env;
+    }
+
+    /** String form of the committable expected for one source element. */
+    private static String expectedCommittable(Integer value) {
+        return Tuple3.of(value, null, Long.MIN_VALUE).toString();
+    }
+
+    /** Snapshot of the committables currently sitting in {@link #COMMIT_QUEUE}. */
+    private static List<String> committedValues() {
+        return COMMIT_QUEUE.stream()
+                .map(Committer.CommitRequest::getCommittable)
+                .collect(Collectors.toList());
+    }
+}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
new file mode 100644
index 00000000000..76fcc7b66cc
--- /dev/null
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.stream.LongStream;
+
+import static
org.apache.flink.metrics.testutils.MetricAssertions.assertThatCounter;
+import static
org.apache.flink.metrics.testutils.MetricAssertions.assertThatGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+
+/**
+ * Verifies that the metrics published for a {@link Sink} writer carry the expected values
+ * (FLIP-33).
+ */
+public class SinkV2MetricsITCase extends TestLogger {
+
+    private static final String TEST_SINK_NAME = "MetricTestSink";
+    // please refer to SinkTransformationTranslator#WRITER_NAME
+    private static final String DEFAULT_WRITER_NAME = "Writer";
+    private static final int DEFAULT_PARALLELISM = 4;
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+    private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+
+    @ClassRule
+    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            .setConfiguration(reporter.addToConfiguration(new Configuration()))
+                            .build());
+
+    /**
+     * Runs a job whose map stage pauses on two record positions per split, and inspects the
+     * writer metrics at each pause.
+     */
+    @Test
+    public void testMetrics() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final int parallelism = env.getParallelism();
+        final int numSplits = Math.max(1, parallelism - 2);
+
+        final int numRecordsPerSplit = 10;
+        final int stopAtRecord1 = 4;
+        final int stopAtRecord2 = numRecordsPerSplit - 1;
+
+        // one party per split plus the test thread, so that every active subtask has handled
+        // the same number of records whenever the metrics are inspected
+        final SharedReference<CyclicBarrier> beforeBarrier =
+                sharedObjects.add(new CyclicBarrier(numSplits + 1));
+        final SharedReference<CyclicBarrier> afterBarrier =
+                sharedObjects.add(new CyclicBarrier(numSplits + 1));
+
+        env.fromSequence(0, numSplits - 1)
+                .<Long>flatMap(
+                        (split, collector) ->
+                                LongStream.range(0, numRecordsPerSplit)
+                                        .forEach(collector::collect))
+                .returns(BasicTypeInfo.LONG_TYPE_INFO)
+                .map(
+                        value -> {
+                            final long position = value % numRecordsPerSplit;
+                            if (position == stopAtRecord1 || position == stopAtRecord2) {
+                                // hold this record back until the test thread has checked
+                                beforeBarrier.get().await();
+                                afterBarrier.get().await();
+                            }
+                            return value;
+                        })
+                .sinkTo(TestSinkV2.<Long>newBuilder().setWriter(new MetricWriter()).build())
+                .name(TEST_SINK_NAME);
+        JobClient jobClient = env.executeAsync();
+        final JobID jobId = jobClient.getJobID();
+
+        assertMetricsWhilePaused(
+                beforeBarrier, afterBarrier, jobId, stopAtRecord1, parallelism, numSplits);
+        assertMetricsWhilePaused(
+                beforeBarrier, afterBarrier, jobId, stopAtRecord2, parallelism, numSplits);
+
+        jobClient.getJobExecutionResult().get();
+    }
+
+    /**
+     * Waits until the map stage is parked on {@code beforeBarrier}, checks the metrics, then
+     * releases the map stage through {@code afterBarrier}.
+     */
+    private void assertMetricsWhilePaused(
+            SharedReference<CyclicBarrier> beforeBarrier,
+            SharedReference<CyclicBarrier> afterBarrier,
+            JobID jobId,
+            long processedRecordsPerSubtask,
+            int parallelism,
+            int numSplits)
+            throws Exception {
+        beforeBarrier.get().await();
+        assertSinkMetrics(jobId, processedRecordsPerSubtask, parallelism, numSplits);
+        afterBarrier.get().await();
+    }
+
+    /**
+     * Checks the writer metric groups of the given job.
+     *
+     * @param jobId job whose reporter entries are inspected
+     * @param processedRecordsPerSubtask records each active writer subtask is expected to have seen
+     * @param parallelism expected number of writer metric groups
+     * @param numSplits expected number of subtasks that actually received records
+     */
+    @SuppressWarnings("checkstyle:WhitespaceAfter")
+    private void assertSinkMetrics(
+            JobID jobId, long processedRecordsPerSubtask, int parallelism, int numSplits) {
+        final String writerOperatorName = TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME;
+        final List<OperatorMetricGroup> groups =
+                reporter.findOperatorMetricGroups(jobId, writerOperatorName);
+        assertThat(groups, hasSize(parallelism));
+
+        final long expectedBytes = processedRecordsPerSubtask * MetricWriter.RECORD_SIZE_IN_BYTES;
+        // MetricWriter bumps the error counter on every even record
+        final long expectedErrors = (processedRecordsPerSubtask + 1) / 2;
+        final long expectedSendTime =
+                (processedRecordsPerSubtask - 1) * MetricWriter.BASE_SEND_TIME;
+
+        int subtaskWithMetrics = 0;
+        for (OperatorMetricGroup group : groups) {
+            final Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
+            // Subtasks that were not assigned a split never update their metrics; skip them.
+            final boolean receivedNoRecords =
+                    group.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0;
+            if (receivedNoRecords) {
+                continue;
+            }
+            subtaskWithMetrics++;
+
+            // SinkWriterMetricGroup metrics
+            assertThatCounter(metrics.get(MetricNames.IO_NUM_RECORDS_OUT))
+                    .isEqualTo(processedRecordsPerSubtask);
+            assertThatCounter(metrics.get(MetricNames.IO_NUM_BYTES_OUT)).isEqualTo(expectedBytes);
+            assertThatCounter(metrics.get(MetricNames.NUM_RECORDS_OUT_ERRORS))
+                    .isEqualTo(expectedErrors);
+
+            // The "send" metric series must mirror the "out" metric series.
+            assertThatCounter(metrics.get(MetricNames.NUM_RECORDS_SEND))
+                    .isEqualTo(processedRecordsPerSubtask);
+            assertThatCounter(metrics.get(MetricNames.NUM_BYTES_SEND)).isEqualTo(expectedBytes);
+            assertThatCounter(metrics.get(MetricNames.NUM_RECORDS_SEND_ERRORS))
+                    .isEqualTo(expectedErrors);
+
+            // the gauge must reflect the most recently written record
+            assertThatGauge(metrics.get(MetricNames.CURRENT_SEND_TIME))
+                    .isEqualTo(expectedSendTime);
+        }
+        assertThat(subtaskWithMetrics, equalTo(numSplits));
+    }
+
+    /**
+     * Writer that updates the sink writer metric group on every record: records-out, bytes-out
+     * (a fixed size per record), an error count for even records, and a send-time gauge derived
+     * from the record value.
+     */
+    private static class MetricWriter extends TestSinkV2.DefaultSinkWriter<Long> {
+        static final long BASE_SEND_TIME = 100;
+        static final long RECORD_SIZE_IN_BYTES = 10;
+        private SinkWriterMetricGroup metricGroup;
+        private long sendTime;
+
+        /** Captures the metric group and registers the send-time gauge on it. */
+        @Override
+        public void init(Sink.InitContext context) {
+            final SinkWriterMetricGroup writerGroup = context.metricGroup();
+            this.metricGroup = writerGroup;
+            writerGroup.setCurrentSendTimeGauge(() -> sendTime);
+        }
+
+        /** Delegates to the default writer, then updates the gauge value and the counters. */
+        @Override
+        public void write(Long element, Context context) {
+            super.write(element, context);
+            sendTime = element * BASE_SEND_TIME;
+            metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc();
+            final boolean evenRecord = element % 2 == 0;
+            if (evenRecord) {
+                metricGroup.getNumRecordsOutErrorsCounter().inc();
+            }
+            metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
+        }
+    }
+}