This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4217408576552fc929d9c8331495a9282064cd9c Author: Arvid Heise <[email protected]> AuthorDate: Tue Sep 10 11:47:02 2024 +0200 [FLINK-25920] Refactor & Revise SinkWriterOperatorTestBase The stateful SinkWriterOperatorTestBase test cases used EOI to manipulate the state which was never clean. In particular, it also stored the input elements in state until EOI arrived and emitted them all at once. For state restoration tests, we emitted records after EOI arrived. This commit changed the writer state completely to just capture the record count, which is much more realistic than storing actual payload. The tests now directly assert on the state instead of output. This commit also introduces an adaptor for serializing basic types in the writer state and replaces the hard-to-maintain SinkAndSuppliers with an InspectableSink in the sink writer tests that require an abstraction on top of the different Sink flavors. --- .../core/io/SimpleVersionedSerializerAdapter.java | 58 ++++++ .../SinkV1TransformationTranslatorITCase.java | 8 +- .../api/graph/StreamingJobGraphGeneratorTest.java | 5 +- .../runtime/operators/sink/SinkTestUtil.java | 4 +- .../SinkV2CommitterOperatorDeprecatedTest.java | 3 - .../sink/SinkV2CommitterOperatorTest.java | 13 +- .../SinkV2SinkWriterOperatorDeprecatedTest.java | 88 ++++----- .../sink/SinkV2SinkWriterOperatorTest.java | 76 ++++---- .../operators/sink/SinkWriterOperatorTestBase.java | 217 +++++++++------------ .../streaming/runtime/operators/sink/TestSink.java | 98 ++++------ .../runtime/operators/sink/TestSinkV2.java | 111 +++++------ .../sink/WithAdapterCommitterOperatorTest.java | 13 +- .../sink/WithAdapterSinkWriterOperatorTest.java | 84 ++++---- .../operators/sink/deprecated/TestSinkV2.java | 82 ++++---- .../nodes/exec/common/CommonExecSinkITCase.java | 5 +- .../flink/test/streaming/runtime/SinkITCase.java | 3 - .../runtime/SinkV2MetricsDeprecatedITCase.java | 1 - .../streaming/runtime/SinkV2MetricsITCase.java | 5 +- 18 files changed, 400 
insertions(+), 474 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializerAdapter.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializerAdapter.java new file mode 100644 index 00000000000..03056be26a9 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializerAdapter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Adapter for {@link TypeSerializer} to {@link SimpleVersionedSerializer}. The implementation is + * naive and should only be used for non-critical paths and tests. 
+ */ +@Internal +public class SimpleVersionedSerializerAdapter<T> + implements SimpleVersionedSerializer<T>, Serializable { + private final TypeSerializer<T> serializer; + + public SimpleVersionedSerializerAdapter(TypeSerializer<T> serializer) { + this.serializer = serializer; + } + + public int getVersion() { + return serializer.snapshotConfiguration().getCurrentVersion(); + } + + public byte[] serialize(T value) throws IOException { + DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(10); + serializer.serialize(value, dataOutputSerializer); + return dataOutputSerializer.getCopyOfBuffer(); + } + + public T deserialize(int version, byte[] serialized) throws IOException { + DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(serialized); + T value = serializer.deserialize(dataInputDeserializer); + dataInputDeserializer.releaseArrays(); + return value; + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java index 8f4ddf77389..2685d57b6da 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java @@ -121,13 +121,7 @@ class SinkV1TransformationTranslatorITCase void generateWriterGlobalCommitterTopology() { final StreamGraph streamGraph = buildGraph( - TestSink.newBuilder() - .setCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) - .setGlobalCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) - .setDefaultGlobalCommitter() - .build(), + TestSink.newBuilder().setDefaultGlobalCommitter().build(), runtimeExecutionMode); final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source")); diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 34085974363..1972d9e7697 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; @@ -55,6 +56,7 @@ import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -115,7 +117,6 @@ import org.apache.flink.streaming.api.transformations.MultipleInputTransformatio import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.StreamExchangeMode; -import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; 
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; @@ -2684,7 +2685,7 @@ class StreamingJobGraphGeneratorTest { @Override public SimpleVersionedSerializer<String> getWriteResultSerializer() { - return new TestSinkV2.StringSerializer(); + return new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java index b457b5a0b78..cfdae58596c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java @@ -52,7 +52,7 @@ class SinkTestUtil { static byte[] toBytes(String obj) { try { return SimpleVersionedSerialization.writeVersionAndSerialize( - TestSinkV2.StringSerializer.INSTANCE, obj); + TestSinkV2.COMMITTABLE_SERIALIZER, obj); } catch (IOException e) { throw new IllegalStateException(e); } @@ -83,7 +83,7 @@ class SinkTestUtil { static String fromBytes(byte[] obj) { try { return SimpleVersionedSerialization.readVersionAndDeSerialize( - TestSinkV2.StringSerializer.INSTANCE, obj); + TestSinkV2.COMMITTABLE_SERIALIZER, obj); } catch (IOException e) { throw new IllegalStateException(e); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java index 51d2fa0d8d0..3aacaa6d1f8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java @@ -35,7 +35,6 @@ class SinkV2CommitterOperatorDeprecatedTest extends CommitterOperatorTestBase { (TwoPhaseCommittingSink<?, String>) TestSinkV2.newBuilder() .setCommitter(committer) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .setWithPostCommitTopology(true) .build(), () -> committer.successfulCommits); @@ -47,7 +46,6 @@ class SinkV2CommitterOperatorDeprecatedTest extends CommitterOperatorTestBase { (TwoPhaseCommittingSink<?, String>) TestSinkV2.newBuilder() .setCommitter(new TestSinkV2.RetryOnceCommitter()) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .setWithPostCommitTopology(true) .build(), () -> 0); @@ -60,7 +58,6 @@ class SinkV2CommitterOperatorDeprecatedTest extends CommitterOperatorTestBase { (TwoPhaseCommittingSink<?, String>) TestSinkV2.newBuilder() .setCommitter(committer) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .setWithPostCommitTopology(false) .build(), () -> committer.successfulCommits); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java index 349e3961e43..0112b5cf862 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java @@ -30,7 +30,6 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { (SupportsCommitter<String>) TestSinkV2.newBuilder() .setCommitter(committer) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .setWithPostCommitTopology(true) .build(), () -> committer.successfulCommits); @@ -42,7 +41,6 @@ class 
SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { (SupportsCommitter<String>) TestSinkV2.newBuilder() .setCommitter(new TestSinkV2.RetryOnceCommitter()) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .setWithPostCommitTopology(true) .build(), () -> 0); @@ -52,12 +50,11 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { SinkAndCounters sinkWithoutPostCommit() { ForwardingCommitter committer = new ForwardingCommitter(); return new SinkAndCounters( - (SupportsCommitter<String>) - TestSinkV2.newBuilder() - .setCommitter(committer) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) - .setWithPostCommitTopology(false) - .build(), + TestSinkV2.newBuilder() + .setCommitter(committer) + .setWithPostCommitTopology(false) + .build() + .asSupportsCommitter(), () -> committer.successfulCommits); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java index bf06065b17c..4e0c3ad0cec 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java @@ -18,16 +18,13 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2; -import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList; - -import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; 
import java.util.List; /** @@ -35,67 +32,49 @@ import java.util.List; */ @Deprecated class SinkV2SinkWriterOperatorDeprecatedTest extends SinkWriterOperatorTestBase { - @Override - SinkAndSuppliers sinkWithoutCommitter() { + InspectableSink sinkWithoutCommitter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultSinkWriter<>(); - return new SinkAndSuppliers( - TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + return new InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build()); } @Override - SinkAndSuppliers sinkWithCommitter() { + InspectableSink sinkWithCommitter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultCommittingSinkWriter<>(); - return new SinkAndSuppliers( + return new InspectableSink( TestSinkV2.<Integer>newBuilder() - .setWriter(sinkWriter) .setDefaultCommitter() - .build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + .setWriter(sinkWriter) + .build()); } @Override - SinkAndSuppliers sinkWithTimeBasedWriter() { + InspectableSink sinkWithTimeBasedWriter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter(); - return new SinkAndSuppliers( + return new InspectableSink( TestSinkV2.<Integer>newBuilder() .setWriter(sinkWriter) .setDefaultCommitter() - .build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + .build()); } @Override - SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String stateName) { - SnapshottingBufferingSinkWriter sinkWriter = new SnapshottingBufferingSinkWriter(); + InspectableSink sinkWithState(boolean withState, String stateName) { + TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = + new TestSinkV2.DefaultStatefulSinkWriter<>(); TestSinkV2.Builder<Integer> 
builder = - TestSinkV2.newBuilder() - .setWriter(sinkWriter) + TestSinkV2.<Integer>newBuilder() .setDefaultCommitter() - .setWithPostCommitTopology(true); + .setWithPostCommitTopology(true) + .setWriter(sinkWriter); if (withState) { builder.setWriterState(true); } if (stateName != null) { builder.setCompatibleStateNames(stateName); } - return new SinkAndSuppliers( - builder.build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> sinkWriter.lastCheckpointId, - () -> new TestSinkV2.StringSerializer()); + return new InspectableSink(builder.build()); } private static class TimeBasedBufferingSinkWriter @@ -125,31 +104,30 @@ class SinkV2SinkWriterOperatorDeprecatedTest extends SinkWriterOperatorTestBase } } - private static class SnapshottingBufferingSinkWriter - extends TestSinkV2.DefaultStatefulSinkWriter { - public static final int NOT_SNAPSHOTTED = -1; - long lastCheckpointId = NOT_SNAPSHOTTED; - boolean endOfInput = false; + static class InspectableSink extends AbstractInspectableSink<TestSinkV2<Integer>> { + InspectableSink(TestSinkV2<Integer> sink) { + super(sink); + } + + @Override + public long getLastCheckpointId() { + return getSink().getWriter().lastCheckpointId; + } @Override - public void flush(boolean endOfInput) throws IOException, InterruptedException { - this.endOfInput = endOfInput; + public List<String> getRecordsOfCurrentCheckpoint() { + return getSink().getWriter().elements; } @Override - public List<String> snapshotState(long checkpointId) throws IOException { - lastCheckpointId = checkpointId; - return super.snapshotState(checkpointId); + public List<Watermark> getWatermarks() { + return getSink().getWriter().watermarks; } @Override - public Collection<String> prepareCommit() { - if (!endOfInput) { - return ImmutableList.of(); - } - List<String> result = elements; - elements = new ArrayList<>(); - return result; + public int getRecordCountFromState() { + return ((TestSinkV2.DefaultStatefulSinkWriter<?>) getSink().getWriter()) 
+ .getRecordCount(); } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java index d46cf0c05fc..2d41721cbfb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.java.tuple.Tuple3; @@ -30,67 +31,49 @@ import java.util.Collection; import java.util.List; class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { - @Override - SinkAndSuppliers sinkWithoutCommitter() { + InspectableSink sinkWithoutCommitter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultSinkWriter<>(); - return new SinkAndSuppliers( - TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + return new InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build()); } @Override - SinkAndSuppliers sinkWithCommitter() { + InspectableSink sinkWithCommitter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultCommittingSinkWriter<>(); - return new SinkAndSuppliers( + return new InspectableSink( TestSinkV2.<Integer>newBuilder() .setWriter(sinkWriter) .setDefaultCommitter() - .build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + .build()); } @Override - SinkAndSuppliers 
sinkWithTimeBasedWriter() { + InspectableSink sinkWithTimeBasedWriter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter(); - return new SinkAndSuppliers( + return new InspectableSink( TestSinkV2.<Integer>newBuilder() .setWriter(sinkWriter) .setDefaultCommitter() - .build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - TestSinkV2.StringSerializer::new); + .build()); } @Override - SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String stateName) { - SnapshottingBufferingSinkWriter sinkWriter = new SnapshottingBufferingSinkWriter(); + InspectableSink sinkWithState(boolean withState, String stateName) { + TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = + new TestSinkV2.DefaultStatefulSinkWriter<>(); TestSinkV2.Builder<Integer> builder = - TestSinkV2.newBuilder() - .setWriter(sinkWriter) + TestSinkV2.<Integer>newBuilder() .setDefaultCommitter() - .setWithPostCommitTopology(true); + .setWithPostCommitTopology(true) + .setWriter(sinkWriter); if (withState) { builder.setWriterState(true); } if (stateName != null) { builder.setCompatibleStateNames(stateName); } - return new SinkAndSuppliers( - builder.build(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> sinkWriter.lastCheckpointId, - () -> new TestSinkV2.StringSerializer()); + return new InspectableSink(builder.build()); } private static class TimeBasedBufferingSinkWriter @@ -147,4 +130,31 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { return result; } } + + static class InspectableSink extends AbstractInspectableSink<TestSinkV2<Integer>> { + InspectableSink(TestSinkV2<Integer> sink) { + super(sink); + } + + @Override + public long getLastCheckpointId() { + return getSink().getWriter().lastCheckpointId; + } + + @Override + public List<String> getRecordsOfCurrentCheckpoint() { + return getSink().getWriter().elements; + } + + @Override + public List<Watermark> getWatermarks() { + return 
getSink().getWriter().watermarks; + } + + @Override + public int getRecordCountFromState() { + return ((TestSinkV2.DefaultStatefulSinkWriter<?>) getSink().getWriter()) + .getRecordCount(); + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java index 4ede8a6ee83..d6a40dc24b3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java @@ -28,7 +28,6 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataOutputSerializer; @@ -66,8 +65,6 @@ import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.LongSupplier; -import java.util.function.Supplier; import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput; @@ -77,21 +74,21 @@ abstract class SinkWriterOperatorTestBase { @Test void testNotEmitCommittablesWithoutCommitter() throws Exception { - SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter(); + InspectableSink sink = sinkWithoutCommitter(); final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = 
new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink)); + new SinkWriterOperatorFactory<>(sink.getSink())); testHarness.open(); testHarness.processElement(1, 1); assertThat(testHarness.getOutput()).isEmpty(); - assertThat(sinkAndSuppliers.elementSupplier.get()) + assertThat(sink.getRecordsOfCurrentCheckpoint()) .containsOnly("(1,1," + Long.MIN_VALUE + ")"); testHarness.prepareSnapshotPreBarrier(1); assertThat(testHarness.getOutput()).isEmpty(); // Elements are flushed - assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty(); + assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); testHarness.close(); } @@ -99,10 +96,10 @@ abstract class SinkWriterOperatorTestBase { void testWatermarkPropagatedToSinkWriter() throws Exception { final long initialTime = 0; - SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter(); + InspectableSink sink = sinkWithoutCommitter(); final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink)); + new SinkWriterOperatorFactory<>(sink.getSink())); testHarness.open(); testHarness.processWatermark(initialTime); @@ -110,7 +107,7 @@ abstract class SinkWriterOperatorTestBase { assertThat(testHarness.getOutput()) .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1)); - assertThat(sinkAndSuppliers.watermarkSupplier.get()) + assertThat(sink.getWatermarks()) .containsExactly( new org.apache.flink.api.common.eventtime.Watermark(initialTime), new org.apache.flink.api.common.eventtime.Watermark(initialTime + 1)); @@ -123,7 +120,7 @@ abstract class SinkWriterOperatorTestBase { final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().sink)); + new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink())); 
testHarness.open(); @@ -150,7 +147,7 @@ abstract class SinkWriterOperatorTestBase { void testEmitOnFlushWithCommitter() throws Exception { final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithCommitter().sink)); + new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink())); testHarness.open(); assertThat(testHarness.getOutput()).isEmpty(); @@ -168,7 +165,7 @@ abstract class SinkWriterOperatorTestBase { @Test void testEmitOnEndOfInputInBatchMode() throws Exception { final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory = - new SinkWriterOperatorFactory<>(sinkWithCommitter().sink); + new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()); final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = new OneInputStreamOperatorTestHarness<>(writerOperatorFactory); @@ -186,9 +183,10 @@ abstract class SinkWriterOperatorTestBase { final long initialTime = 0; - final SinkAndSuppliers sinkAndSuppliers = sinkWithSnapshottingWriter(stateful, null); + final InspectableSink sink = sinkWithState(stateful, null); + Sink<Integer> sink2 = sink.getSink(); final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink); + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink2)); testHarness.open(); @@ -199,39 +197,23 @@ abstract class SinkWriterOperatorTestBase { testHarness.prepareSnapshotPreBarrier(1L); OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L); - // we see the watermark and the committable summary, so the committables must be stored in - // state - assertThat(testHarness.getOutput()).hasSize(2).contains(new Watermark(initialTime)); - assertThat(sinkAndSuppliers.lastCheckpointSupplier.getAsLong()) - .isEqualTo(stateful ? 
1L : -1L); + assertThat(sink.getRecordCountFromState()).isEqualTo(2); + assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L); testHarness.close(); - final SinkAndSuppliers restoredSink = sinkWithSnapshottingWriter(stateful, null); + final InspectableSink restoredSink = sinkWithState(stateful, null); final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> - restoredTestHarness = createTestHarnessWithBufferingSinkWriter(restoredSink.sink); + restoredTestHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(restoredSink.getSink())); restoredTestHarness.initializeState(snapshot); restoredTestHarness.open(); - // this will flush out the committables that were restored - restoredTestHarness.endInput(); - final long checkpointId = 2; - restoredTestHarness.prepareSnapshotPreBarrier(checkpointId); - restoredTestHarness.notifyOfCompletedCheckpoint(checkpointId); - - if (stateful) { - assertBasicOutput(restoredTestHarness.getOutput(), 2, EOI); - } else { - assertThat(fromOutput(restoredTestHarness.getOutput()).get(0).asRecord().getValue()) - .isInstanceOf(CommittableSummary.class) - .satisfies( - cs -> - SinkV2Assertions.assertThat((CommittableSummary<?>) cs) - .hasOverallCommittables(0) - .hasPendingCommittables(0) - .hasFailedCommittables(0)); - } + // check that the previous state is correctly restored + assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful ? 
2 : 0); + restoredTestHarness.close(); } @@ -244,66 +226,46 @@ abstract class SinkWriterOperatorTestBase { "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", "cost", "prompt"); - SinkAndSuppliers sinkAndSuppliers = - sinkWithSnapshottingWriter(stateful, DummySinkOperator.DUMMY_SINK_STATE_NAME); + InspectableSink sink = sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); + int expectedState = 5; final OneInputStreamOperatorTestHarness<String, String> previousSink = new OneInputStreamOperatorTestHarness<>( - new DummySinkOperator(sinkAndSuppliers.serializerSupplier.get()), + new CompatibleStateSinkOperator<>( + TestSinkV2.WRITER_SERIALIZER, expectedState), StringSerializer.INSTANCE); OperatorSubtaskState previousSinkState = TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs); - // 2. Load previous sink state and verify the output + // 2. Load previous sink state and verify state + Sink<Integer> sink3 = sink.getSink(); final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> compatibleWriterOperator = - createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink); - - final List<String> expectedOutput1 = - stateful ? new ArrayList<>(previousSinkInputs) : new ArrayList<>(); - expectedOutput1.add(Tuple3.of(1, 1, Long.MIN_VALUE).toString()); + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink3)); // load the state from previous sink compatibleWriterOperator.initializeState(previousSinkState); + assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); - compatibleWriterOperator.open(); - - compatibleWriterOperator.processElement(1, 1); - - // this will flush out the committables that were restored from previous sink - compatibleWriterOperator.endInput(); - compatibleWriterOperator.prepareSnapshotPreBarrier(1); - - OperatorSubtaskState operatorStateWithoutPreviousState = - compatibleWriterOperator.snapshot(1L, 1L); + // 3. 
do another snapshot and check if this also can be restored without compatible state + // name + compatibleWriterOperator.prepareSnapshotPreBarrier(1L); + OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L); compatibleWriterOperator.close(); - assertEmitted(expectedOutput1, compatibleWriterOperator.getOutput()); - - // 3. Restore the sink without previous sink's state - SinkAndSuppliers sinkAndSuppliers2 = - sinkWithSnapshottingWriter(stateful, DummySinkOperator.DUMMY_SINK_STATE_NAME); + // 4. Restore the sink without previous sink's state + InspectableSink sink2 = + sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> restoredSinkOperator = - createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers2.sink); - final List<String> expectedOutput2 = - Arrays.asList( - Tuple3.of(2, 2, Long.MIN_VALUE).toString(), - Tuple3.of(3, 3, Long.MIN_VALUE).toString()); - - restoredSinkOperator.initializeState(operatorStateWithoutPreviousState); - - restoredSinkOperator.open(); + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink2.getSink())); - restoredSinkOperator.processElement(2, 2); - restoredSinkOperator.processElement(3, 3); + restoredSinkOperator.initializeState(snapshot); + assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? 
expectedState : 0); - // this will flush out the committables that were restored - restoredSinkOperator.endInput(); - restoredSinkOperator.prepareSnapshotPreBarrier(2); - - assertEmitted(expectedOutput2, restoredSinkOperator.getOutput()); restoredSinkOperator.close(); } @@ -311,10 +273,10 @@ abstract class SinkWriterOperatorTestBase { void testRestoreCommitterState() throws Exception { final List<String> committables = Arrays.asList("state1", "state2"); - SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter(); + InspectableSink sink = sinkWithCommitter(); final OneInputStreamOperatorTestHarness<String, String> committer = new OneInputStreamOperatorTestHarness<>( - new TestCommitterOperator(sinkAndSuppliers.serializerSupplier.get()), + new TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER), StringSerializer.INSTANCE); final OperatorSubtaskState committerState = @@ -322,7 +284,7 @@ abstract class SinkWriterOperatorTestBase { final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink)); + new SinkWriterOperatorFactory<>(sink.getSink())); testHarness.initializeState(committerState); @@ -362,16 +324,16 @@ abstract class SinkWriterOperatorTestBase { @ParameterizedTest @ValueSource(booleans = {true, false}) void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { - SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter(); + InspectableSink sink = sinkWithCommitter(); final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink)); + new SinkWriterOperatorFactory<>(sink.getSink())); testHarness.open(); testHarness.processElement(1, 1); assertThat(testHarness.getOutput()).isEmpty(); final String record = "(1,1," + Long.MIN_VALUE + ")"; - 
assertThat(sinkAndSuppliers.elementSupplier.get()).containsOnly(record); + assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record); testHarness.endInput(); @@ -380,7 +342,7 @@ abstract class SinkWriterOperatorTestBase { } assertEmitted(Collections.singletonList(record), testHarness.getOutput()); - assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty(); + assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); testHarness.close(); } @@ -556,13 +518,6 @@ abstract class SinkWriterOperatorTestBase { assertThat(committables).containsExactlyInAnyOrderElementsOf(records); } - private static OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> - createTestHarnessWithBufferingSinkWriter(Sink sink) throws Exception { - final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory = - new SinkWriterOperatorFactory<>(sink); - return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory); - } - private static void assertBasicOutput( Collection<Object> queuedOutput, int numberOfCommittables, long checkpointId) { List<StreamElement> output = fromOutput(queuedOutput); @@ -622,19 +577,22 @@ abstract class SinkWriterOperatorTestBase { } } - private static class DummySinkOperator extends AbstractStreamOperator<String> + /** Writes state to test whether the sink can read from alternative state names. 
*/ + private static class CompatibleStateSinkOperator<T> extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> { - static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state"; + static final String SINK_STATE_NAME = "compatible_sink_state"; static final ListStateDescriptor<byte[]> SINK_STATE_DESC = - new ListStateDescriptor<>( - DUMMY_SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE); - ListState<String> sinkState; - private final SimpleVersionedSerializer<String> serializer; + new ListStateDescriptor<>(SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE); + ListState<T> sinkState; + private final SimpleVersionedSerializer<T> serializer; + private final T initialState; - public DummySinkOperator(SimpleVersionedSerializer<String> serializer) { + public CompatibleStateSinkOperator( + SimpleVersionedSerializer<T> serializer, T initialState) { this.serializer = serializer; + this.initialState = initialState; } public void initializeState(StateInitializationContext context) throws Exception { @@ -643,11 +601,14 @@ abstract class SinkWriterOperatorTestBase { new SimpleVersionedListState<>( context.getOperatorStateStore().getListState(SINK_STATE_DESC), serializer); + if (!context.isRestored()) { + sinkState.add(initialState); + } } @Override - public void processElement(StreamRecord<String> element) throws Exception { - sinkState.add(element.getValue()); + public void processElement(StreamRecord<String> element) { + // do nothing } } @@ -672,32 +633,42 @@ abstract class SinkWriterOperatorTestBase { } } - abstract SinkAndSuppliers sinkWithoutCommitter(); + abstract InspectableSink sinkWithoutCommitter(); + + abstract InspectableSink sinkWithTimeBasedWriter(); + + abstract InspectableSink sinkWithState(boolean withState, String stateName); + + abstract InspectableSink sinkWithCommitter(); - abstract SinkAndSuppliers sinkWithTimeBasedWriter(); + /** + * Basic abstraction to access the different flavors of sinks. 
Remove once the older interfaces + * are removed. + */ + interface InspectableSink { + long getLastCheckpointId(); - abstract SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String stateName); + List<String> getRecordsOfCurrentCheckpoint(); - abstract SinkAndSuppliers sinkWithCommitter(); + List<org.apache.flink.api.common.eventtime.Watermark> getWatermarks(); - static class SinkAndSuppliers { - org.apache.flink.api.connector.sink2.Sink<Integer> sink; - Supplier<List<String>> elementSupplier; - Supplier<List<org.apache.flink.api.common.eventtime.Watermark>> watermarkSupplier; - LongSupplier lastCheckpointSupplier; - Supplier<SimpleVersionedSerializer<String>> serializerSupplier; + int getRecordCountFromState(); - public SinkAndSuppliers( - org.apache.flink.api.connector.sink2.Sink<Integer> sink, - Supplier<List<String>> elementSupplier, - Supplier<List<org.apache.flink.api.common.eventtime.Watermark>> watermarkSupplier, - LongSupplier lastCheckpointSupplier, - Supplier<SimpleVersionedSerializer<String>> serializerSupplier) { + Sink<Integer> getSink(); + } + + abstract static class AbstractInspectableSink< + S extends org.apache.flink.api.connector.sink2.Sink<Integer>> + implements InspectableSink { + private final S sink; + + protected AbstractInspectableSink(S sink) { this.sink = sink; - this.elementSupplier = elementSupplier; - this.watermarkSupplier = watermarkSupplier; - this.lastCheckpointSupplier = lastCheckpointSupplier; - this.serializerSupplier = serializerSupplier; + } + + @Override + public S getSink() { + return sink; } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java index a062181cc88..22b988a00db 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java @@ -25,7 +25,7 @@ import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.api.connector.sink.SinkWriter; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; import org.apache.flink.streaming.api.transformations.SinkV1Adapter; import javax.annotation.Nullable; @@ -56,13 +56,17 @@ import static org.assertj.core.api.Assertions.assertThat; * @deprecated Use {@link TestSinkV2} instead. */ @Deprecated -public class TestSink<T> implements Sink<T, String, String, String> { +public class TestSink<T> implements Sink<T, String, Integer, String> { + public static final SimpleVersionedSerializerAdapter<String> COMMITTABLE_SERIALIZER = + TestSinkV2.COMMITTABLE_SERIALIZER; + public static final SimpleVersionedSerializerAdapter<Integer> WRITER_SERIALIZER = + TestSinkV2.WRITER_SERIALIZER; public static final String END_OF_INPUT_STR = "end of input"; private final DefaultSinkWriter<T> writer; - @Nullable private final SimpleVersionedSerializer<String> writerStateSerializer; + @Nullable private final SimpleVersionedSerializer<Integer> writerStateSerializer; @Nullable private final Committer<String> committer; @@ -76,7 +80,7 @@ public class TestSink<T> implements Sink<T, String, String, String> { private TestSink( DefaultSinkWriter<T> writer, - @Nullable SimpleVersionedSerializer<String> writerStateSerializer, + @Nullable SimpleVersionedSerializer<Integer> writerStateSerializer, @Nullable Committer<String> committer, @Nullable SimpleVersionedSerializer<String> committableSerializer, @Nullable GlobalCommitter<String, String> globalCommitter, @@ -92,7 +96,7 @@ public class TestSink<T> implements Sink<T, String, String, String> { } @Override 
- public SinkWriter<T, String, String> createWriter(InitContext context, List<String> states) { + public SinkWriter<T, String, Integer> createWriter(InitContext context, List<Integer> states) { writer.init(context); writer.restoredFrom(states); writer.setProcessingTimerService(context.getProcessingTimeService()); @@ -120,7 +124,7 @@ public class TestSink<T> implements Sink<T, String, String, String> { } @Override - public Optional<SimpleVersionedSerializer<String>> getWriterStateSerializer() { + public Optional<SimpleVersionedSerializer<Integer>> getWriterStateSerializer() { return Optional.ofNullable(writerStateSerializer); } @@ -137,30 +141,30 @@ public class TestSink<T> implements Sink<T, String, String, String> { return SinkV1Adapter.wrap(this); } + public DefaultSinkWriter<T> getWriter() { + return writer; + } + /** A builder class for {@link TestSink}. */ public static class Builder<T> { - private DefaultSinkWriter writer = new DefaultSinkWriter(); + private DefaultSinkWriter<T> writer = new DefaultSinkWriter<>(); - private SimpleVersionedSerializer<String> writerStateSerializer; + private SimpleVersionedSerializer<Integer> writerStateSerializer; private Committer<String> committer; - private SimpleVersionedSerializer<String> committableSerializer; - private GlobalCommitter<String, String> globalCommitter; - private SimpleVersionedSerializer<String> globalCommittableSerializer; - private Collection<String> compatibleStateNames = Collections.emptyList(); public <W> Builder<W> setWriter(DefaultSinkWriter<W> writer) { - this.writer = checkNotNull(writer); + this.writer = (DefaultSinkWriter<T>) checkNotNull(writer); return (Builder<W>) this; } public Builder<T> withWriterState() { - this.writerStateSerializer = StringCommittableSerializer.INSTANCE; + this.writerStateSerializer = WRITER_SERIALIZER; return this; } @@ -169,39 +173,23 @@ public class TestSink<T> implements Sink<T, String, String, String> { return this; } - public Builder<T> setCommittableSerializer( 
- SimpleVersionedSerializer<String> committableSerializer) { - this.committableSerializer = committableSerializer; - return this; - } - public Builder<T> setDefaultCommitter() { this.committer = new DefaultCommitter(); - this.committableSerializer = StringCommittableSerializer.INSTANCE; return this; } public Builder<T> setDefaultCommitter(Supplier<Queue<String>> queueSupplier) { this.committer = new DefaultCommitter(queueSupplier); - this.committableSerializer = StringCommittableSerializer.INSTANCE; - return this; - } - - public Builder<T> setGlobalCommittableSerializer( - SimpleVersionedSerializer<String> globalCommittableSerializer) { - this.globalCommittableSerializer = globalCommittableSerializer; return this; } public Builder<T> setDefaultGlobalCommitter() { this.globalCommitter = new DefaultGlobalCommitter(""); - this.globalCommittableSerializer = StringCommittableSerializer.INSTANCE; return this; } public Builder<T> setGlobalCommitter(Supplier<Queue<String>> queueSupplier) { this.globalCommitter = new DefaultGlobalCommitter(queueSupplier); - this.globalCommittableSerializer = StringCommittableSerializer.INSTANCE; return this; } @@ -219,9 +207,9 @@ public class TestSink<T> implements Sink<T, String, String, String> { writer, writerStateSerializer, committer, - committableSerializer, + committer == null && globalCommitter == null ? null : COMMITTABLE_SERIALIZER, globalCommitter, - globalCommittableSerializer, + globalCommitter == null ? null : COMMITTABLE_SERIALIZER, compatibleStateNames); } } @@ -230,7 +218,7 @@ public class TestSink<T> implements Sink<T, String, String, String> { /** Base class for out testing {@link SinkWriter Writers}. 
*/ public static class DefaultSinkWriter<T> - implements SinkWriter<T, String, String>, Serializable { + implements SinkWriter<T, String, Integer>, Serializable { protected List<String> elements; @@ -238,6 +226,10 @@ public class TestSink<T> implements Sink<T, String, String, String> { protected ProcessingTimeService processingTimerService; + private int recordCount; + + protected long lastCheckpointId = -1; + protected DefaultSinkWriter() { this.elements = new ArrayList<>(); this.watermarks = new ArrayList<>(); @@ -247,6 +239,7 @@ public class TestSink<T> implements Sink<T, String, String, String> { public void write(T element, Context context) { elements.add( Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString()); + recordCount++; } @Override @@ -262,20 +255,27 @@ public class TestSink<T> implements Sink<T, String, String, String> { } @Override - public List<String> snapshotState(long checkpointId) throws IOException { - return Collections.emptyList(); + public List<Integer> snapshotState(long checkpointId) throws IOException { + lastCheckpointId = checkpointId; + return Collections.singletonList(recordCount); } @Override public void close() throws Exception {} - void restoredFrom(List<String> states) {} + void restoredFrom(List<Integer> states) { + recordCount = states.isEmpty() ? 0 : states.get(0); + } void setProcessingTimerService(ProcessingTimeService processingTimerService) { this.processingTimerService = processingTimerService; } public void init(InitContext context) {} + + public int getRecordCount() { + return recordCount; + } } // -------------------------------------- Sink Committer --------------------------------------- @@ -393,30 +393,4 @@ public class TestSink<T> implements Sink<T, String, String, String> { commit(Collections.singletonList(END_OF_INPUT_STR)); } } - - /** - * We introduce this {@link StringCommittableSerializer} is because that all the fields of - * {@link TestSink} should be serializable. 
- */ - public static class StringCommittableSerializer - implements SimpleVersionedSerializer<String>, Serializable { - - public static final StringCommittableSerializer INSTANCE = - new StringCommittableSerializer(); - - @Override - public int getVersion() { - return SimpleVersionedStringSerializer.INSTANCE.getVersion(); - } - - @Override - public byte[] serialize(String obj) { - return SimpleVersionedStringSerializer.INSTANCE.serialize(obj); - } - - @Override - public String deserialize(int version, byte[] serialized) throws IOException { - return SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized); - } - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java index e11f71c37ee..9caf7a8f339 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java @@ -19,6 +19,8 @@ package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; @@ -30,6 +32,7 @@ import org.apache.flink.api.connector.sink2.SupportsWriterState; import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; @@ -37,7 +40,6 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableSet; @@ -61,6 +63,10 @@ import static org.assertj.core.api.Assertions.assertThat; /** A {@link Sink} for all the sink related tests. */ public class TestSinkV2<InputT> implements Sink<InputT> { + public static final SimpleVersionedSerializerAdapter<String> COMMITTABLE_SERIALIZER = + new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); + public static final SimpleVersionedSerializerAdapter<Integer> WRITER_SERIALIZER = + new SimpleVersionedSerializerAdapter<>(IntSerializer.INSTANCE); private final DefaultSinkWriter<InputT> writer; @@ -81,11 +87,18 @@ public class TestSinkV2<InputT> implements Sink<InputT> { return new Builder<>(); } + public static <InputT> Builder<InputT> newBuilder(DefaultSinkWriter<InputT> writer) { + return new Builder<InputT>().setWriter(writer); + } + + public SupportsCommitter<String> asSupportsCommitter() { + throw new UnsupportedOperationException("No committter"); + } + /** A builder class for {@link TestSinkV2}. 
*/ public static class Builder<InputT> { private DefaultSinkWriter<InputT> writer = null; private DefaultCommitter committer; - private SimpleVersionedSerializer<String> committableSerializer; private boolean withPostCommitTopology = false; private boolean withPreCommitTopology = false; private boolean withWriterState = false; @@ -101,22 +114,14 @@ public class TestSinkV2<InputT> implements Sink<InputT> { return this; } - public Builder<InputT> setCommittableSerializer( - SimpleVersionedSerializer<String> committableSerializer) { - this.committableSerializer = committableSerializer; - return this; - } - public Builder<InputT> setDefaultCommitter() { this.committer = new DefaultCommitter(); - this.committableSerializer = StringSerializer.INSTANCE; return this; } public Builder<InputT> setDefaultCommitter( Supplier<Queue<Committer.CommitRequest<String>>> queueSupplier) { this.committer = new DefaultCommitter(queueSupplier); - this.committableSerializer = StringSerializer.INSTANCE; return this; } @@ -155,7 +160,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> { if (!withPreCommitTopology) { // TwoPhaseCommittingSink with a stateless writer and a committer return new TestSinkV2TwoPhaseCommittingSink<>( - writer, committableSerializer, committer); + writer, COMMITTABLE_SERIALIZER, committer); } else { // TwoPhaseCommittingSink with a stateless writer, pre commit topology, // committer @@ -163,9 +168,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> { writer instanceof DefaultCommittingSinkWriter, "Please provide a DefaultCommittingSinkWriter instance"); return new TestSinkV2WithPreCommitTopology<>( - (DefaultCommittingSinkWriter) writer, - committableSerializer, - committer); + writer, COMMITTABLE_SERIALIZER, committer); } } else { if (withWriterState) { @@ -174,9 +177,9 @@ public class TestSinkV2<InputT> implements Sink<InputT> { Preconditions.checkArgument( writer instanceof DefaultStatefulSinkWriter, "Please provide a DefaultStatefulSinkWriter 
instance"); - return new TestStatefulSinkV2( - (DefaultStatefulSinkWriter) writer, - committableSerializer, + return new TestStatefulSinkV2<>( + (DefaultStatefulSinkWriter<InputT>) writer, + COMMITTABLE_SERIALIZER, committer, compatibleStateNames); } else { @@ -186,9 +189,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> { writer instanceof DefaultCommittingSinkWriter, "Please provide a DefaultCommittingSinkWriter instance"); return new TestSinkV2WithPostCommitTopology<>( - (DefaultCommittingSinkWriter) writer, - committableSerializer, - committer); + writer, COMMITTABLE_SERIALIZER, committer); } } } @@ -215,6 +216,11 @@ public class TestSinkV2<InputT> implements Sink<InputT> { return committer; } + @Override + public SupportsCommitter<String> asSupportsCommitter() { + return this; + } + @Override public SimpleVersionedSerializer<String> getCommittableSerializer() { return committableSerializer; @@ -268,19 +274,19 @@ public class TestSinkV2<InputT> implements Sink<InputT> { return withLineage.map(old -> old + "Transformed"); } }) - .returns(CommittableMessageTypeInfo.of(StringSerializer::new)); + .returns(CommittableMessageTypeInfo.of(() -> COMMITTABLE_SERIALIZER)); } @Override public SimpleVersionedSerializer<String> getWriteResultSerializer() { - return new StringSerializer(); + return new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE); } } private static class TestStatefulSinkV2<InputT> extends TestSinkV2WithPostCommitTopology<InputT> - implements SupportsWriterState<InputT, String>, + implements SupportsWriterState<InputT, Integer>, SupportsWriterState.WithCompatibleState { - private String compatibleState; + private final String compatibleState; public TestStatefulSinkV2( DefaultStatefulSinkWriter<InputT> writer, @@ -297,8 +303,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> { } @Override - public StatefulSinkWriter<InputT, String> restoreWriter( - WriterInitContext context, Collection<String> recoveredState) { + public 
StatefulSinkWriter<InputT, Integer> restoreWriter( + WriterInitContext context, Collection<Integer> recoveredState) { DefaultStatefulSinkWriter<InputT> statefulWriter = (DefaultStatefulSinkWriter) getWriter(); @@ -307,8 +313,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> { } @Override - public SimpleVersionedSerializer<String> getWriterStateSerializer() { - return new StringSerializer(); + public SimpleVersionedSerializer<Integer> getWriterStateSerializer() { + return WRITER_SERIALIZER; } @Override @@ -326,6 +332,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> { protected List<Watermark> watermarks; + public long lastCheckpointId = -1; + protected DefaultSinkWriter() { this.elements = new ArrayList<>(); this.watermarks = new ArrayList<>(); @@ -380,15 +388,27 @@ public class TestSinkV2<InputT> implements Sink<InputT> { */ protected static class DefaultStatefulSinkWriter<InputT> extends DefaultCommittingSinkWriter<InputT> - implements StatefulSinkWriter<InputT, String> { + implements StatefulSinkWriter<InputT, Integer> { + private int recordCount = 0; @Override - public List<String> snapshotState(long checkpointId) throws IOException { - return elements; + public void write(InputT element, Context context) { + super.write(element, context); + recordCount++; + } + + public int getRecordCount() { + return recordCount; } - protected void restore(Collection<String> recoveredState) { - this.elements = new ArrayList<>(recoveredState); + @Override + public List<Integer> snapshotState(long checkpointId) throws IOException { + lastCheckpointId = checkpointId; + return Collections.singletonList(recordCount); + } + + protected void restore(Collection<Integer> recoveredState) { + this.recordCount = recoveredState.isEmpty() ? 
0 : recoveredState.iterator().next(); } } @@ -464,29 +484,4 @@ public class TestSinkV2<InputT> implements Sink<InputT> { }); } } - - /** - * We introduce this {@link StringSerializer} is because that all the fields of {@link - * TestSinkV2} should be serializable. - */ - public static class StringSerializer - implements SimpleVersionedSerializer<String>, Serializable { - - public static final StringSerializer INSTANCE = new StringSerializer(); - - @Override - public int getVersion() { - return SimpleVersionedStringSerializer.INSTANCE.getVersion(); - } - - @Override - public byte[] serialize(String obj) { - return SimpleVersionedStringSerializer.INSTANCE.serialize(obj); - } - - @Override - public String deserialize(int version, byte[] serialized) throws IOException { - return SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized); - } - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java index 6d4c3586f38..f817a6810af 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java @@ -33,8 +33,6 @@ class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase { TestSink.newBuilder() .setCommitter(committer) .setDefaultGlobalCommitter() - .setCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) .build() .asV2(), () -> committer.successfulCommits); @@ -47,8 +45,6 @@ class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase { TestSink.newBuilder() .setCommitter(new TestSink.RetryOnceCommitter()) .setDefaultGlobalCommitter() - .setCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) 
.build() .asV2(), () -> 0); @@ -59,12 +55,7 @@ class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase { ForwardingCommitter committer = new ForwardingCommitter(); return new SinkAndCounters( (SupportsCommitter<String>) - TestSink.newBuilder() - .setCommitter(committer) - .setCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) - .build() - .asV2(), + TestSink.newBuilder().setCommitter(committer).build().asV2(), () -> committer.successfulCommits); } @@ -78,6 +69,6 @@ class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase { } @Override - public void close() throws Exception {} + public void close() {} } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java index 5af5ac5a679..4e320816aea 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java @@ -18,65 +18,46 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.api.java.tuple.Tuple3; import java.util.ArrayList; -import java.util.Collections; import java.util.List; class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase { - @Override - SinkAndSuppliers sinkWithoutCommitter() { + InspectableSink sinkWithoutCommitter() { TestSink.DefaultSinkWriter<Integer> sinkWriter = new TestSink.DefaultSinkWriter<>(); - return new SinkAndSuppliers( - TestSink.newBuilder().setWriter(sinkWriter).build().asV2(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - () -> new 
TestSink.StringCommittableSerializer()); + return new InspectableSink(TestSink.newBuilder().setWriter(sinkWriter).build()); } @Override - SinkAndSuppliers sinkWithCommitter() { + InspectableSink sinkWithCommitter() { TestSink.DefaultSinkWriter<Integer> sinkWriter = new TestSink.DefaultSinkWriter<>(); - return new SinkAndSuppliers( - TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - () -> new TestSink.StringCommittableSerializer()); + return new InspectableSink( + TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build()); } @Override - SinkAndSuppliers sinkWithTimeBasedWriter() { + InspectableSink sinkWithTimeBasedWriter() { TestSink.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter(); - return new SinkAndSuppliers( - TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> -1, - () -> new TestSink.StringCommittableSerializer()); + return new InspectableSink( + TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build()); } @Override - SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String stateName) { - SnapshottingBufferingSinkWriter sinkWriter = new SnapshottingBufferingSinkWriter(); + InspectableSink sinkWithState(boolean withState, String stateName) { + TestSink.DefaultSinkWriter<Integer> sinkWriter = new TestSink.DefaultSinkWriter<>(); TestSink.Builder<Integer> builder = TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter(); if (withState) { builder.withWriterState(); + if (stateName != null) { + builder.setCompatibleStateNames(stateName); + } } - if (stateName != null) { - builder.setCompatibleStateNames(stateName); - } - return new SinkAndSuppliers( - builder.build().asV2(), - () -> sinkWriter.elements, - () -> sinkWriter.watermarks, - () -> sinkWriter.lastCheckpointId, - () -> new 
TestSink.StringCommittableSerializer()); + return new InspectableSink(builder.build()); } private static class TimeBasedBufferingSinkWriter extends TestSink.DefaultSinkWriter<Integer> @@ -103,30 +84,33 @@ class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase { } } - private static class SnapshottingBufferingSinkWriter - extends TestSink.DefaultSinkWriter<Integer> { - public static final int NOT_SNAPSHOTTED = -1; - long lastCheckpointId = NOT_SNAPSHOTTED; + class InspectableSink + extends AbstractInspectableSink<org.apache.flink.api.connector.sink2.Sink<Integer>> { + private final TestSink<Integer> sink; + + InspectableSink(TestSink<Integer> sink) { + super(sink.asV2()); + this.sink = sink; + } @Override - public List<String> snapshotState(long checkpointId) { - lastCheckpointId = checkpointId; - return elements; + public long getLastCheckpointId() { + return sink.getWriter().lastCheckpointId; } @Override - void restoredFrom(List<String> states) { - this.elements = new ArrayList<>(states); + public List<String> getRecordsOfCurrentCheckpoint() { + return sink.getWriter().elements; } @Override - public List<String> prepareCommit(boolean flush) { - if (!flush) { - return Collections.emptyList(); - } - List<String> result = elements; - elements = new ArrayList<>(); - return result; + public List<Watermark> getWatermarks() { + return sink.getWriter().watermarks; + } + + @Override + public int getRecordCountFromState() { + return sink.getWriter().getRecordCount(); } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java index 5bb25605d75..7aabbcb4e49 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java @@ -26,10 +26,10 @@ import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializerAdapter; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableSet; @@ -60,6 +60,10 @@ import static org.assertj.core.api.Assertions.assertThat; @Deprecated public class TestSinkV2<InputT> implements Sink<InputT> { + public static final SimpleVersionedSerializerAdapter<String> COMMITTABLE_SERIALIZER = + org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.COMMITTABLE_SERIALIZER; + public static final SimpleVersionedSerializerAdapter<Integer> WRITER_SERIALIZER = + org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.WRITER_SERIALIZER; private final DefaultSinkWriter<InputT> writer; private TestSinkV2(DefaultSinkWriter<InputT> writer) { @@ -71,7 +75,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> { return writer; } - DefaultSinkWriter<InputT> getWriter() { + public DefaultSinkWriter<InputT> getWriter() { return writer; } @@ -83,7 +87,6 @@ public class TestSinkV2<InputT> implements Sink<InputT> { public static class Builder<InputT> { private DefaultSinkWriter<InputT> writer = null; private DefaultCommitter committer; - private SimpleVersionedSerializer<String> committableSerializer; private boolean withPostCommitTopology = false; private 
boolean withWriterState = false; private String compatibleStateNames; @@ -98,22 +101,14 @@ public class TestSinkV2<InputT> implements Sink<InputT> { return this; } - public Builder<InputT> setCommittableSerializer( - SimpleVersionedSerializer<String> committableSerializer) { - this.committableSerializer = committableSerializer; - return this; - } - public Builder<InputT> setDefaultCommitter() { this.committer = new DefaultCommitter(); - this.committableSerializer = StringSerializer.INSTANCE; return this; } public Builder<InputT> setDefaultCommitter( Supplier<Queue<Committer.CommitRequest<String>>> queueSupplier) { this.committer = new DefaultCommitter(queueSupplier); - this.committableSerializer = StringSerializer.INSTANCE; return this; } @@ -146,7 +141,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> { if (!withPostCommitTopology) { // TwoPhaseCommittingSink with a stateless writer and a committer return new TestSinkV2TwoPhaseCommittingSink<>( - writer, committableSerializer, committer); + writer, COMMITTABLE_SERIALIZER, committer); } else { if (withWriterState) { // TwoPhaseCommittingSink with a stateful writer and a committer and post @@ -156,7 +151,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> { "Please provide a DefaultStatefulSinkWriter instance"); return new TestStatefulSinkV2( (DefaultStatefulSinkWriter) writer, - committableSerializer, + COMMITTABLE_SERIALIZER, committer, compatibleStateNames); } else { @@ -167,7 +162,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> { "Please provide a DefaultCommittingSinkWriter instance"); return new TestSinkV2WithPostCommitTopology<>( (DefaultCommittingSinkWriter) writer, - committableSerializer, + COMMITTABLE_SERIALIZER, committer); } } @@ -225,7 +220,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> { } private static class TestStatefulSinkV2<InputT> extends TestSinkV2WithPostCommitTopology<InputT> - implements StatefulSink<InputT, String>, 
StatefulSink.WithCompatibleState { + implements StatefulSink<InputT, Integer>, StatefulSink.WithCompatibleState { private String compatibleState; public TestStatefulSinkV2( @@ -243,8 +238,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> { } @Override - public StatefulSinkWriter<InputT, String> restoreWriter( - InitContext context, Collection<String> recoveredState) { + public StatefulSinkWriter<InputT, Integer> restoreWriter( + InitContext context, Collection<Integer> recoveredState) { DefaultStatefulSinkWriter<InputT> statefulWriter = (DefaultStatefulSinkWriter) getWriter(); @@ -253,8 +248,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> { } @Override - public SimpleVersionedSerializer<String> getWriterStateSerializer() { - return new StringSerializer(); + public SimpleVersionedSerializer<Integer> getWriterStateSerializer() { + return WRITER_SERIALIZER; } @Override @@ -272,6 +267,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> { public List<Watermark> watermarks; + public long lastCheckpointId = -1; + public DefaultSinkWriter() { this.elements = new ArrayList<>(); this.watermarks = new ArrayList<>(); @@ -327,15 +324,27 @@ public class TestSinkV2<InputT> implements Sink<InputT> { */ public static class DefaultStatefulSinkWriter<InputT> extends DefaultCommittingSinkWriter<InputT> - implements StatefulSink.StatefulSinkWriter<InputT, String> { + implements StatefulSink.StatefulSinkWriter<InputT, Integer> { + private int recordCount; @Override - public List<String> snapshotState(long checkpointId) throws IOException { - return elements; + public List<Integer> snapshotState(long checkpointId) throws IOException { + lastCheckpointId = checkpointId; + return Collections.singletonList(recordCount); } - protected void restore(Collection<String> recoveredState) { - this.elements = new ArrayList<>(recoveredState); + @Override + public void write(InputT element, Context context) { + super.write(element, context); + recordCount++; + } + + 
public int getRecordCount() { + return recordCount; + } + + protected void restore(Collection<Integer> recoveredState) { + this.recordCount = recoveredState.isEmpty() ? 0 : recoveredState.iterator().next(); } } @@ -411,29 +420,4 @@ public class TestSinkV2<InputT> implements Sink<InputT> { }); } } - - /** - * We introduce this {@link StringSerializer} is because that all the fields of {@link - * TestSinkV2} should be serializable. - */ - public static class StringSerializer - implements SimpleVersionedSerializer<String>, Serializable { - - public static final StringSerializer INSTANCE = new StringSerializer(); - - @Override - public int getVersion() { - return SimpleVersionedStringSerializer.INSTANCE.getVersion(); - } - - @Override - public byte[] serialize(String obj) { - return SimpleVersionedStringSerializer.INSTANCE.serialize(obj); - } - - @Override - public String deserialize(int version, byte[] serialized) throws IOException { - return SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized); - } - } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java index f5b31cb66a7..662568d4afe 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java @@ -492,10 +492,7 @@ class CommonExecSinkITCase { private static TestSink<RowData> buildRecordWriterTestSink( TestSink.DefaultSinkWriter<RowData> writer) { - return TestSink.newBuilder() - .setWriter(writer) - .setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE) - .build(); + return TestSink.newBuilder().setWriter(writer).build(); } private 
TableFactoryHarness.SinkBase buildRuntimeSinkProvider( diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java index bbc08c9798e..0a3de995c9a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java @@ -226,7 +226,6 @@ public class SinkITCase extends AbstractTestBaseJUnit4 { stream.sinkTo( TestSink.newBuilder() - .setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE) .setGlobalCommitter( (Supplier<Queue<String>> & Serializable) () -> GLOBAL_COMMIT_QUEUE) .build()); @@ -252,8 +251,6 @@ public class SinkITCase extends AbstractTestBaseJUnit4 { env.fromData(SOURCE_DATA) .sinkTo( TestSink.newBuilder() - .setCommittableSerializer( - TestSink.StringCommittableSerializer.INSTANCE) .setGlobalCommitter( (Supplier<Queue<String>> & Serializable) () -> GLOBAL_COMMIT_QUEUE) diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java index 3dea88bcf65..95a0e4c1bec 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java @@ -147,7 +147,6 @@ public class SinkV2MetricsDeprecatedITCase extends TestLogger { .sinkTo( TestSinkV2.<Long>newBuilder() .setCommitter(new MetricCommitter(beforeLatch, afterLatch)) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) .build()) .name(TEST_SINK_NAME); JobClient jobClient = env.executeAsync(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java index bb227552a3e..4a67d601a0c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java @@ -140,9 +140,8 @@ public class SinkV2MetricsITCase extends TestLogger { env.fromSequence(0, numCommittables - 1) .returns(BasicTypeInfo.LONG_TYPE_INFO) .sinkTo( - TestSinkV2.<Long>newBuilder() - .setCommitter(new MetricCommitter(beforeLatch, afterLatch)) - .setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE) + (TestSinkV2.<Long>newBuilder() + .setCommitter(new MetricCommitter(beforeLatch, afterLatch))) .build()) .name(TEST_SINK_NAME); JobClient jobClient = env.executeAsync();
