This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 836d422caa5d497682bae12d74b05910c6863869 Author: GuoWei Ma <[email protected]> AuthorDate: Mon Nov 23 12:17:36 2020 +0800 [FLINK-20337] Let StatefulSinkWriterOperator load StreamingFileSink's state To allow stateful migration from `StreamingFileSink` to `FileSink` we let the `StatefulSinkWriterOperator` load the `StreamingFileSink`'s state ("bucket-state") if it exists. --- .../operators/sink/StatefulSinkWriterOperator.java | 42 +++++++- .../sink/StatefulSinkWriterOperatorFactory.java | 18 +++- .../translators/SinkTransformationTranslator.java | 15 ++- .../sink/StatefulSinkWriterOperatorTest.java | 109 +++++++++++++++++++++ 4 files changed, 179 insertions(+), 5 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java index 3751a3e..65b395c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java @@ -31,8 +31,13 @@ import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.CollectionUtil; +import javax.annotation.Nullable; + +import java.util.ArrayList; import java.util.List; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link * SinkWriter Writers} that have state. @@ -54,18 +59,28 @@ final class StatefulSinkWriterOperator<InputT, CommT, WriterStateT> extends Abst /** The writer operator's state serializer. */ private final SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer; + /** The previous sink operator's state name. */ + @Nullable + private final String previousSinkStateName; + // ------------------------------- runtime fields --------------------------------------- + /** The previous sink operator's state. */ + @Nullable + private ListState<WriterStateT> previousSinkState; + /** The operator's state. */ private ListState<WriterStateT> writerState; StatefulSinkWriterOperator( + @Nullable final String previousSinkStateName, final ProcessingTimeService processingTimeService, final Sink<InputT, CommT, WriterStateT, ?> sink, final SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer) { super(processingTimeService); this.sink = sink; this.writerStateSimpleVersionedSerializer = writerStateSimpleVersionedSerializer; + this.previousSinkStateName = previousSinkStateName; } @Override @@ -74,17 +89,40 @@ final class StatefulSinkWriterOperator<InputT, CommT, WriterStateT> extends Abst final ListState<byte[]> rawState = context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC); writerState = new SimpleVersionedListState<>(rawState, writerStateSimpleVersionedSerializer); + + if (previousSinkStateName != null) { + final ListStateDescriptor<byte[]> preSinkStateDesc = new ListStateDescriptor<>( + previousSinkStateName, + BytePrimitiveArraySerializer.INSTANCE); + + final ListState<byte[]> preRawState = context + .getOperatorStateStore() + .getListState(preSinkStateDesc); + this.previousSinkState = new SimpleVersionedListState<>( + preRawState, + writerStateSimpleVersionedSerializer); + } } @SuppressWarnings("unchecked") @Override public void snapshotState(StateSnapshotContext context) throws Exception { writerState.update((List<WriterStateT>) sinkWriter.snapshotState()); + if (previousSinkState != null) { + previousSinkState.clear(); + } } @Override SinkWriter<InputT, CommT, WriterStateT> createWriter() throws Exception { - final List<WriterStateT> committables = CollectionUtil.iterableToList(writerState.get()); - return sink.createWriter(createInitContext(), committables); + final List<WriterStateT> writerStates = CollectionUtil.iterableToList(writerState.get()); + final List<WriterStateT> states = new ArrayList<>(writerStates); + if (previousSinkStateName != null) { + checkNotNull(previousSinkState); + final List<WriterStateT> previousSinkStates = CollectionUtil.iterableToList( + previousSinkState.get()); + states.addAll(previousSinkStates); + } + return sink.createWriter(createInitContext(), states); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperatorFactory.java index 2741e37..95da487 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperatorFactory.java @@ -23,6 +23,8 @@ import org.apache.flink.api.connector.sink.SinkWriter; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import javax.annotation.Nullable; + /** * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link * StatefulSinkWriterOperator}. @@ -35,13 +37,27 @@ public final class StatefulSinkWriterOperatorFactory<InputT, CommT, WriterStateT private final Sink<InputT, CommT, WriterStateT, ?> sink; + @Nullable + private final String previousSinkStateName; + public StatefulSinkWriterOperatorFactory(Sink<InputT, CommT, WriterStateT, ?> sink) { + this(sink, null); + } + + public StatefulSinkWriterOperatorFactory( + Sink<InputT, CommT, WriterStateT, ?> sink, + @Nullable String previousSinkStateName) { this.sink = sink; + this.previousSinkStateName = previousSinkStateName; } @Override AbstractSinkWriterOperator<InputT, CommT> createWriterOperator(ProcessingTimeService processingTimeService) { - return new StatefulSinkWriterOperator<>(processingTimeService, sink, sink.getWriterStateSerializer().get()); + return new StatefulSinkWriterOperator<>( + previousSinkStateName, + processingTimeService, + sink, + sink.getWriterStateSerializer().get()); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java index ddd5e06..9e643de 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java @@ -63,6 +63,9 @@ public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCom protected static final Logger LOG = LoggerFactory.getLogger(SinkTransformationTranslator.class); + // Currently we only support load the state from streaming file sink; + private static final String PREVIOUS_SINK_STATE_NAME = "bucket-states"; + @Override public Collection<Integer> translateForBatch( SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> transformation, @@ -75,6 +78,7 @@ public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCom internalTranslate( transformation, parallelism, + PREVIOUS_SINK_STATE_NAME, new BatchCommitterOperatorFactory<>(transformation.getSink()), 1, 1, @@ -101,6 +105,7 @@ public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCom internalTranslate( transformation, parallelism, + PREVIOUS_SINK_STATE_NAME, new StreamingCommitterOperatorFactory<>(transformation.getSink()), parallelism, transformation.getMaxParallelism(), @@ -117,9 +122,9 @@ public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCom /** * Add the sink operators to the stream graph. - * * @param sinkTransformation The sink transformation that committer and global committer belongs to. * @param writerParallelism The parallelism of the writer operator. + * @param previousSinkStateName The state name of previous sink's state. * @param committerFactory The committer operator factory. * @param committerParallelism The parallelism of the committer operator. * @param committerMaxParallelism The max parallelism of the committer operator. @@ -128,6 +133,7 @@ public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCom private void internalTranslate( SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation, int writerParallelism, + @SuppressWarnings("SameParameterValue") @Nullable String previousSinkStateName, OneInputStreamOperatorFactory<CommT, CommT> committerFactory, int committerParallelism, int committerMaxParallelism, @@ -139,6 +145,7 @@ public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCom final int writerId = addWriter( sinkTransformation, writerParallelism, + previousSinkStateName, context); final int committerId = addCommitter( @@ -161,12 +168,14 @@ public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCom * * @param sinkTransformation The transformation that the writer belongs to * @param parallelism The parallelism of the writer + * @param previousSinkStateName The state name of previous sink's state. * * @return The stream node id of the writer */ private int addWriter( SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation, int parallelism, + @Nullable String previousSinkStateName, Context context) { final boolean hasState = sinkTransformation .getSink() @@ -180,7 +189,9 @@ public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCom final TypeInformation<InputT> inputTypeInfo = input.getOutputType(); final StreamOperatorFactory<CommT> writer = - hasState ? new StatefulSinkWriterOperatorFactory<>(sinkTransformation.getSink()) : new StatelessSinkWriterOperatorFactory<>( + hasState ? new StatefulSinkWriterOperatorFactory<>( + sinkTransformation.getSink(), + previousSinkStateName) : new StatelessSinkWriterOperatorFactory<>( sinkTransformation.getSink()); final String prefix = "Sink Writer:"; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperatorTest.java index bc7c79b..35a93a4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperatorTest.java @@ -18,18 +18,31 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink.SinkWriter; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; import org.junit.Test; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; /** @@ -90,6 +103,70 @@ public class StatefulSinkWriterOperatorTest extends SinkWriterOperatorTestBase { new StreamRecord<>(Tuple3.of(2, initialTime + 2, initialTime).toString()))); } + @Test + public void loadPreviousSinkState() throws Exception { + //1. Build previous sink state + final List<String> previousSinkInputs = Arrays.asList("bit", "mention", "thick", "stick", "stir", + "easy", "sleep", "forth", "cost", "prompt"); + + final OneInputStreamOperatorTestHarness<String, String> previousSink = + new OneInputStreamOperatorTestHarness<>( + new DummySinkOperator(), + StringSerializer.INSTANCE); + + OperatorSubtaskState previousSinkState = TestHarnessUtil.buildSubtaskState( + previousSink, + previousSinkInputs); + + //2. Load previous sink state and verify the output + final OneInputStreamOperatorTestHarness<Integer, String> compatibleWriterOperator = + createCompatibleSinkOperator(); + + final List<StreamRecord<String>> expectedOutput1 = + previousSinkInputs.stream().map(StreamRecord::new).collect(Collectors.toList()); + expectedOutput1.add(new StreamRecord<>(Tuple3.of(1, 1, Long.MIN_VALUE).toString())); + + // load the state from previous sink + compatibleWriterOperator.initializeState(previousSinkState); + + compatibleWriterOperator.open(); + + compatibleWriterOperator.processElement(1, 1); + + // this will flush out the committables that were restored from previous sink + compatibleWriterOperator.endInput(); + + OperatorSubtaskState operatorStateWithoutPreviousState = compatibleWriterOperator.snapshot( + 1L, + 1L); + + compatibleWriterOperator.close(); + + assertThat( + compatibleWriterOperator.getOutput(), + containsInAnyOrder(expectedOutput1.toArray())); + + //3. Restore the sink without previous sink's state + final OneInputStreamOperatorTestHarness<Integer, String> restoredSinkOperator = + createCompatibleSinkOperator(); + final List<StreamRecord<String>> expectedOutput2 = + Arrays.asList( + new StreamRecord<>(Tuple3.of(2, 2, Long.MIN_VALUE).toString()), + new StreamRecord<>(Tuple3.of(3, 3, Long.MIN_VALUE).toString())); + + restoredSinkOperator.initializeState(operatorStateWithoutPreviousState); + + restoredSinkOperator.open(); + + restoredSinkOperator.processElement(2, 2); + restoredSinkOperator.processElement(3, 3); + + // this will flush out the committables that were restored + restoredSinkOperator.endInput(); + + assertThat(restoredSinkOperator.getOutput(), containsInAnyOrder(expectedOutput2.toArray())); + } + /** * A {@link SinkWriter} buffers elements and snapshots them when asked. */ @@ -105,4 +182,36 @@ public class StatefulSinkWriterOperatorTest extends SinkWriterOperatorTestBase { this.elements = states; } } + + static class DummySinkOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> { + + static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state"; + + static final ListStateDescriptor<byte[]> SINK_STATE_DESC = new ListStateDescriptor<>( + DUMMY_SINK_STATE_NAME, + BytePrimitiveArraySerializer.INSTANCE); + ListState<String> sinkState; + + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + sinkState = new SimpleVersionedListState<>(context + .getOperatorStateStore() + .getListState(SINK_STATE_DESC), TestSink.StringCommittableSerializer.INSTANCE); + } + + @Override + public void processElement(StreamRecord<String> element) throws Exception { + sinkState.add(element.getValue()); + } + } + + private OneInputStreamOperatorTestHarness<Integer, String> createCompatibleSinkOperator() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new StatefulSinkWriterOperatorFactory<>(TestSink + .newBuilder() + .setWriter(new SnapshottingBufferingSinkWriter()) + .withWriterState() + .build(), DummySinkOperator.DUMMY_SINK_STATE_NAME), + IntSerializer.INSTANCE); + } }
