This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 580c4ae0e4e [FLINK-33975] Tests for the new Sink V2 transformations
580c4ae0e4e is described below
commit 580c4ae0e4e934133e5be415a740d9f7ffbbda2a
Author: pvary <[email protected]>
AuthorDate: Tue Jan 23 09:22:10 2024 +0100
[FLINK-33975] Tests for the new Sink V2 transformations
---
.../flink/api/connector/sink2/Committer.java | 3 +-
.../connector/sink2/CommittableWithLineage.java | 3 +-
.../datastream/DataStreamSinkDeprecatedTest.java | 53 ++++
...V2TransformationTranslatorDeprecatedITCase.java | 99 ++++++
.../api/graph/StreamingJobGraphGeneratorTest.java | 278 +++++++++++++++-
.../operators/sink/CommitterOperatorTestBase.java | 10 +-
... => SinkV2CommitterOperatorDeprecatedTest.java} | 7 +-
.../sink/SinkV2CommitterOperatorTest.java | 10 +-
.../SinkV2SinkWriterOperatorDeprecatedTest.java | 155 +++++++++
.../runtime/operators/sink/TestSinkV2.java | 97 ++++--
.../sink/{ => deprecated}/TestSinkV2.java | 22 +-
...deITCase.java => HashcodeDeprecatedITCase.java} | 8 +-
.../table/planner/functions/HashcodeITCase.java | 4 +-
.../scheduling/SpeculativeSchedulerITCase.java | 129 ++++++++
...nkV2ITCase.java => SinkV2DeprecatedITCase.java} | 8 +-
.../flink/test/streaming/runtime/SinkV2ITCase.java | 50 +++
.../runtime/SinkV2MetricsDeprecatedITCase.java | 348 +++++++++++++++++++++
17 files changed, 1234 insertions(+), 50 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
index c51cce2e3e9..5e4af3eae5c 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
@@ -25,8 +25,7 @@ import java.util.Collection;
/**
* The {@code Committer} is responsible for committing the data staged by the
{@link
- * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a
two-phase commit
- * protocol.
+ * CommittingSinkWriter} in the second step of a two-phase commit protocol.
*
* <p>A commit must be idempotent: If some failure occurs in Flink during
commit phase, Flink will
* restart from previous checkpoint and re-attempt to commit all committables.
Thus, some or all
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
index a792a3ad48c..96683bfd2ca 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import javax.annotation.Nullable;
@@ -31,7 +30,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
/**
* Provides metadata. The exposed exchange type between {@link
- * TwoPhaseCommittingSink.PrecommittingSinkWriter} and {@link Committer}.
+ * org.apache.flink.api.connector.sink2.CommittingSinkWriter} and {@link
Committer}.
*/
@Experimental
public class CommittableWithLineage<CommT> implements
CommittableMessage<CommT> {
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkDeprecatedTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkDeprecatedTest.java
new file mode 100644
index 00000000000..45140ff567c
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkDeprecatedTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link DataStreamSink}.
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
+ */
+@Deprecated
+public class DataStreamSinkDeprecatedTest {
+
+ @Test
+ public void testGettingTransformationWithNewSinkAPI() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ final Transformation<?> transformation =
+ env.fromData(1, 2)
+ .sinkTo(TestSinkV2.<Integer>newBuilder().build())
+ .getTransformation();
+ assertTrue(transformation instanceof SinkTransformation);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void throwExceptionWhenSetUidWithNewSinkAPI() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.fromData(1,
2).sinkTo(TestSinkV2.<Integer>newBuilder().build()).setUidHash("Test");
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorDeprecatedITCase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorDeprecatedITCase.java
new file mode 100644
index 00000000000..9456b4f45a0
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorDeprecatedITCase.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link
org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ *
+ * <p>ATTENTION: This test is extremely brittle. Do NOT remove, add or
re-order test cases.
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
+ */
+@Deprecated
+@RunWith(Parameterized.class)
+public class SinkV2TransformationTranslatorDeprecatedITCase
+ extends SinkTransformationTranslatorITCaseBase<Sink<Integer>> {
+
+ @Override
+ Sink<Integer> simpleSink() {
+ return TestSinkV2.<Integer>newBuilder().build();
+ }
+
+ @Override
+ Sink<Integer> sinkWithCommitter() {
+ return TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build();
+ }
+
+ @Override
+ DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer>
sink) {
+ return stream.sinkTo(sink);
+ }
+
+ @Test
+ public void testSettingOperatorUidHash() {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ final DataStreamSource<Integer> src = env.fromElements(1, 2);
+ final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
+ final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
+ final CustomSinkOperatorUidHashes operatorsUidHashes =
+ CustomSinkOperatorUidHashes.builder()
+ .setWriterUidHash(writerHash)
+ .setCommitterUidHash(committerHash)
+ .build();
+ src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
+
+ final StreamGraph streamGraph = env.getStreamGraph();
+
+ assertEquals(findWriter(streamGraph).getUserHash(), writerHash);
+ assertEquals(findCommitter(streamGraph).getUserHash(), committerHash);
+ }
+
+ /**
+ * Whenever you need to change something in this test case please think
about possible state
+ * upgrade problems introduced by your changes.
+ */
+ @Test
+ public void testSettingOperatorUids() {
+ final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ final DataStreamSource<Integer> src = env.fromElements(1, 2);
+ src.sinkTo(sinkWithCommitter()).name(NAME).uid(sinkUid);
+
+ final StreamGraph streamGraph = env.getStreamGraph();
+ assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid);
+ assertEquals(
+ findCommitter(streamGraph).getTransformationUID(),
+ String.format("Sink Committer: %s", sinkUid));
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index d726c57df4e..1c716d4720a 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -36,7 +36,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.mocks.MockSource;
@@ -70,6 +75,11 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import
org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
@@ -105,6 +115,7 @@ import
org.apache.flink.streaming.api.transformations.MultipleInputTransformatio
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -128,6 +139,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.io.ObjectOutputStream;
+import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
@@ -2124,6 +2136,30 @@ class StreamingJobGraphGeneratorTest {
.hasRootCauseMessage("This provider is not serializable.");
}
+ @Test
+ void testSinkWithAllInterfaces() {
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(new
Configuration());
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+ final DataStream<Integer> source = env.fromData(1, 2,
3).name("source");
+ source.rebalance().sinkTo(new
TestSinkWithAllInterfaces()).name("sink");
+
+ final StreamGraph streamGraph = env.getStreamGraph();
+ final JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+ assertThat(jobGraph.getNumberOfVertices()).isEqualTo(6);
+ for (JobVertex jobVertex : jobGraph.getVertices()) {
+ assertThat(jobVertex.getName())
+ .containsAnyOf(
+ "source",
+ "pre-writer",
+ "Writer",
+ "pre-committer",
+ "post-committer",
+ "Committer");
+ }
+ }
+
@Test
void testSupportConcurrentExecutionAttempts() {
final StreamExecutionEnvironment env =
@@ -2208,6 +2244,41 @@ class StreamingJobGraphGeneratorTest {
}
}
+ /** Should be removed along with {@link TwoPhaseCommittingSink}. */
+ @Deprecated
+ @Test
+ void testSinkSupportConcurrentExecutionAttemptsWithDeprecatedSink() {
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(new
Configuration());
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+ final DataStream<Integer> source = env.fromData(1, 2,
3).name("source");
+ source.rebalance()
+ .sinkTo(new
TestSinkWithSupportsConcurrentExecutionAttemptsDeprecated())
+ .name("sink");
+
+ final StreamGraph streamGraph = env.getStreamGraph();
+ final JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+ assertThat(jobGraph.getNumberOfVertices()).isEqualTo(6);
+ for (JobVertex jobVertex : jobGraph.getVertices()) {
+ if (jobVertex.getName().contains("source")) {
+
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
+ } else if (jobVertex.getName().contains("pre-writer")) {
+
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
+ } else if (jobVertex.getName().contains("Writer")) {
+
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
+ } else if (jobVertex.getName().contains("pre-committer")) {
+
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
+ } else if (jobVertex.getName().contains("post-committer")) {
+
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
+ } else if (jobVertex.getName().contains("Committer")) {
+
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
+ } else {
+ Assertions.fail("Unexpected job vertex " +
jobVertex.getName());
+ }
+ }
+ }
+
@Test
void testSinkFunctionNotSupportConcurrentExecutionAttempts() {
testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(
@@ -2324,7 +2395,9 @@ class StreamingJobGraphGeneratorTest {
public void invoke(T value, Context context) throws Exception {}
}
- private static class TestSinkWithSupportsConcurrentExecutionAttempts
+ /** Should be removed along with {@link TwoPhaseCommittingSink}. */
+ @Deprecated
+ private static class
TestSinkWithSupportsConcurrentExecutionAttemptsDeprecated
implements SupportsConcurrentExecutionAttempts,
TwoPhaseCommittingSink<Integer, Void>,
WithPreWriteTopology<Integer>,
@@ -2409,6 +2482,209 @@ class StreamingJobGraphGeneratorTest {
}
}
+ private static class TestSinkWithSupportsConcurrentExecutionAttempts
+ implements SupportsConcurrentExecutionAttempts,
+ Sink<Integer>,
+ SupportsCommitter<Void>,
+ SupportsPreWriteTopology<Integer>,
+ SupportsPreCommitTopology<Void, Void>,
+ SupportsPostCommitTopology<Void> {
+
+ @Override
+ public SinkWriter<Integer> createWriter(InitContext context) throws
IOException {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public SinkWriter<Integer> createWriter(WriterInitContext context)
throws IOException {
+ return new CommittingSinkWriter<Integer, Void>() {
+ @Override
+ public Collection<Void> prepareCommit() throws IOException,
InterruptedException {
+ return null;
+ }
+
+ @Override
+ public void write(Integer element, Context context)
+ throws IOException, InterruptedException {}
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException,
InterruptedException {}
+
+ @Override
+ public void close() throws Exception {}
+ };
+ }
+
+ @Override
+ public Committer<Void> createCommitter(CommitterInitContext context)
throws IOException {
+ return new Committer<Void>() {
+ @Override
+ public void commit(Collection<CommitRequest<Void>>
committables)
+ throws IOException, InterruptedException {}
+
+ @Override
+ public void close() throws Exception {}
+ };
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Void> getCommittableSerializer() {
+ return new SimpleVersionedSerializer<Void>() {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(Void obj) throws IOException {
+ return new byte[0];
+ }
+
+ @Override
+ public Void deserialize(int version, byte[] serialized) throws
IOException {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public void addPostCommitTopology(DataStream<CommittableMessage<Void>>
committables) {
+ committables
+ .map(v -> v)
+ .name("post-committer")
+ .returns(CommittableMessageTypeInfo.noOutput())
+ .rebalance();
+ }
+
+ @Override
+ public DataStream<CommittableMessage<Void>> addPreCommitTopology(
+ DataStream<CommittableMessage<Void>> committables) {
+ return committables
+ .map(v -> v)
+ .name("pre-committer")
+ .returns(CommittableMessageTypeInfo.noOutput())
+ .rebalance();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Void> getWriteResultSerializer() {
+ return null;
+ }
+
+ @Override
+ public DataStream<Integer> addPreWriteTopology(DataStream<Integer>
inputDataStream) {
+ return inputDataStream.map(v -> v).name("pre-writer").rebalance();
+ }
+ }
+
+ private static class TestSinkWithAllInterfaces
+ implements Sink<Integer>,
+ SupportsPreWriteTopology<Integer>,
+ SupportsCommitter<Long>,
+ SupportsPreCommitTopology<String, Long>,
+ SupportsPostCommitTopology<Long> {
+
+ @Override
+ @Deprecated
+ public SinkWriter<Integer> createWriter(InitContext context) throws
IOException {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public CommittingSinkWriter<Integer, String>
createWriter(WriterInitContext context)
+ throws IOException {
+ return new CommittingSinkWriter<Integer, String>() {
+ @Override
+ public Collection<String> prepareCommit() throws IOException,
InterruptedException {
+ return null;
+ }
+
+ @Override
+ public void write(Integer element, Context context)
+ throws IOException, InterruptedException {}
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException,
InterruptedException {}
+
+ @Override
+ public void close() throws Exception {}
+ };
+ }
+
+ @Override
+ public Committer<Long> createCommitter(CommitterInitContext context)
throws IOException {
+ return new Committer<Long>() {
+ @Override
+ public void commit(Collection<CommitRequest<Long>>
committables)
+ throws IOException, InterruptedException {}
+
+ @Override
+ public void close() throws Exception {}
+ };
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Long> getCommittableSerializer() {
+ return new LongSerializer();
+ }
+
+ @Override
+ public void addPostCommitTopology(DataStream<CommittableMessage<Long>>
committables) {
+ committables
+ .map(v -> (CommittableMessage<Void>) null)
+ .name("post-committer")
+ .returns(CommittableMessageTypeInfo.noOutput())
+ .rebalance();
+ }
+
+ @Override
+ public DataStream<CommittableMessage<Long>> addPreCommitTopology(
+ DataStream<CommittableMessage<String>> committables) {
+ return committables
+ .map(
+ v -> {
+ if (v instanceof CommittableSummary) {
+ return (CommittableSummary<Long>)
+ ((CommittableSummary) v).map();
+ } else {
+ CommittableWithLineage withLineage =
(CommittableWithLineage) v;
+ return (CommittableWithLineage<Long>)
+ withLineage.map(old ->
Long.valueOf(old.toString()));
+ }
+ })
+ .name("pre-committer")
+
.returns(CommittableMessageTypeInfo.of(LongSerializer::new))
+ .rebalance();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<String> getWriteResultSerializer() {
+ return new TestSinkV2.StringSerializer();
+ }
+
+ @Override
+ public DataStream<Integer> addPreWriteTopology(DataStream<Integer>
inputDataStream) {
+ return inputDataStream.map(v -> v).name("pre-writer").rebalance();
+ }
+ }
+
+ public static class LongSerializer implements
SimpleVersionedSerializer<Long>, Serializable {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(Long obj) throws IOException {
+ return new byte[0];
+ }
+
+ @Override
+ public Long deserialize(int version, byte[] serialized) throws
IOException {
+ return null;
+ }
+ }
+
private static class SerializationTestOperatorFactory
extends AbstractStreamOperatorFactory<Integer>
implements CoordinatedOperatorFactory<Integer> {
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index fe3625ec8ce..d1269ed89c1 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.runtime.operators.sink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
@@ -340,7 +340,7 @@ abstract class CommitterOperatorTestBase {
private OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
createTestHarness(
- TwoPhaseCommittingSink<?, String> sink,
+ SupportsCommitter<String> sink,
boolean isBatchMode,
boolean isCheckpointingEnabled)
throws Exception {
@@ -351,7 +351,7 @@ abstract class CommitterOperatorTestBase {
private OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
createTestHarness(
- TwoPhaseCommittingSink<?, String> sink,
+ SupportsCommitter<String> sink,
boolean isBatchMode,
boolean isCheckpointingEnabled,
int maxParallelism,
@@ -372,10 +372,10 @@ abstract class CommitterOperatorTestBase {
abstract SinkAndCounters sinkWithoutPostCommit();
static class SinkAndCounters {
- TwoPhaseCommittingSink<?, String> sink;
+ SupportsCommitter<String> sink;
IntSupplier commitCounter;
- public SinkAndCounters(TwoPhaseCommittingSink<?, String> sink,
IntSupplier commitCounter) {
+ public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier
commitCounter) {
this.sink = sink;
this.commitCounter = commitCounter;
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java
similarity index 91%
copy from
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
copy to
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java
index ed8e53ff342..51d2fa0d8d0 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java
@@ -19,10 +19,15 @@
package org.apache.flink.streaming.runtime.operators.sink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
import java.util.Collection;
-class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase {
+/**
+ * Should be removed along with {@link
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
+ */
+@Deprecated
+class SinkV2CommitterOperatorDeprecatedTest extends CommitterOperatorTestBase {
@Override
SinkAndCounters sinkWithPostCommit() {
ForwardingCommitter committer = new ForwardingCommitter();
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
index ed8e53ff342..349e3961e43 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.runtime.operators.sink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
import java.util.Collection;
@@ -27,7 +27,7 @@ class SinkV2CommitterOperatorTest extends
CommitterOperatorTestBase {
SinkAndCounters sinkWithPostCommit() {
ForwardingCommitter committer = new ForwardingCommitter();
return new SinkAndCounters(
- (TwoPhaseCommittingSink<?, String>)
+ (SupportsCommitter<String>)
TestSinkV2.newBuilder()
.setCommitter(committer)
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
@@ -38,8 +38,8 @@ class SinkV2CommitterOperatorTest extends
CommitterOperatorTestBase {
@Override
SinkAndCounters sinkWithPostCommitWithRetry() {
- return new SinkAndCounters(
- (TwoPhaseCommittingSink<?, String>)
+ return new CommitterOperatorTestBase.SinkAndCounters(
+ (SupportsCommitter<String>)
TestSinkV2.newBuilder()
.setCommitter(new
TestSinkV2.RetryOnceCommitter())
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
@@ -52,7 +52,7 @@ class SinkV2CommitterOperatorTest extends
CommitterOperatorTestBase {
SinkAndCounters sinkWithoutPostCommit() {
ForwardingCommitter committer = new ForwardingCommitter();
return new SinkAndCounters(
- (TwoPhaseCommittingSink<?, String>)
+ (SupportsCommitter<String>)
TestSinkV2.newBuilder()
.setCommitter(committer)
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java
new file mode 100644
index 00000000000..bf06065b17c
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
+
+import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Should be removed along with {@link
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
+ */
+@Deprecated
+class SinkV2SinkWriterOperatorDeprecatedTest extends
SinkWriterOperatorTestBase {
+
+ @Override
+ SinkAndSuppliers sinkWithoutCommitter() {
+ TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new
TestSinkV2.DefaultSinkWriter<>();
+ return new SinkAndSuppliers(
+ TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build(),
+ () -> sinkWriter.elements,
+ () -> sinkWriter.watermarks,
+ () -> -1,
+ TestSinkV2.StringSerializer::new);
+ }
+
+ @Override
+ SinkAndSuppliers sinkWithCommitter() {
+ TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
+ new TestSinkV2.DefaultCommittingSinkWriter<>();
+ return new SinkAndSuppliers(
+ TestSinkV2.<Integer>newBuilder()
+ .setWriter(sinkWriter)
+ .setDefaultCommitter()
+ .build(),
+ () -> sinkWriter.elements,
+ () -> sinkWriter.watermarks,
+ () -> -1,
+ TestSinkV2.StringSerializer::new);
+ }
+
+ @Override
+ SinkAndSuppliers sinkWithTimeBasedWriter() {
+ TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new
TimeBasedBufferingSinkWriter();
+ return new SinkAndSuppliers(
+ TestSinkV2.<Integer>newBuilder()
+ .setWriter(sinkWriter)
+ .setDefaultCommitter()
+ .build(),
+ () -> sinkWriter.elements,
+ () -> sinkWriter.watermarks,
+ () -> -1,
+ TestSinkV2.StringSerializer::new);
+ }
+
+ @Override
+ SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String
stateName) {
+ SnapshottingBufferingSinkWriter sinkWriter = new
SnapshottingBufferingSinkWriter();
+ TestSinkV2.Builder<Integer> builder =
+ TestSinkV2.newBuilder()
+ .setWriter(sinkWriter)
+ .setDefaultCommitter()
+ .setWithPostCommitTopology(true);
+ if (withState) {
+ builder.setWriterState(true);
+ }
+ if (stateName != null) {
+ builder.setCompatibleStateNames(stateName);
+ }
+ return new SinkAndSuppliers(
+ builder.build(),
+ () -> sinkWriter.elements,
+ () -> sinkWriter.watermarks,
+ () -> sinkWriter.lastCheckpointId,
+ () -> new TestSinkV2.StringSerializer());
+ }
+
+ private static class TimeBasedBufferingSinkWriter
+ extends TestSinkV2.DefaultCommittingSinkWriter<Integer>
+ implements ProcessingTimeService.ProcessingTimeCallback {
+
+ private final List<String> cachedCommittables = new ArrayList<>();
+ private ProcessingTimeService processingTimeService;
+
+ @Override
+ public void write(Integer element, Context context) {
+ cachedCommittables.add(
+ Tuple3.of(element, context.timestamp(),
context.currentWatermark()).toString());
+ }
+
+ @Override
+ public void onProcessingTime(long time) {
+ elements.addAll(cachedCommittables);
+ cachedCommittables.clear();
+ this.processingTimeService.registerTimer(time + 1000, this);
+ }
+
+ @Override
+ public void init(Sink.InitContext context) {
+ this.processingTimeService = context.getProcessingTimeService();
+ this.processingTimeService.registerTimer(1000, this);
+ }
+ }
+
+ private static class SnapshottingBufferingSinkWriter
+ extends TestSinkV2.DefaultStatefulSinkWriter {
+ public static final int NOT_SNAPSHOTTED = -1;
+ long lastCheckpointId = NOT_SNAPSHOTTED;
+ boolean endOfInput = false;
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException,
InterruptedException {
+ this.endOfInput = endOfInput;
+ }
+
+ @Override
+ public List<String> snapshotState(long checkpointId) throws
IOException {
+ lastCheckpointId = checkpointId;
+ return super.snapshotState(checkpointId);
+ }
+
+ @Override
+ public Collection<String> prepareCommit() {
+ if (!endOfInput) {
+ return ImmutableList.of();
+ }
+ List<String> result = elements;
+ elements = new ArrayList<>();
+ return result;
+ }
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
index 66fd323d1ff..e13e5881d71 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
@@ -20,14 +20,22 @@ package org.apache.flink.streaming.runtime.operators.sink;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import
org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.util.Preconditions;
@@ -51,7 +59,7 @@ import java.util.function.Supplier;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertNotNull;
-/** A {@link org.apache.flink.api.connector.sink2.Sink} for all the sink
related tests. */
+/** A {@link Sink} for all the sink related tests. */
public class TestSinkV2<InputT> implements Sink<InputT> {
private final DefaultSinkWriter<InputT> writer;
@@ -79,6 +87,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
private DefaultCommitter committer;
private SimpleVersionedSerializer<String> committableSerializer;
private boolean withPostCommitTopology = false;
+ private boolean withPreCommitTopology = false;
private boolean withWriterState = false;
private String compatibleStateNames;
@@ -116,6 +125,11 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
return this;
}
+ public Builder<InputT> setWithPreCommitTopology(boolean
withPreCommitTopology) {
+ this.withPreCommitTopology = withPreCommitTopology;
+ return this;
+ }
+
public Builder<InputT> setWriterState(boolean withWriterState) {
this.withWriterState = withWriterState;
return this;
@@ -138,9 +152,21 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
writer = new DefaultCommittingSinkWriter<>();
}
if (!withPostCommitTopology) {
- // TwoPhaseCommittingSink with a stateless writer and a
committer
- return new TestSinkV2TwoPhaseCommittingSink<>(
- writer, committableSerializer, committer);
+ if (!withPreCommitTopology) {
+ // TwoPhaseCommittingSink with a stateless writer and
a committer
+ return new TestSinkV2TwoPhaseCommittingSink<>(
+ writer, committableSerializer, committer);
+ } else {
+ // TwoPhaseCommittingSink with a stateless writer, pre
commit topology,
+ // committer
+ Preconditions.checkArgument(
+ writer instanceof DefaultCommittingSinkWriter,
+ "Please provide a DefaultCommittingSinkWriter
instance");
+ return new TestSinkV2WithPreCommitTopology<>(
+ (DefaultCommittingSinkWriter) writer,
+ committableSerializer,
+ committer);
+ }
} else {
if (withWriterState) {
// TwoPhaseCommittingSink with a stateful writer and a
committer and post
@@ -170,7 +196,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
}
private static class TestSinkV2TwoPhaseCommittingSink<InputT> extends
TestSinkV2<InputT>
- implements TwoPhaseCommittingSink<InputT, String> {
+ implements SupportsCommitter<String> {
private final DefaultCommitter committer;
private final SimpleVersionedSerializer<String> committableSerializer;
@@ -184,7 +210,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
}
@Override
- public Committer<String> createCommitter() {
+ public Committer<String> createCommitter(CommitterInitContext context)
{
committer.init();
return committer;
}
@@ -195,8 +221,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
}
@Override
- public PrecommittingSinkWriter<InputT, String>
createWriter(InitContext context) {
- return (PrecommittingSinkWriter<InputT, String>)
super.createWriter(context);
+ public SinkWriter<InputT> createWriter(WriterInitContext context)
throws IOException {
+ return super.createWriter(context);
}
}
@@ -204,7 +230,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
private static class TestSinkV2WithPostCommitTopology<InputT>
extends TestSinkV2TwoPhaseCommittingSink<InputT>
- implements WithPostCommitTopology<InputT, String> {
+ implements SupportsPostCommitTopology<String> {
public TestSinkV2WithPostCommitTopology(
DefaultSinkWriter<InputT> writer,
SimpleVersionedSerializer<String> committableSerializer,
@@ -218,8 +244,42 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
}
}
+ private static class TestSinkV2WithPreCommitTopology<InputT>
+ extends TestSinkV2TwoPhaseCommittingSink<InputT>
+ implements SupportsPreCommitTopology<String, String> {
+ public TestSinkV2WithPreCommitTopology(
+ DefaultSinkWriter<InputT> writer,
+ SimpleVersionedSerializer<String> committableSerializer,
+ DefaultCommitter committer) {
+ super(writer, committableSerializer, committer);
+ }
+
+ @Override
+ public DataStream<CommittableMessage<String>> addPreCommitTopology(
+ DataStream<CommittableMessage<String>> committables) {
+ return committables
+ .map(
+ m -> {
+ if (m instanceof CommittableSummary) {
+ return m;
+ } else {
+ CommittableWithLineage<String> withLineage
=
+ (CommittableWithLineage<String>) m;
+ return withLineage.map(old -> old +
"Transformed");
+ }
+ })
+
.returns(CommittableMessageTypeInfo.of(StringSerializer::new));
+ }
+
+ @Override
+ public SimpleVersionedSerializer<String> getWriteResultSerializer() {
+ return new StringSerializer();
+ }
+ }
+
private static class TestStatefulSinkV2<InputT> extends
TestSinkV2WithPostCommitTopology<InputT>
- implements StatefulSink<InputT, String>,
StatefulSink.WithCompatibleState {
+ implements SupportsWriterState<InputT, String>,
+ SupportsWriterState.WithCompatibleState {
private String compatibleState;
public TestStatefulSinkV2(
@@ -238,7 +298,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
@Override
public StatefulSinkWriter<InputT, String> restoreWriter(
- InitContext context, Collection<String> recoveredState) {
+ WriterInitContext context, Collection<String> recoveredState) {
DefaultStatefulSinkWriter<InputT> statefulWriter =
(DefaultStatefulSinkWriter) getWriter();
@@ -297,10 +357,9 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
}
}
- /** Base class for out testing {@link
TwoPhaseCommittingSink.PrecommittingSinkWriter}. */
+ /** Base class for our testing {@link CommittingSinkWriter}. */
protected static class DefaultCommittingSinkWriter<InputT> extends
DefaultSinkWriter<InputT>
- implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT,
String>,
- Serializable {
+ implements CommittingSinkWriter<InputT, String>, Serializable {
@Override
public void flush(boolean endOfInput) throws IOException,
InterruptedException {
@@ -316,12 +375,12 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
}
/**
- * Base class for out testing {@link StatefulSink.StatefulSinkWriter}.
Extends the {@link
+ * Base class for our testing {@link StatefulSinkWriter}. Extends the
{@link
* DefaultCommittingSinkWriter} for simplicity.
*/
protected static class DefaultStatefulSinkWriter<InputT>
extends DefaultCommittingSinkWriter<InputT>
- implements StatefulSink.StatefulSinkWriter<InputT, String> {
+ implements StatefulSinkWriter<InputT, String> {
@Override
public List<String> snapshotState(long checkpointId) throws
IOException {
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
similarity index 96%
copy from
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
copy to
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
index 66fd323d1ff..72b2810d1b4 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.sink;
+package org.apache.flink.streaming.runtime.operators.sink.deprecated;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink2.Committer;
@@ -51,7 +51,13 @@ import java.util.function.Supplier;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertNotNull;
-/** A {@link org.apache.flink.api.connector.sink2.Sink} for all the sink
related tests. */
+/**
+ * A {@link org.apache.flink.api.connector.sink2.Sink} for all the sink
related tests.
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
+ */
+@Deprecated
public class TestSinkV2<InputT> implements Sink<InputT> {
private final DefaultSinkWriter<InputT> writer;
@@ -262,11 +268,11 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
/** Base class for out testing {@link SinkWriter}. */
public static class DefaultSinkWriter<InputT> implements
SinkWriter<InputT>, Serializable {
- protected List<String> elements;
+ public List<String> elements;
- protected List<Watermark> watermarks;
+ public List<Watermark> watermarks;
- protected DefaultSinkWriter() {
+ public DefaultSinkWriter() {
this.elements = new ArrayList<>();
this.watermarks = new ArrayList<>();
}
@@ -298,7 +304,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
}
/** Base class for out testing {@link
TwoPhaseCommittingSink.PrecommittingSinkWriter}. */
- protected static class DefaultCommittingSinkWriter<InputT> extends
DefaultSinkWriter<InputT>
+ public static class DefaultCommittingSinkWriter<InputT> extends
DefaultSinkWriter<InputT>
implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT,
String>,
Serializable {
@@ -319,7 +325,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
* Base class for out testing {@link StatefulSink.StatefulSinkWriter}.
Extends the {@link
* DefaultCommittingSinkWriter} for simplicity.
*/
- protected static class DefaultStatefulSinkWriter<InputT>
+ public static class DefaultStatefulSinkWriter<InputT>
extends DefaultCommittingSinkWriter<InputT>
implements StatefulSink.StatefulSinkWriter<InputT, String> {
@@ -387,7 +393,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
}
/** A {@link Committer} that always re-commits the committables data it
received. */
- static class RetryOnceCommitter extends DefaultCommitter {
+ public static class RetryOnceCommitter extends DefaultCommitter {
private final Set<CommitRequest<String>> seen = new LinkedHashSet<>();
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeDeprecatedITCase.java
similarity index 97%
copy from
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
copy to
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeDeprecatedITCase.java
index a0d64adb821..ac10d55a3b9 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeDeprecatedITCase.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.functions;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
@@ -61,8 +61,12 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for verifying runtime behaviour of {@link
BuiltInFunctionDefinitions#INTERNAL_HASHCODE}.
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
*/
-public class HashcodeITCase {
+@Deprecated
+public class HashcodeDeprecatedITCase {
@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new
MiniClusterExtension();
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
index a0d64adb821..8eaf4a277f7 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.connector.sink.SinkV2Provider;
-import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
@@ -158,8 +157,7 @@ public class HashcodeITCase {
}
@Override
- public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(
- ScanTableSource.ScanContext context) {
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context)
{
return SourceFunctionProvider.of(new TestSourceFunction(), false);
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
index 6f02b41c573..b326aa824b0 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
@@ -29,8 +29,13 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
@@ -188,6 +193,21 @@ class SpeculativeSchedulerITCase {
assertThat(DummyCommitter.foundSpeculativeWriter).isTrue();
}
+ /** Should be removed along with {@link TwoPhaseCommittingSink}. */
+ @Deprecated
+ @Test
+ public void testSpeculativeSlowSinkDeprecated() throws Exception {
+ executeJob(this::setupSpeculativeSlowSinkDeprecated);
+ waitUntilJobArchived();
+
+ checkResults();
+
+ // no speculative executions for committer
+ assertThat(DummyCommitter.attempts.get()).isEqualTo(parallelism);
+ // there is a speculative execution for writer
+ assertThat(DummyCommitter.foundSpeculativeWriter).isTrue();
+ }
+
@Test
public void testNonSpeculativeSlowSinkFunction() throws Exception {
executeJob(this::setupNonSpeculativeSlowSinkFunction);
@@ -343,6 +363,9 @@ class SpeculativeSchedulerITCase {
}
private void setupSpeculativeSlowSink(StreamExecutionEnvironment env) {
+ DummyCommitter.attempts.set(0);
+ DummyCommitter.blocked.set(false);
+ DummyCommitter.foundSpeculativeWriter = false;
final DataStream<Long> source =
env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
.setParallelism(parallelism)
@@ -354,6 +377,23 @@ class SpeculativeSchedulerITCase {
.slotSharingGroup("sinkGroup");
}
+ /** Should be removed along with {@link TwoPhaseCommittingSink}. */
+ @Deprecated
+ private void setupSpeculativeSlowSinkDeprecated(StreamExecutionEnvironment
env) {
+ DummyCommitter.attempts.set(0);
+ DummyCommitter.blocked.set(false);
+ DummyCommitter.foundSpeculativeWriter = false;
+ final DataStream<Long> source =
+ env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
+ .setParallelism(parallelism)
+ .name("source")
+ .slotSharingGroup("sourceGroup");
+ source.sinkTo(new SpeculativeSinkDeprecated())
+ .setParallelism(parallelism)
+ .name("sink")
+ .slotSharingGroup("sinkGroup");
+ }
+
private void
setupNonSpeculativeSlowSinkFunction(StreamExecutionEnvironment env) {
final DataStream<Long> source =
env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
@@ -524,6 +564,93 @@ class SpeculativeSchedulerITCase {
}
private static class SpeculativeSink
+ implements Sink<Long>,
+ SupportsCommitter<Tuple3<Integer, Integer, Map<Long,
Long>>>,
+ SupportsConcurrentExecutionAttempts {
+
+ @Override
+ public SinkWriter<Long> createWriter(InitContext context) {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public CommittingSinkWriter<Long, Tuple3<Integer, Integer, Map<Long,
Long>>> createWriter(
+ WriterInitContext context) {
+ return new DummyCommittingSinkWriter(
+ context.getTaskInfo().getIndexOfThisSubtask(),
+ context.getTaskInfo().getAttemptNumber());
+ }
+
+ @Override
+ public Committer<Tuple3<Integer, Integer, Map<Long, Long>>>
createCommitter(
+ CommitterInitContext context) {
+ return new DummyCommitter();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Tuple3<Integer, Integer, Map<Long,
Long>>>
+ getCommittableSerializer() {
+ return new SimpleVersionedSerializer<Tuple3<Integer, Integer,
Map<Long, Long>>>() {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(Tuple3<Integer, Integer, Map<Long,
Long>> obj)
+ throws IOException {
+ return InstantiationUtil.serializeObject(obj);
+ }
+
+ @Override
+ public Tuple3<Integer, Integer, Map<Long, Long>> deserialize(
+ int version, byte[] serialized) throws IOException {
+ try {
+ return InstantiationUtil.deserializeObject(
+ serialized,
Thread.currentThread().getContextClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ }
+
+ private static class DummyCommittingSinkWriter
+ implements CommittingSinkWriter<Long, Tuple3<Integer, Integer,
Map<Long, Long>>> {
+
+ private final int subTaskIndex;
+
+ private final int attemptNumber;
+
+ public DummyCommittingSinkWriter(int subTaskIndex, int attemptNumber) {
+ this.subTaskIndex = subTaskIndex;
+ this.attemptNumber = attemptNumber;
+ }
+
+ private final Map<Long, Long> numberCountResult = new HashMap<>();
+
+ @Override
+ public void write(Long value, Context context) throws IOException,
InterruptedException {
+ numberCountResult.merge(value, 1L, Long::sum);
+ maybeSleep();
+ }
+
+ @Override
+ public void flush(boolean endOfInput) {}
+
+ @Override
+ public Collection<Tuple3<Integer, Integer, Map<Long, Long>>>
prepareCommit() {
+ return Collections.singleton(Tuple3.of(subTaskIndex,
attemptNumber, numberCountResult));
+ }
+
+ @Override
+ public void close() throws Exception {}
+ }
+
+ /** Should be removed along with {@link TwoPhaseCommittingSink}. */
+ @Deprecated
+ private static class SpeculativeSinkDeprecated
implements TwoPhaseCommittingSink<Long, Tuple3<Integer, Integer,
Map<Long, Long>>>,
SupportsConcurrentExecutionAttempts {
@@ -570,6 +697,8 @@ class SpeculativeSchedulerITCase {
}
}
+ /** Should be removed along with {@link TwoPhaseCommittingSink}. */
+ @Deprecated
private static class DummyPrecommittingSinkWriter
implements PrecommittingSinkWriter<Long, Tuple3<Integer, Integer,
Map<Long, Long>>> {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2DeprecatedITCase.java
similarity index 95%
copy from
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
copy to
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2DeprecatedITCase.java
index b23dc34ed74..a101fbe4119 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2DeprecatedITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.util.AbstractTestBase;
@@ -44,8 +44,12 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
/**
* Integration test for {@link org.apache.flink.api.connector.sink.Sink} run
time implementation.
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
*/
-public class SinkV2ITCase extends AbstractTestBase {
+@Deprecated
+public class SinkV2DeprecatedITCase extends AbstractTestBase {
static final List<Integer> SOURCE_DATA =
Arrays.asList(
895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850,
630, 682, 765, 434, 970,
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index b23dc34ed74..f44ab39975b 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -103,6 +103,32 @@ public class SinkV2ITCase extends AbstractTestBase {
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
}
+ @Test
+ public void writerAndPrecommitToplogyAndCommitterExecuteInStreamingMode()
throws Exception {
+ final StreamExecutionEnvironment env = buildStreamEnv();
+ final FiniteTestSource<Integer> source =
+ new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA,
SOURCE_DATA);
+
+ env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
+ .sinkTo(
+ TestSinkV2.<Integer>newBuilder()
+ .setDefaultCommitter(
+
(Supplier<Queue<Committer.CommitRequest<String>>>
+ & Serializable)
+ () -> COMMIT_QUEUE)
+ .setWithPreCommitTopology(true)
+ .build());
+ env.execute();
+ assertThat(
+ COMMIT_QUEUE.stream()
+ .map(Committer.CommitRequest::getCommittable)
+ .collect(Collectors.toList()),
+ containsInAnyOrder(
+ EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.stream()
+ .map(s -> s + "Transformed")
+ .toArray()));
+ }
+
@Test
public void writerAndCommitterExecuteInBatchMode() throws Exception {
final StreamExecutionEnvironment env = buildBatchEnv();
@@ -123,6 +149,30 @@ public class SinkV2ITCase extends AbstractTestBase {
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
}
+ @Test
+ public void writerAndPrecommitToplogyAndCommitterExecuteInBatchMode()
throws Exception {
+ final StreamExecutionEnvironment env = buildBatchEnv();
+
+ env.fromData(SOURCE_DATA)
+ .sinkTo(
+ TestSinkV2.<Integer>newBuilder()
+ .setDefaultCommitter(
+
(Supplier<Queue<Committer.CommitRequest<String>>>
+ & Serializable)
+ () -> COMMIT_QUEUE)
+ .setWithPreCommitTopology(true)
+ .build());
+ env.execute();
+ assertThat(
+ COMMIT_QUEUE.stream()
+ .map(Committer.CommitRequest::getCommittable)
+ .collect(Collectors.toList()),
+ containsInAnyOrder(
+ EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.stream()
+ .map(s -> s + "Transformed")
+ .toArray()));
+ }
+
private StreamExecutionEnvironment buildStreamEnv() {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java
new file mode 100644
index 00000000000..3dea88bcf65
--- /dev/null
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static
org.apache.flink.metrics.testutils.MetricAssertions.assertThatCounter;
+import static
org.apache.flink.metrics.testutils.MetricAssertions.assertThatGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasEntry;
+
+/**
+ * Tests whether all provided metrics of a {@link Sink} are of the expected
values (FLIP-33).
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
+ */
+@Deprecated
+public class SinkV2MetricsDeprecatedITCase extends TestLogger {
+
+ private static final String TEST_SINK_NAME = "MetricTestSink";
+ // please refer to SinkTransformationTranslator#WRITER_NAME
+ private static final String DEFAULT_WRITER_NAME = "Writer";
+ private static final String DEFAULT_COMMITTER_NAME = "Committer";
+ private static final int DEFAULT_PARALLELISM = 4;
+
+ @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+ private final InMemoryReporter reporter =
InMemoryReporter.createWithRetainedMetrics();
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
+ .build());
+
+ @Test
+ public void testMetrics() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ int numSplits = Math.max(1, env.getParallelism() - 2);
+
+ int numRecordsPerSplit = 10;
+
+ // make sure all parallel instances have processed the same amount of
records before
+ // validating metrics
+ SharedReference<CyclicBarrier> beforeBarrier =
+ sharedObjects.add(new CyclicBarrier(numSplits + 1));
+ SharedReference<CyclicBarrier> afterBarrier =
+ sharedObjects.add(new CyclicBarrier(numSplits + 1));
+ int stopAtRecord1 = 4;
+ int stopAtRecord2 = numRecordsPerSplit - 1;
+
+ env.fromSequence(0, numSplits - 1)
+ .<Long>flatMap(
+ (split, collector) ->
+ LongStream.range(0,
numRecordsPerSplit).forEach(collector::collect))
+ .returns(BasicTypeInfo.LONG_TYPE_INFO)
+ .map(
+ i -> {
+ if (i % numRecordsPerSplit == stopAtRecord1
+ || i % numRecordsPerSplit ==
stopAtRecord2) {
+ beforeBarrier.get().await();
+ afterBarrier.get().await();
+ }
+ return i;
+ })
+ .sinkTo(TestSinkV2.<Long>newBuilder().setWriter(new
MetricWriter()).build())
+ .name(TEST_SINK_NAME);
+ JobClient jobClient = env.executeAsync();
+ final JobID jobId = jobClient.getJobID();
+
+ beforeBarrier.get().await();
+ assertSinkMetrics(jobId, stopAtRecord1, numSplits);
+ afterBarrier.get().await();
+
+ beforeBarrier.get().await();
+ assertSinkMetrics(jobId, stopAtRecord2, numSplits);
+ afterBarrier.get().await();
+
+ jobClient.getJobExecutionResult().get();
+ }
+
+ @Test
+ public void testCommitterMetrics() throws Exception {
+     final int numCommittables = 7;
+     final long allCommittables = numCommittables;
+     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+     // Latch pair that pauses the committers while the intermediate metrics are validated:
+     // beforeLatch is counted down once per committable by MetricCommitter and awaited here,
+     // afterLatch is awaited by MetricCommitter and released here.
+     SharedReference<CountDownLatch> beforeLatch =
+             sharedObjects.add(new CountDownLatch(numCommittables));
+     SharedReference<CountDownLatch> afterLatch = sharedObjects.add(new CountDownLatch(1));
+
+     env.fromSequence(0, numCommittables - 1)
+             .returns(BasicTypeInfo.LONG_TYPE_INFO)
+             .sinkTo(
+                     TestSinkV2.<Long>newBuilder()
+                             .setCommitter(new MetricCommitter(beforeLatch, afterLatch))
+                             .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
+                             .build())
+             .name(TEST_SINK_NAME);
+     JobClient jobClient = env.executeAsync();
+     final JobID jobId = jobClient.getJobID();
+
+     // Block until every committable has been through its first commit round. At that point
+     // all of them should have been retried once and still be pending.
+     beforeLatch.get().await();
+     Map<String, Long> expectedAfterFirstRound =
+             ImmutableMap.of(
+                     MetricNames.ALREADY_COMMITTED_COMMITTABLES, 0L,
+                     MetricNames.FAILED_COMMITTABLES, 0L,
+                     MetricNames.RETRIED_COMMITTABLES, allCommittables,
+                     MetricNames.SUCCESSFUL_COMMITTABLES, 0L,
+                     MetricNames.TOTAL_COMMITTABLES, allCommittables,
+                     MetricNames.PENDING_COMMITTABLES, allCommittables);
+     assertSinkCommitterMetrics(jobId, expectedAfterFirstRound);
+
+     // Let the committers continue with the remaining rounds.
+     afterLatch.get().countDown();
+
+     // Wait for the job to finish, then validate the final committer metrics.
+     jobClient.getJobExecutionResult().get();
+     Map<String, Long> expectedWhenFinished =
+             ImmutableMap.of(
+                     MetricNames.ALREADY_COMMITTED_COMMITTABLES, 1L,
+                     MetricNames.FAILED_COMMITTABLES, 2L,
+                     MetricNames.RETRIED_COMMITTABLES, 10L,
+                     MetricNames.SUCCESSFUL_COMMITTABLES, 4L,
+                     MetricNames.TOTAL_COMMITTABLES, allCommittables,
+                     MetricNames.PENDING_COMMITTABLES, 0L);
+     assertSinkCommitterMetrics(jobId, expectedWhenFinished);
+ }
+
+ /**
+  * Verifies the writer metrics of every sink-writer subtask that has emitted records.
+  *
+  * @param jobId job whose writer operator metric groups are inspected
+  * @param processedRecordsPerSubtask number of records each active subtask must have written
+  * @param numSplits expected number of subtasks that report a non-zero record count
+  */
+ @SuppressWarnings("checkstyle:WhitespaceAfter")
+ private void assertSinkMetrics(JobID jobId, long processedRecordsPerSubtask, int numSplits) {
+     final long expectedBytes = processedRecordsPerSubtask * MetricWriter.RECORD_SIZE_IN_BYTES;
+     // MetricWriter increments the error counter for every even record
+     final long expectedErrors = (processedRecordsPerSubtask + 1) / 2;
+     // MetricWriter reports element * BASE_SEND_TIME of the last written element
+     final long expectedSendTime =
+             (processedRecordsPerSubtask - 1) * MetricWriter.BASE_SEND_TIME;
+
+     List<OperatorMetricGroup> writerGroups =
+             reporter.findOperatorMetricGroups(
+                     jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
+
+     int activeSubtasks = 0;
+     for (OperatorMetricGroup writerGroup : writerGroups) {
+         Map<String, Metric> writerMetrics = reporter.getMetricsByGroup(writerGroup);
+
+         // Skip subtasks that have not emitted any record (e.g. no split was assigned).
+         boolean hasEmittedRecords =
+                 writerGroup.getIOMetricGroup() != null
+                         && writerGroup.getIOMetricGroup().getNumRecordsOutCounter() != null
+                         && writerGroup.getIOMetricGroup().getNumRecordsOutCounter().getCount()
+                                 != 0;
+         if (!hasEmittedRecords) {
+             continue;
+         }
+         activeSubtasks++;
+
+         // SinkWriterMetricGroup metrics
+         assertThatCounter(writerMetrics.get(MetricNames.IO_NUM_RECORDS_OUT))
+                 .isEqualTo(processedRecordsPerSubtask);
+         assertThatCounter(writerMetrics.get(MetricNames.IO_NUM_BYTES_OUT))
+                 .isEqualTo(expectedBytes);
+         assertThatCounter(writerMetrics.get(MetricNames.NUM_RECORDS_OUT_ERRORS))
+                 .isEqualTo(expectedErrors);
+
+         // The "send" metric series must mirror the "out" metric series.
+         assertThatCounter(writerMetrics.get(MetricNames.NUM_RECORDS_SEND))
+                 .isEqualTo(processedRecordsPerSubtask);
+         assertThatCounter(writerMetrics.get(MetricNames.NUM_BYTES_SEND))
+                 .isEqualTo(expectedBytes);
+         assertThatCounter(writerMetrics.get(MetricNames.NUM_RECORDS_SEND_ERRORS))
+                 .isEqualTo(expectedErrors);
+
+         // The gauge must expose the most recent send time.
+         assertThatGauge(writerMetrics.get(MetricNames.CURRENT_SEND_TIME))
+                 .isEqualTo(expectedSendTime);
+     }
+     assertThat(activeSubtasks, equalTo(numSplits));
+ }
+
+ /**
+  * Sums the committer metrics over all committer subtasks of the job and checks that the
+  * totals contain every expected entry.
+  *
+  * @param jobId job whose committer operator metric groups are inspected
+  * @param expected metric name to expected value, summed across subtasks
+  */
+ private void assertSinkCommitterMetrics(JobID jobId, Map<String, Long> expected) {
+     final List<String> counterNames =
+             Arrays.asList(
+                     MetricNames.SUCCESSFUL_COMMITTABLES,
+                     MetricNames.ALREADY_COMMITTED_COMMITTABLES,
+                     MetricNames.RETRIED_COMMITTABLES,
+                     MetricNames.FAILED_COMMITTABLES,
+                     MetricNames.TOTAL_COMMITTABLES);
+
+     List<OperatorMetricGroup> committerGroups =
+             reporter.findOperatorMetricGroups(
+                     jobId, TEST_SINK_NAME + ": " + DEFAULT_COMMITTER_NAME);
+
+     Map<String, Long> totals = new HashMap<>(6);
+     for (OperatorMetricGroup committerGroup : committerGroups) {
+         Map<String, Metric> groupMetrics = reporter.getMetricsByGroup(committerGroup);
+
+         // Counter-based metrics: add up whatever this subtask has registered.
+         for (String counterName : counterNames) {
+             Metric registered = groupMetrics.get(counterName);
+             if (registered == null) {
+                 continue;
+             }
+             totals.merge(counterName, ((Counter) registered).getCount(), Long::sum);
+         }
+
+         // The pending committables are exposed as a gauge rather than a counter.
+         Gauge<Integer> pendingGauge =
+                 (Gauge<Integer>) groupMetrics.get(MetricNames.PENDING_COMMITTABLES);
+         if (pendingGauge != null && pendingGauge.getValue() != null) {
+             totals.merge(
+                     MetricNames.PENDING_COMMITTABLES,
+                     pendingGauge.getValue().longValue(),
+                     Long::sum);
+         }
+     }
+
+     expected.forEach((name, value) -> assertThat(totals, hasEntry(name, value)));
+ }
+
+ /**
+ * Sink writer used by the metric tests: on every written element it updates the writer
+ * metric group (records out, bytes out, errors, current send time) with values that the
+ * assertions can predict from the element itself.
+ */
+ private static class MetricWriter extends
TestSinkV2.DefaultSinkWriter<Long> {
+ // Factor applied to the written element to derive the reported send time.
+ static final long BASE_SEND_TIME = 100;
+ // Amount added to the bytes-out counter for every written element.
+ static final long RECORD_SIZE_IN_BYTES = 10;
+ private SinkWriterMetricGroup metricGroup;
+ // Value returned by the current-send-time gauge; overwritten on every write.
+ private long sendTime;
+
+ // Keeps the writer metric group and registers a gauge that reads sendTime.
+ @Override
+ public void init(Sink.InitContext context) {
+ this.metricGroup = context.metricGroup();
+ metricGroup.setCurrentSendTimeGauge(() -> sendTime);
+ }
+
+ // Delegates the write to the parent, then updates the metrics for this element.
+ @Override
+ public void write(Long element, Context context) {
+ super.write(element, context);
+ sendTime = element * BASE_SEND_TIME;
+ metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc();
+ // Every even element is reported as an output error.
+ if (element % 2 == 0) {
+ metricGroup.getNumRecordsOutErrorsCounter().inc();
+ }
+
metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
+ }
+ }
+
+ /**
+  * Committer whose {@code commit} calls follow a fixed script so that the committer metrics
+  * can be asserted. The first call requests a retry for every committable. The second call
+  * counts each received committable down on {@code beforeLatch} and then blocks on {@code
+  * afterLatch}. From the second call on, the outcome of each committable is selected by its
+  * second character.
+  */
+ private static class MetricCommitter extends TestSinkV2.DefaultCommitter {
+     private final SharedReference<CountDownLatch> beforeLatch;
+     private final SharedReference<CountDownLatch> afterLatch;
+
+     /** Number of {@code commit} calls this instance has completed so far. */
+     private int counter;
+
+     /**
+      * @param beforeLatch counted down once per committable during the second commit call
+      * @param afterLatch awaited during the second commit call before any outcome is signalled
+      */
+     MetricCommitter(
+             SharedReference<CountDownLatch> beforeLatch,
+             SharedReference<CountDownLatch> afterLatch) {
+         this.beforeLatch = beforeLatch;
+         this.afterLatch = afterLatch;
+     }
+
+     /**
+      * Applies the scripted outcome for the current commit round to every request.
+      *
+      * @param committables commit requests handed over for this round
+      * @throws RuntimeException if the thread is interrupted while waiting on {@code
+      *     afterLatch}; the interrupt flag is restored first
+      */
+     @Override
+     public void commit(Collection<CommitRequest<String>> committables) {
+         if (counter == 0) {
+             // NOTE(review): debug output on stderr, kept as-is; consider a logger.
+             System.err.println(
+                     "Committables arrived "
+                             + Thread.currentThread().getName()
+                             + " "
+                             + committables.stream()
+                                     .map(CommitRequest::getCommittable)
+                                     .collect(Collectors.toList()));
+             // First round: everything is retried.
+             committables.forEach(CommitRequest::retryLater);
+         } else {
+             if (counter == 1) {
+                 // Wait for metrics check before continue
+                 try {
+                     committables.forEach(any -> beforeLatch.get().countDown());
+                     afterLatch.get().await();
+                 } catch (InterruptedException e) {
+                     Thread.currentThread().interrupt();
+                     throw new RuntimeException(e);
+                 }
+             }
+
+             committables.forEach(
+                     c -> {
+                         // NOTE(review): assumes the second character identifies the record
+                         // (e.g. a committable rendered as "(3,...)") - confirm with the writer.
+                         switch (c.getCommittable().charAt(1)) {
+                             case '0':
+                                 // 1 already committed
+                                 c.signalAlreadyCommitted();
+                                 break;
+                             case '1':
+                             case '2':
+                                 // 2 failed
+                                 c.signalFailedWithKnownReason(new RuntimeException());
+                                 break;
+                             case '3':
+                                 // Retry without change, but only in the second round
+                                 if (counter == 1) {
+                                     c.retryLater();
+                                 }
+                                 break;
+                             case '4':
+                             case '5':
+                                 // Retry with change
+                                 c.updateAndRetryLater("Retry-" + c.getCommittable());
+                                 break;
+                             default:
+                                 // No signal is sent for any other committable.
+                                 break;
+                         }
+                     });
+         }
+         counter++;
+     }
+ }
+}