This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 0e44feb120a73c36116b2bd6b89e4f1676d7266f
Author: Grzegorz Kołakowski <[email protected]>
AuthorDate: Wed Feb 21 11:11:53 2018 +0100

    [BEAM-2393] Make BoundedSource fault-tolerant
---
 .../UnboundedReadFromBoundedSource.java            |   5 +-
 .../flink/FlinkStreamingTransformTranslators.java  | 138 ++++++++++-
 .../streaming/io/BoundedSourceWrapper.java         | 259 ---------------------
 .../flink/streaming/BoundedSourceRestoreTest.java  | 236 +++++++++++++++++++
 4 files changed, 367 insertions(+), 271 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 09acc82..88119d1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -169,8 +169,11 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
   }
 
+  /**
+   * A marker representing the progress and state of an {@link BoundedToUnboundedSourceAdapter}.
+   */
   @VisibleForTesting
-  static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+  public static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
     private final @Nullable List<TimestampedValue<T>> residualElements;
     private final @Nullable BoundedSource<T> residualSource;
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 970ece1..74ca168 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -51,7 +52,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKey
 import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -87,18 +87,27 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
@@ -276,8 +285,7 @@ class FlinkStreamingTransformTranslators {
       PCollection<T> output = context.getOutput(transform);
 
       TypeInformation<WindowedValue<T>> outputTypeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
+          context.getTypeInfo(context.getOutput(transform));
 
       BoundedSource<T> rawSource;
       try {
@@ -289,24 +297,26 @@ class FlinkStreamingTransformTranslators {
       }
 
       String fullName = getCurrentTransformName(context);
+      UnboundedSource<T, ?> adaptedRawSource = new BoundedToUnboundedSourceAdapter<>(rawSource);
       DataStream<WindowedValue<T>> source;
       try {
-        BoundedSourceWrapper<T> sourceWrapper =
-            new BoundedSourceWrapper<>(
+        UnboundedSourceWrapperNoValueWithRecordId<T, ?> sourceWrapper =
+            new UnboundedSourceWrapperNoValueWithRecordId<>(
+                new UnboundedSourceWrapper<>(
                 fullName,
                 context.getPipelineOptions(),
-                rawSource,
-                context.getExecutionEnvironment().getParallelism());
+                    adaptedRawSource,
+                    context.getExecutionEnvironment().getParallelism())
+            );
         source = context
             .getExecutionEnvironment()
             .addSource(sourceWrapper)
-            .name(fullName).uid(fullName)
+            .name(fullName)
+            .uid(fullName)
             .returns(outputTypeInfo);
       } catch (Exception e) {
-        throw new RuntimeException(
-            "Error while translating BoundedSource: " + rawSource, e);
+        throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
       }
-
       context.setOutputDataStream(output, source);
     }
   }
@@ -1259,4 +1269,110 @@ class FlinkStreamingTransformTranslators {
       return CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN;
     }
   }
+
+  /**
+   * Wrapper for {@link UnboundedSourceWrapper}, which simplifies output type, namely, removes
+   * {@link ValueWithRecordId}.
+   */
+  private static class UnboundedSourceWrapperNoValueWithRecordId<
+      OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      extends RichParallelSourceFunction<WindowedValue<OutputT>>
+      implements ProcessingTimeCallback, StoppableFunction,
+      CheckpointListener, CheckpointedFunction {
+
+    private final UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper;
+
+    private UnboundedSourceWrapperNoValueWithRecordId(
+        UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper) {
+      this.unboundedSourceWrapper = unboundedSourceWrapper;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+      unboundedSourceWrapper.setRuntimeContext(getRuntimeContext());
+      unboundedSourceWrapper.open(parameters);
+    }
+
+    @Override
+    public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
+      unboundedSourceWrapper.run(new SourceContextWrapper(ctx));
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+      unboundedSourceWrapper.initializeState(context);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+      unboundedSourceWrapper.snapshotState(context);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+      unboundedSourceWrapper.notifyCheckpointComplete(checkpointId);
+    }
+
+    @Override
+    public void stop() {
+      unboundedSourceWrapper.stop();
+    }
+
+    @Override
+    public void cancel() {
+      unboundedSourceWrapper.cancel();
+    }
+
+    @Override
+    public void onProcessingTime(long timestamp) throws Exception {
+      unboundedSourceWrapper.onProcessingTime(timestamp);
+    }
+
+    private final class SourceContextWrapper implements
+        SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> {
+
+      private final SourceContext<WindowedValue<OutputT>> ctx;
+
+      private SourceContextWrapper(SourceContext<WindowedValue<OutputT>> ctx) {
+        this.ctx = ctx;
+      }
+
+      @Override
+      public void collect(WindowedValue<ValueWithRecordId<OutputT>> element) {
+        OutputT originalValue = element.getValue().getValue();
+        WindowedValue<OutputT> output = WindowedValue
+            .of(originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
+        ctx.collect(output);
+      }
+
+      @Override
+      public void collectWithTimestamp(WindowedValue<ValueWithRecordId<OutputT>> element,
+          long timestamp) {
+        OutputT originalValue = element.getValue().getValue();
+        WindowedValue<OutputT> output = WindowedValue
+            .of(originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
+        ctx.collectWithTimestamp(output, timestamp);
+      }
+
+      @Override
+      public void emitWatermark(Watermark mark) {
+        ctx.emitWatermark(mark);
+      }
+
+      @Override
+      public void markAsTemporarilyIdle() {
+        ctx.markAsTemporarilyIdle();
+      }
+
+      @Override
+      public Object getCheckpointLock() {
+        return ctx.getCheckpointLock();
+      }
+
+      @Override
+      public void close() {
+        ctx.close();
+      }
+    }
+  }
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
deleted file mode 100644
index 6db5426..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
-import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.functions.StoppableFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source.
- */
-public class BoundedSourceWrapper<OutputT>
-    extends RichParallelSourceFunction<WindowedValue<OutputT>>
-    implements StoppableFunction {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
-
-  private String stepName;
-  /**
-   * Keep the options so that we can initialize the readers.
-   */
-  private final SerializablePipelineOptions serializedOptions;
-
-  /**
-   * The split sources. We split them in the constructor to ensure that all parallel
-   * sources are consistent about the split sources.
-   */
-  private List<? extends BoundedSource<OutputT>> splitSources;
-
-  /**
-   * Make it a field so that we can access it in {@link #close()}.
-   */
-  private transient List<BoundedSource.BoundedReader<OutputT>> readers;
-
-  /**
-   * Initialize here and not in run() to prevent races where we cancel a job before run() is
-   * ever called or run() is called after cancel().
-   */
-  private volatile boolean isRunning = true;
-
-  @SuppressWarnings("unchecked")
-  public BoundedSourceWrapper(
-      String stepName,
-      PipelineOptions pipelineOptions,
-      BoundedSource<OutputT> source,
-      int parallelism) throws Exception {
-    this.stepName = stepName;
-    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
-
-    long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
-
-    // get the splits early. we assume that the generated splits are stable,
-    // this is necessary so that the mapping of state to source is correct
-    // when restoring
-    splitSources = source.split(desiredBundleSize, pipelineOptions);
-  }
-
-  @Override
-  public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
-
-    // figure out which split sources we're responsible for
-    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-    int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
-
-    List<BoundedSource<OutputT>> localSources = new ArrayList<>();
-
-    for (int i = 0; i < splitSources.size(); i++) {
-      if (i % numSubtasks == subtaskIndex) {
-        localSources.add(splitSources.get(i));
-      }
-    }
-
-    LOG.info("Bounded Flink Source {}/{} is reading from sources: {}",
-        subtaskIndex,
-        numSubtasks,
-        localSources);
-
-    FlinkMetricContainer metricContainer = new FlinkMetricContainer(getRuntimeContext());
-
-    ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker =
-        new ReaderInvocationUtil<>(
-            stepName,
-            serializedOptions.get(),
-            metricContainer);
-
-    readers = new ArrayList<>();
-    // initialize readers from scratch
-    for (BoundedSource<OutputT> source : localSources) {
-      readers.add(source.createReader(serializedOptions.get()));
-    }
-
-    if (readers.size() == 1) {
-      // the easy case, we just read from one reader
-      BoundedSource.BoundedReader<OutputT> reader = readers.get(0);
-
-      boolean dataAvailable = readerInvoker.invokeStart(reader);
-      if (dataAvailable) {
-        emitElement(ctx, reader);
-      }
-
-      while (isRunning) {
-        dataAvailable = readerInvoker.invokeAdvance(reader);
-
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-        } else {
-          break;
-        }
-      }
-    } else {
-      // a bit more complicated, we are responsible for several readers
-      // loop through them and sleep if none of them had any data
-
-      int currentReader = 0;
-
-      // start each reader and emit data if immediately available
-      for (BoundedSource.BoundedReader<OutputT> reader : readers) {
-        boolean dataAvailable = readerInvoker.invokeStart(reader);
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-        }
-      }
-
-      // a flag telling us whether any of the readers had data
-      // if no reader had data, sleep for bit
-      boolean hadData = false;
-      while (isRunning && !readers.isEmpty()) {
-        BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader);
-        boolean dataAvailable = readerInvoker.invokeAdvance(reader);
-
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-          hadData = true;
-        } else {
-          readers.remove(currentReader);
-          currentReader--;
-          if (readers.isEmpty()) {
-            break;
-          }
-        }
-
-        currentReader = (currentReader + 1) % readers.size();
-        if (currentReader == 0 && !hadData) {
-          Thread.sleep(50);
-        } else if (currentReader == 0) {
-          hadData = false;
-        }
-      }
-
-    }
-
-    // emit final Long.MAX_VALUE watermark, just to be sure
-    ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-
-    FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
-    if (!options.isShutdownSourcesOnFinalWatermark()) {
-      // do nothing, but still look busy ...
-      // we can't return here since Flink requires that all operators stay up,
-      // otherwise checkpointing would not work correctly anymore
-      //
-      // See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue
-
-      // wait until this is canceled
-      final Object waitLock = new Object();
-      while (isRunning) {
-        try {
-          // Flink will interrupt us at some point
-          //noinspection SynchronizationOnLocalVariableOrMethodParameter
-          synchronized (waitLock) {
-            // don't wait indefinitely, in case something goes horribly wrong
-            waitLock.wait(1000);
-          }
-        } catch (InterruptedException e) {
-          if (!isRunning) {
-            // restore the interrupted state, and fall through the loop
-            Thread.currentThread().interrupt();
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Emit the current element from the given Reader. The reader is guaranteed to have data.
-   */
-  private void emitElement(
-      SourceContext<WindowedValue<OutputT>> ctx,
-      BoundedSource.BoundedReader<OutputT> reader) {
-    // make sure that reader state update and element emission are atomic
-    // with respect to snapshots
-    synchronized (ctx.getCheckpointLock()) {
-
-      OutputT item = reader.getCurrent();
-      Instant timestamp = reader.getCurrentTimestamp();
-
-      WindowedValue<OutputT> windowedValue =
-          WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-      ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    super.close();
-    if (readers != null) {
-      for (BoundedSource.BoundedReader<OutputT> reader: readers) {
-        reader.close();
-      }
-    }
-  }
-
-  @Override
-  public void cancel() {
-    isRunning = false;
-  }
-
-  @Override
-  public void stop() {
-    this.isRunning = false;
-  }
-
-  /**
-   * Visible so that we can check this in tests. Must not be used for anything else.
-   */
-  @VisibleForTesting
-  public List<? extends BoundedSource<OutputT>> getSplitSources() {
-    return splitSources;
-  }
-}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
new file mode 100644
index 0000000..7701602
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.util.OutputTag;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test for bounded source restore in streaming mode.
+ */
+@RunWith(Parameterized.class)
+public class BoundedSourceRestoreTest {
+
+  private final int numTasks;
+  private final int numSplits;
+
+  public BoundedSourceRestoreTest(int numTasks, int numSplits) {
+    this.numTasks = numTasks;
+    this.numSplits = numSplits;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    /* Parameters for initializing the tests: {numTasks, numSplits} */
+    return Arrays.asList(new Object[][]{
+        {1, 1},
+        {1, 2},
+        {1, 4},
+    });
+  }
+
+  @Test
+  public void testRestore() throws Exception {
+    final int numElements = 102;
+    final int firstBatchSize = 23;
+    final int secondBatchSize = numElements - firstBatchSize;
+    final Set<Long> emittedElements = new HashSet<>();
+    final Object checkpointLock = new Object();
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    // bounded source wrapped as unbounded source
+    BoundedSource<Long> source = CountingSource.upTo(numElements);
+    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(source);
+    UnboundedSourceWrapper<Long, Checkpoint<Long>> flinkWrapper = new UnboundedSourceWrapper<>(
+        "stepName", options, unboundedSource, numSplits);
+
+    StreamSource<WindowedValue<ValueWithRecordId<Long>>,
+        UnboundedSourceWrapper<Long, Checkpoint<Long>>> sourceOperator =
+        new StreamSource<>(flinkWrapper);
+
+    AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<Long>>> testHarness =
+        new AbstractStreamOperatorTestHarness<>(sourceOperator,
+            numTasks /* max parallelism */,
+            numTasks /* parallelism */,
+            0 /* subtask index */);
+    testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // the first half of elements is read
+    boolean readFirstBatchOfElements = false;
+    try {
+      testHarness.open();
+      sourceOperator.run(checkpointLock,
+          new TestStreamStatusMaintainer(),
+          new PartialCollector<>(emittedElements, firstBatchSize)
+      );
+    } catch (SuccessException e) {
+      // success
+      readFirstBatchOfElements = true;
+    }
+    assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
+
+    // draw a snapshot
+    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+    // finalize checkpoint
+    final ArrayList<Integer> finalizeList = new ArrayList<>();
+    TestCountingSource.setFinalizeTracker(finalizeList);
+    testHarness.notifyOfCompletedCheckpoint(0);
+
+    // create a completely new source but restore from the snapshot
+    BoundedSource<Long> restoredSource = CountingSource.upTo(numElements);
+    BoundedToUnboundedSourceAdapter<Long> restoredUnboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(restoredSource);
+    UnboundedSourceWrapper<Long, Checkpoint<Long>> restoredFlinkWrapper =
+        new UnboundedSourceWrapper<>("stepName", options, restoredUnboundedSource, numSplits);
+    StreamSource<WindowedValue<ValueWithRecordId<Long>>,
+        UnboundedSourceWrapper<Long, Checkpoint<Long>>> restoredSourceOperator =
+        new StreamSource<>(restoredFlinkWrapper);
+
+    // set parallelism to 1 to ensure that our testing operator gets all checkpointed state
+    AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<Long>>>
+        restoredTestHarness =
+        new AbstractStreamOperatorTestHarness<>(
+            restoredSourceOperator,
+            numTasks /* max parallelism */,
+            1 /* parallelism */,
+            0 /* subtask index */);
+
+    restoredTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // restore snapshot
+    restoredTestHarness.initializeState(snapshot);
+
+    // run again and verify that we see the other elements
+    boolean readSecondBatchOfElements = false;
+    try {
+      restoredTestHarness.open();
+      restoredSourceOperator.run(checkpointLock,
+          new TestStreamStatusMaintainer(),
+          new PartialCollector<>(emittedElements, secondBatchSize)
+      );
+    } catch (SuccessException e) {
+      // success
+      readSecondBatchOfElements = true;
+    }
+    assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
+
+    // verify that we saw all NUM_ELEMENTS elements
+    assertTrue(emittedElements.size() == numElements);
+  }
+
+  /**
+   * A special {@link RuntimeException} that we throw to signal that the test was successful.
+   */
+  private static class SuccessException extends RuntimeException {
+
+  }
+
+  /**
+   * A collector which consumes only specified number of elements.
+   */
+  private static class PartialCollector<T>
+      implements Output<StreamRecord<WindowedValue<ValueWithRecordId<T>>>> {
+
+    private final Set<T> emittedElements;
+    private final int elementsToConsumeLimit;
+
+    private int count = 0;
+
+    private PartialCollector(Set<T> emittedElements, int elementsToConsumeLimit) {
+      this.emittedElements = emittedElements;
+      this.elementsToConsumeLimit = elementsToConsumeLimit;
+    }
+
+    @Override
+    public void emitWatermark(Watermark watermark) {
+    }
+
+    @Override
+    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
+      collect((StreamRecord) streamRecord);
+    }
+
+    @Override
+    public void emitLatencyMarker(LatencyMarker latencyMarker) {
+    }
+
+    @Override
+    public void collect(StreamRecord<WindowedValue<ValueWithRecordId<T>>> record) {
+      emittedElements.add(record.getValue().getValue().getValue());
+      count++;
+      if (count >= elementsToConsumeLimit) {
+        throw new SuccessException();
+      }
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+
+  private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer {
+
+    StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
+
+    @Override
+    public void toggleStreamStatus(StreamStatus streamStatus) {
+      if (!currentStreamStatus.equals(streamStatus)) {
+        currentStreamStatus = streamStatus;
+      }
+    }
+
+    @Override
+    public StreamStatus getStreamStatus() {
+      return currentStreamStatus;
+    }
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact [email protected].
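For orientation, the net effect of the patch is that a bounded read is now executed through the same checkpoint-aware machinery as unbounded reads, which is what makes it fault-tolerant. Below is a condensed sketch of that wiring, using only classes and constructors that appear in the diff above; the class name BoundedAsUnboundedSketch, the step name, and the element count are illustrative and not part of the commit:

    import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
    import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
    import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.io.CountingSource;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class BoundedAsUnboundedSketch {
      public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();

        // A bounded source ...
        BoundedSource<Long> bounded = CountingSource.upTo(100L);

        // ... is adapted to the UnboundedSource API. The adapter's Checkpoint
        // mark records the residual elements plus the residual source, which
        // is what makes snapshot and restore possible for a bounded read.
        BoundedToUnboundedSourceAdapter<Long> unbounded =
            new BoundedToUnboundedSourceAdapter<>(bounded);

        // The adapted source runs inside the checkpointing UnboundedSourceWrapper
        // (same constructor the new test uses). In the translator it is further
        // wrapped in UnboundedSourceWrapperNoValueWithRecordId, which strips
        // ValueWithRecordId from each emitted element.
        UnboundedSourceWrapper<Long, Checkpoint<Long>> wrapper =
            new UnboundedSourceWrapper<>("stepName", options, unbounded, 1 /* splits */);
      }
    }

Because the restored wrapper resumes from the Checkpoint mark, a partially consumed bounded read survives a snapshot/restore cycle (exercised by BoundedSourceRestoreTest), whereas the deleted BoundedSourceWrapper kept no state across checkpoints.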
