Repository: incubator-beam Updated Branches: refs/heads/master 874ddef05 -> 4020e3645
[BEAM-103][BEAM-130] Make Flink Source Parallel and Checkpointed This also changes how the setParallelism option of FlinkPipelineOptions behaves. Now it defaults to either 1 or the default parallelism set in the Flink configuration. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/336e90fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/336e90fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/336e90fa Branch: refs/heads/master Commit: 336e90fa795885e670ff3a523a6fefa6033c2663 Parents: 874ddef Author: Aljoscha Krettek <[email protected]> Authored: Tue May 3 13:35:35 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Tue May 10 22:20:08 2016 +0200 ---------------------------------------------------------------------- .../flink/DefaultParallelismFactory.java | 39 +++ .../runners/flink/FlinkPipelineOptions.java | 2 +- .../FlinkStreamingTransformTranslators.java | 18 +- .../streaming/io/UnboundedSourceWrapper.java | 337 ++++++++++++++++--- .../flink/streaming/TestCountingSource.java | 256 ++++++++++++++ .../flink/streaming/UnboundedSourceITCase.java | 208 ------------ .../streaming/UnboundedSourceWrapperTest.java | 324 ++++++++++++++++++ 7 files changed, 916 insertions(+), 268 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java new file mode 100644 index 0000000..e512db0 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.PipelineOptions; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.GlobalConfiguration; + +/** + * {@link DefaultValueFactory} for getting a default value for the parallelism option + * on {@link FlinkPipelineOptions}. + * + * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}. + * A valid {@link GlobalConfiguration} is only available if the program is executed by the Flink + * run scripts. 
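+ * + * <p>Illustrative usage (a sketch; {@code parallelism.default} is the configuration + * key behind {@link ConfigConstants#DEFAULT_PARALLELISM_KEY}): + * <pre>{@code + * FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + * // parallelism.default from the Flink configuration if available, otherwise 1 + * Integer parallelism = options.getParallelism(); + * }</pre>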
+ */ +public class DefaultParallelismFactory implements DefaultValueFactory<Integer> { + @Override + public Integer create(PipelineOptions options) { + return GlobalConfiguration.getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 8c82abd..fd86bc9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -70,7 +70,7 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp void setFlinkMaster(String value); @Description("The degree of parallelism to be used when distributing operations onto workers.") - @Default.Integer(-1) + @Default.InstanceFactory(DefaultParallelismFactory.class) Integer getParallelism(); void setParallelism(Integer value); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 618727d..2778d5c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; @@ -284,8 +285,17 @@ public class FlinkStreamingTransformTranslators { } }).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>()); } else { - source = context.getExecutionEnvironment() - .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform)); + try { + transform.getSource(); + UnboundedSourceWrapper<T, ?> sourceWrapper = + new UnboundedSourceWrapper<>( + context.getPipelineOptions(), + transform.getSource(), + context.getExecutionEnvironment().getParallelism()); + source = context.getExecutionEnvironment().addSource(sourceWrapper).name(transform.getName()); + } catch (Exception e) { + throw new RuntimeException("Error while translating UnboundedSource: " + transform.getSource(), e); + } } context.setOutputDataStream(output, source); @@ -310,7 +320,9 @@ public class FlinkStreamingTransformTranslators { FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundWrapper<>( context.getPipelineOptions(), windowingStrategy, transform.getFn()); DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform)); - 
SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream.flatMap(doFnWrapper) + SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream + .flatMap(doFnWrapper) + .name(transform.getName()) .returns(outputWindowedValueCoder); context.setOutputDataStream(context.getOutput(transform), outDataStream); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 9d15a33..b816e2a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -18,107 +18,286 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.Lists; + +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.List; /** - * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the - * {@link org.apache.beam.sdk.io.Read.Unbounded} interface. - * - * For now we support non-parallel sources, checkpointing is WIP. - * */ -public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable { + * Wrapper for executing {@link UnboundedSource UnboundedSources} as a Flink Source. 
+ */ +public class UnboundedSourceWrapper< + OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> + extends RichParallelSourceFunction<WindowedValue<OutputT>> + implements Triggerable, Checkpointed<byte[]> { - private final String name; - private final UnboundedSource<T, ?> source; + private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class); - private StreamingRuntimeContext runtime = null; - private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null; + /** + * Keep the options so that we can initialize the readers. + */ + private final SerializedPipelineOptions serializedOptions; - private volatile boolean isRunning = false; + /** + * For snapshot and restore. + */ + private final ListCoder< + KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> checkpointCoder; - private final SerializedPipelineOptions serializedOptions; + /** + * The split sources. We split them in the constructor to ensure that all parallel + * sources are consistent about the split sources. + */ + private List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources; - /** Instantiated during runtime **/ - private transient UnboundedSource.UnboundedReader<T> reader; + /** + * Make it a field so that we can access it in {@link #trigger(long)} for + * emitting watermarks. + */ + private transient List<UnboundedSource.UnboundedReader<OutputT>> readers; - public UnboundedSourceWrapper(PipelineOptions pipelineOptions, Read.Unbounded<T> transform) { - this.name = transform.getName(); - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - this.source = transform.getSource(); - } + /** + * Initialize here and not in run() to prevent races where we cancel a job before run() is + * ever called or run() is called after cancel(). + */ + private volatile boolean isRunning = true; - public String getName() { - return this.name; - } + /** + * Make it a field so that we can access it in {@link #trigger(long)} for registering new + * triggers. + */ + private transient StreamingRuntimeContext runtimeContext; - WindowedValue<T> makeWindowedValue(T output, Instant timestamp) { - if (timestamp == null) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + /** + * Make it a field so that we can access it in {@link #trigger(long)} for emitting + * watermarks. + */ + private transient StreamSource.ManualWatermarkContext<WindowedValue<OutputT>> context; + + /** + * When restoring from a snapshot we put the restored sources/checkpoint marks here + * and re-create the readers from them when {@code run(SourceContext)} starts. + */ + private transient List< + KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> restoredState; + + @SuppressWarnings("unchecked") + public UnboundedSourceWrapper( + PipelineOptions pipelineOptions, + UnboundedSource<OutputT, CheckpointMarkT> source, + int parallelism) throws Exception { + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + + if (source.requiresDeduping()) { + LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source); } - return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); + + Coder<CheckpointMarkT> checkpointMarkCoder = source.getCheckpointMarkCoder(); + Coder<?
extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder = + SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>>() {}); + + checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, checkpointMarkCoder)); + + // get the splits early. we assume that the generated splits are stable, + // this is necessary so that the mapping of state to source is correct + // when restoring + splitSources = source.generateInitialSplits(parallelism, pipelineOptions); } @Override - public void run(SourceContext<WindowedValue<T>> ctx) throws Exception { + public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception { if (!(ctx instanceof StreamSource.ManualWatermarkContext)) { throw new RuntimeException( - "We assume that all sources in Dataflow are EventTimeSourceFunction. " + - "Apparently " + this.name + " is not. " + - "Probably you should consider writing your own Wrapper for this source."); + "Cannot emit watermarks, this hints at a misconfiguration/bug."); } - context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx; - runtime = (StreamingRuntimeContext) getRuntimeContext(); + context = (StreamSource.ManualWatermarkContext<WindowedValue<OutputT>>) ctx; + runtimeContext = (StreamingRuntimeContext) getRuntimeContext(); - isRunning = true; + // figure out which split sources we're responsible for + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); - reader = source.createReader(serializedOptions.getPipelineOptions(), null); + List<UnboundedSource<OutputT, CheckpointMarkT>> localSources = new ArrayList<>(); - boolean inputAvailable = reader.start(); + for (int i = 0; i < splitSources.size(); i++) { + if (i % numSubtasks == subtaskIndex) { + localSources.add(splitSources.get(i)); + } + } + + LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}", + subtaskIndex, + numSubtasks, + localSources); + + readers = new ArrayList<>(); + if (restoredState != null) { + + // restore the splitSources from the checkpoint to ensure consistent ordering + // do it using a transform because otherwise we would have to do + // unchecked casts + splitSources = Lists.transform( + restoredState, + new Function< + KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>, + UnboundedSource<OutputT, CheckpointMarkT>>() { + @Override + public UnboundedSource<OutputT, CheckpointMarkT> apply( + KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> input) { + return input.getKey(); + } + }); + + for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored: + restoredState) { + readers.add( + restored.getKey().createReader( + serializedOptions.getPipelineOptions(), restored.getValue())); + } + restoredState = null; + } else { + // initialize readers from scratch + for (UnboundedSource<OutputT, CheckpointMarkT> source : localSources) { + readers.add(source.createReader(serializedOptions.getPipelineOptions(), null)); + } + } + + if (readers.size() == 0) { + // do nothing, but still look busy ... 
+ // also, output a Long.MAX_VALUE watermark since we know that we're not + // going to emit anything + // we can't return here since Flink requires that all operators stay up, + // otherwise checkpointing would not work correctly anymore + ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); - setNextWatermarkTimer(this.runtime); + // wait until this is canceled + final Object waitLock = new Object(); + while (isRunning) { + try { + // Flink will interrupt us at some point + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (waitLock) { + waitLock.wait(); + } + } catch (InterruptedException e) { + if (!isRunning) { + // restore the interrupted state, and fall through the loop + Thread.currentThread().interrupt(); + } + } + } + } else if (readers.size() == 1) { + // the easy case, we just read from one reader + UnboundedSource.UnboundedReader<OutputT> reader = readers.get(0); + boolean dataAvailable = reader.start(); + if (dataAvailable) { + emitElement(ctx, reader); + } - try { + setNextWatermarkTimer(this.runtimeContext); while (isRunning) { + dataAvailable = reader.advance(); - if (!inputAvailable && isRunning) { - // wait a bit until we retry to pull more records + if (dataAvailable) { + emitElement(ctx, reader); + } else { Thread.sleep(50); - inputAvailable = reader.advance(); } + } + } else { + // a bit more complicated, we are responsible for several readers + // loop through them and sleep if none of them had any data - if (inputAvailable) { + int numReaders = readers.size(); + int currentReader = 0; - // get it and its timestamp from the source - T item = reader.getCurrent(); - Instant timestamp = reader.getCurrentTimestamp(); + // start each reader and emit data if immediately available + for (UnboundedSource.UnboundedReader<OutputT> reader : readers) { + boolean dataAvailable = reader.start(); + if (dataAvailable) { + emitElement(ctx, reader); + } + } - // write it to the output collector - synchronized (ctx.getCheckpointLock()) { - context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis()); - } + // a flag telling us whether any of the readers had data + // if no reader had data, sleep for a bit + boolean hadData = false; + while (isRunning) { + UnboundedSource.UnboundedReader<OutputT> reader = readers.get(currentReader); + boolean dataAvailable = reader.advance(); - inputAvailable = reader.advance(); + if (dataAvailable) { + emitElement(ctx, reader); + hadData = true; + } + + currentReader = (currentReader + 1) % numReaders; + if (currentReader == 0 && !hadData) { + Thread.sleep(50); + } else if (currentReader == 0) { + hadData = false; } } - } finally { - reader.close(); + } + } + + /** + * Emit the current element from the given Reader. The reader is guaranteed to have data.
+ */ + private void emitElement( + SourceContext<WindowedValue<OutputT>> ctx, + UnboundedSource.UnboundedReader<OutputT> reader) { + // make sure that reader state update and element emission are atomic + // with respect to snapshots + synchronized (ctx.getCheckpointLock()) { + + OutputT item = reader.getCurrent(); + Instant timestamp = reader.getCurrentTimestamp(); + + WindowedValue<OutputT> windowedValue = + WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); + ctx.collectWithTimestamp(windowedValue, timestamp.getMillis()); + } + } + + @Override + public void close() throws Exception { + super.close(); + if (readers != null) { + for (UnboundedSource.UnboundedReader<OutputT> reader: readers) { + reader.close(); + } + } } @@ -128,13 +307,52 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue< } @Override + public byte[] snapshotState(long l, long l1) throws Exception { + // we checkpoint the sources along with the CheckpointMarkT to ensure + // that we have a correct mapping of checkpoints to sources when + // restoring + List<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> checkpoints = + new ArrayList<>(); + + for (int i = 0; i < splitSources.size(); i++) { + UnboundedSource<OutputT, CheckpointMarkT> source = splitSources.get(i); + UnboundedSource.UnboundedReader<OutputT> reader = readers.get(i); + + @SuppressWarnings("unchecked") + CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark(); + KV<UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> kv = + KV.of(source, mark); + checkpoints.add(kv); + } + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + checkpointCoder.encode(checkpoints, baos, Coder.Context.OUTER); + return baos.toByteArray(); + } + } + + @Override + public void restoreState(byte[] bytes) throws Exception { + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) { + restoredState = checkpointCoder.decode(bais, Coder.Context.OUTER); + } + } + + @Override public void trigger(long timestamp) throws Exception { if (this.isRunning) { synchronized (context.getCheckpointLock()) { - long watermarkMillis = this.reader.getWatermark().getMillis(); + // find minimum watermark over all readers + long watermarkMillis = Long.MAX_VALUE; + for (UnboundedSource.UnboundedReader<OutputT> reader: readers) { + Instant watermark = reader.getWatermark(); + if (watermark != null) { + watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis); + } + } context.emitWatermark(new Watermark(watermarkMillis)); } - setNextWatermarkTimer(this.runtime); + setNextWatermarkTimer(this.runtimeContext); } } @@ -150,4 +368,11 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue< return System.currentTimeMillis() + watermarkInterval; } + /** + * Visible so that we can check this in tests. Must not be used for anything else. + */ + @VisibleForTesting + public List<?
extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() { + return splitSources; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java new file mode 100644 index 0000000..3ced02e --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.DelegateCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.values.KV; + +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; + +/** + * An unbounded source for testing the unbounded sources framework code. + * + * <p>Each split of this source produces records of the form KV(split_id, i), + * where i counts up from 0. Each record has a timestamp of i, and the watermark + * accurately tracks these timestamps. The reader will occasionally return false + * from {@code advance}, in order to simulate a source where not all the data is + * available immediately. + */ +public class TestCountingSource + extends UnboundedSource<KV<Integer, Integer>, TestCountingSource.CounterMark> { + private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class); + + private static List<Integer> finalizeTracker; + private final int numMessagesPerShard; + private final int shardNumber; + private final boolean dedup; + private final boolean throwOnFirstSnapshot; + private final boolean allowSplitting; + + /** + * We only allow an exception to be thrown from getCheckpointMark + * at most once. This must be static since the entire TestCountingSource + * instance may be re-serialized when the pipeline recovers and retries.
+ */ + private static boolean thrown = false; + + public static void setFinalizeTracker(List<Integer> finalizeTracker) { + TestCountingSource.finalizeTracker = finalizeTracker; + } + + public TestCountingSource(int numMessagesPerShard) { + this(numMessagesPerShard, 0, false, false, true); + } + + public TestCountingSource withDedup() { + return new TestCountingSource( + numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot, true); + } + + private TestCountingSource withShardNumber(int shardNumber) { + return new TestCountingSource( + numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true); + } + + public TestCountingSource withThrowOnFirstSnapshot(boolean throwOnFirstSnapshot) { + return new TestCountingSource( + numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true); + } + + public TestCountingSource withoutSplitting() { + return new TestCountingSource( + numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, false); + } + + private TestCountingSource(int numMessagesPerShard, int shardNumber, boolean dedup, + boolean throwOnFirstSnapshot, boolean allowSplitting) { + this.numMessagesPerShard = numMessagesPerShard; + this.shardNumber = shardNumber; + this.dedup = dedup; + this.throwOnFirstSnapshot = throwOnFirstSnapshot; + this.allowSplitting = allowSplitting; + } + + public int getShardNumber() { + return shardNumber; + } + + @Override + public List<TestCountingSource> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) { + List<TestCountingSource> splits = new ArrayList<>(); + int numSplits = allowSplitting ? desiredNumSplits : 1; + for (int i = 0; i < numSplits; i++) { + splits.add(withShardNumber(i)); + } + return splits; + } + + class CounterMark implements UnboundedSource.CheckpointMark { + int current; + + public CounterMark(int current) { + this.current = current; + } + + @Override + public void finalizeCheckpoint() { + if (finalizeTracker != null) { + finalizeTracker.add(current); + } + } + } + + @Override + public Coder<CounterMark> getCheckpointMarkCoder() { + return DelegateCoder.of( + VarIntCoder.of(), + new DelegateCoder.CodingFunction<CounterMark, Integer>() { + @Override + public Integer apply(CounterMark input) { + return input.current; + } + }, + new DelegateCoder.CodingFunction<Integer, CounterMark>() { + @Override + public CounterMark apply(Integer input) { + return new CounterMark(input); + } + }); + } + + @Override + public boolean requiresDeduping() { + return dedup; + } + + /** + * Public only so that the checkpoint can be conveyed from {@link #getCheckpointMark()} to + * {@link TestCountingSource#createReader(PipelineOptions, CounterMark)} without cast. 
+ */ + public class CountingSourceReader extends UnboundedReader<KV<Integer, Integer>> { + private int current; + + public CountingSourceReader(int startingPoint) { + this.current = startingPoint; + } + + @Override + public boolean start() { + return advance(); + } + + @Override + public boolean advance() { + if (current >= numMessagesPerShard - 1) { + return false; + } + // If testing dedup, occasionally insert a duplicate value. + if (current >= 0 && dedup && ThreadLocalRandom.current().nextInt(5) == 0) { + return true; + } + current++; + return true; + } + + @Override + public KV<Integer, Integer> getCurrent() { + return KV.of(shardNumber, current); + } + + @Override + public Instant getCurrentTimestamp() { + return new Instant(current); + } + + @Override + public byte[] getCurrentRecordId() { + try { + return encodeToByteArray(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), getCurrent()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() {} + + @Override + public TestCountingSource getCurrentSource() { + return TestCountingSource.this; + } + + @Override + public Instant getWatermark() { + // The watermark is a promise about future elements, and the timestamps of elements are + // strictly increasing for this source. + return new Instant(current + 1); + } + + @Override + public CounterMark getCheckpointMark() { + if (throwOnFirstSnapshot && !thrown) { + thrown = true; + LOG.error("Throwing exception while checkpointing counter"); + throw new RuntimeException("failed during checkpoint"); + } + // The checkpoint can assume all records read, including the current, have + // been committed. + return new CounterMark(current); + } + + @Override + public long getSplitBacklogBytes() { + return 7L; + } + } + + @Override + public CountingSourceReader createReader( + PipelineOptions options, @Nullable CounterMark checkpointMark) { + if (checkpointMark == null) { + LOG.debug("creating reader"); + } else { + LOG.debug("restoring reader from checkpoint with current = {}", checkpointMark.current); + } + return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1); + } + + @Override + public void validate() {} + + @Override + public Coder<KV<Integer, Integer>> getDefaultOutputCoder() { + return KvCoder.of(VarIntCoder.of(), VarIntCoder.of()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java deleted file mode 100644 index 8a5de15..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; - -import org.apache.beam.runners.flink.FlinkTestPipeline; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; - -import org.apache.flink.streaming.util.StreamingProgramTestBase; -import org.joda.time.Instant; - -import java.io.IOException; -import java.util.List; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; - - -public class UnboundedSourceITCase extends StreamingProgramTestBase { - - protected static String resultPath; - - public UnboundedSourceITCase() { - } - - static final String[] EXPECTED_RESULT = new String[]{ - "1", "2", "3", "4", "5", "6", "7", "8", "9"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - runProgram(resultPath); - } - - private static void runProgram(String resultPath) { - - Pipeline p = FlinkTestPipeline.createForStreaming(); - - PCollection<String> result = p - .apply(Read.from(new RangeReadSource(1, 10))) - .apply(Window.<Integer>into(new GlobalWindows()) - .triggering(AfterPane.elementCountAtLeast(10)) - .discardingFiredPanes()) - .apply(ParDo.of(new DoFn<Integer, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(c.element().toString()); - } - })); - - result.apply(TextIO.Write.to(resultPath)); - - try { - p.run(); - fail(); - } catch(Exception e) { - assertEquals("The source terminates as expected.", e.getCause().getCause().getMessage()); - } - } - - - private static class RangeReadSource extends UnboundedSource<Integer, UnboundedSource.CheckpointMark> { - - final int from; - final int to; - - RangeReadSource(int from, int to) { - this.from = from; - this.to = to; - } - - - @Override - public List<? 
extends UnboundedSource<Integer, CheckpointMark>> generateInitialSplits( - int desiredNumSplits, PipelineOptions options) throws Exception { - return ImmutableList.of(this); - } - - @Override - public UnboundedReader<Integer> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) { - return new RangeReadReader(options); - } - - @Nullable - @Override - public Coder<CheckpointMark> getCheckpointMarkCoder() { - return null; - } - - @Override - public void validate() { - } - - @Override - public Coder<Integer> getDefaultOutputCoder() { - return BigEndianIntegerCoder.of(); - } - - private class RangeReadReader extends UnboundedReader<Integer> { - - private int current; - - private long watermark; - - public RangeReadReader(PipelineOptions options) { - assertNotNull(options); - current = from; - } - - @Override - public boolean start() throws IOException { - return true; - } - - @Override - public boolean advance() throws IOException { - current++; - watermark++; - - if (current >= to) { - try { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - throw new IOException("The source terminates as expected."); - } catch (IOException e) { - // pass on the exception to terminate the source - throw e; - } catch (Throwable t) { - // expected here from the file check - } - } - return current < to; - } - - @Override - public Integer getCurrent() throws NoSuchElementException { - return current; - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return new Instant(current); - } - - @Override - public void close() throws IOException { - } - - @Override - public Instant getWatermark() { - return new Instant(watermark); - } - - @Override - public CheckpointMark getCheckpointMark() { - return null; - } - - @Override - public UnboundedSource<Integer, ?> getCurrentSource() { - return RangeReadSource.this; - } - } - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java new file mode 100644 index 0000000..f5a52f5 --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.streaming; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Tests for {@link UnboundedSourceWrapper}. + */ +public class UnboundedSourceWrapperTest { + + /** + * Creates a {@link UnboundedSourceWrapper} that has exactly one reader per source, since we + * specify a parallelism of 1 and also at runtime tell the source that it has 1 parallel subtask. + */ + @Test + public void testWithOneReader() throws Exception { + final int NUM_ELEMENTS = 20; + final Object checkpointLock = new Object(); + PipelineOptions options = PipelineOptionsFactory.create(); + + // this source will emit exactly NUM_ELEMENTS across all parallel readers, + // afterwards it will stall. We check whether we also receive NUM_ELEMENTS + // elements later. + TestCountingSource source = new TestCountingSource(NUM_ELEMENTS); + UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = + new UnboundedSourceWrapper<>(options, source, 1); + + assertEquals(1, flinkWrapper.getSplitSources().size()); + + StreamSource< + WindowedValue<KV<Integer, Integer>>, + UnboundedSourceWrapper< + KV<Integer, Integer>, + TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); + + setupSourceOperator(sourceOperator); + + + try { + sourceOperator.run(checkpointLock, + new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { + private int count = 0; + + @Override + public void emitWatermark(Watermark watermark) { + } + + @Override + public void collect( + StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) { + + count++; + if (count >= NUM_ELEMENTS) { + throw new SuccessException(); + } + } + + @Override + public void close() { + + } + }); + } catch (SuccessException e) { + // success + } catch (Exception e) { + fail("We caught " + e); + } + } + + /** + * Creates a {@link UnboundedSourceWrapper} that has multiple readers per source, since we + * specify a parallelism higher than 1 but at runtime tell the source that it has only 1 + * parallel subtask; this means that one source wrapper will manage multiple readers.
+ */ + @Test + public void testWithMultipleReaders() throws Exception { + final int NUM_ELEMENTS = 20; + final Object checkpointLock = new Object(); + PipelineOptions options = PipelineOptionsFactory.create(); + + // this source will emit exactly NUM_ELEMENTS across all parallel readers, + // afterwards it will stall. We check whether we also receive NUM_ELEMENTS + // elements later. + TestCountingSource source = new TestCountingSource(NUM_ELEMENTS); + UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = + new UnboundedSourceWrapper<>(options, source, 4); + + assertEquals(4, flinkWrapper.getSplitSources().size()); + + StreamSource<WindowedValue< + KV<Integer, Integer>>, + UnboundedSourceWrapper< + KV<Integer, Integer>, + TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); + + setupSourceOperator(sourceOperator); + + + try { + sourceOperator.run(checkpointLock, + new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { + private int count = 0; + + @Override + public void emitWatermark(Watermark watermark) { + } + + @Override + public void collect( + StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) { + + count++; + if (count >= NUM_ELEMENTS) { + throw new SuccessException(); + } + } + + @Override + public void close() { + + } + }); + } catch (SuccessException e) { + // success + return; + } + fail("Read terminated without producing expected number of outputs"); + } + + /** + * Verify that snapshot/restore work as expected. We bring up a source and cancel + * after seeing a certain number of elements. Then we snapshot that source, + * bring up a completely new source that we restore from the snapshot and verify + * that we see all expected elements in the end. + */ + @Test + public void testRestore() throws Exception { + final int NUM_ELEMENTS = 20; + final Object checkpointLock = new Object(); + PipelineOptions options = PipelineOptionsFactory.create(); + + // this source will emit exactly NUM_ELEMENTS across all parallel readers, + // afterwards it will stall. We check whether we also receive NUM_ELEMENTS + // elements later. 
+ TestCountingSource source = new TestCountingSource(NUM_ELEMENTS); + UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = + new UnboundedSourceWrapper<>(options, source, 1); + + assertEquals(1, flinkWrapper.getSplitSources().size()); + + StreamSource< + WindowedValue<KV<Integer, Integer>>, + UnboundedSourceWrapper< + KV<Integer, Integer>, + TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); + + setupSourceOperator(sourceOperator); + + final Set<KV<Integer, Integer>> emittedElements = new HashSet<>(); + + boolean readFirstBatchOfElements = false; + + try { + sourceOperator.run(checkpointLock, + new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { + private int count = 0; + + @Override + public void emitWatermark(Watermark watermark) { + } + + @Override + public void collect( + StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) { + + emittedElements.add(windowedValueStreamRecord.getValue().getValue()); + count++; + if (count >= NUM_ELEMENTS / 2) { + throw new SuccessException(); + } + } + + @Override + public void close() { + + } + }); + } catch (SuccessException e) { + // success + readFirstBatchOfElements = true; + } + + assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements); + + // draw a snapshot + byte[] snapshot = flinkWrapper.snapshotState(0, 0); + + // create a completely new source but restore from the snapshot + TestCountingSource restoredSource = new TestCountingSource(NUM_ELEMENTS); + UnboundedSourceWrapper< + KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper = + new UnboundedSourceWrapper<>(options, restoredSource, 1); + + assertEquals(1, restoredFlinkWrapper.getSplitSources().size()); + + StreamSource< + WindowedValue<KV<Integer, Integer>>, + UnboundedSourceWrapper< + KV<Integer, Integer>, + TestCountingSource.CounterMark>> restoredSourceOperator = + new StreamSource<>(restoredFlinkWrapper); + + setupSourceOperator(restoredSourceOperator); + + // restore snapshot + restoredFlinkWrapper.restoreState(snapshot); + + boolean readSecondBatchOfElements = false; + + // run again and verify that we see the other elements + try { + restoredSourceOperator.run(checkpointLock, + new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { + private int count = 0; + + @Override + public void emitWatermark(Watermark watermark) { + } + + @Override + public void collect( + StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) { + emittedElements.add(windowedValueStreamRecord.getValue().getValue()); + count++; + if (count >= NUM_ELEMENTS / 2) { + throw new SuccessException(); + } + } + + @Override + public void close() { + + } + }); + } catch (SuccessException e) { + // success + readSecondBatchOfElements = true; + } + + assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements); + + // verify that we saw all NUM_ELEMENTS elements + assertTrue(emittedElements.size() == NUM_ELEMENTS); + } + + @SuppressWarnings("unchecked") + private static <T> void setupSourceOperator(StreamSource<T, ?> operator) { + ExecutionConfig executionConfig = new ExecutionConfig(); + StreamConfig cfg = new StreamConfig(new Configuration()); + + cfg.setTimeCharacteristic(TimeCharacteristic.EventTime); + + Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0); + + StreamTask<?, ?> mockTask = mock(StreamTask.class); + when(mockTask.getName()).thenReturn("Mock Task"); + 
when(mockTask.getCheckpointLock()).thenReturn(new Object()); + when(mockTask.getConfiguration()).thenReturn(cfg); + when(mockTask.getEnvironment()).thenReturn(env); + when(mockTask.getExecutionConfig()).thenReturn(executionConfig); + when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap()); + + operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class)); + } + + /** + * A special {@link RuntimeException} that we throw to signal that the test was successful. + */ + private static class SuccessException extends RuntimeException {} }
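----------------------------------------------------------------------

The heart of the new wrapper is its deterministic split-to-subtask mapping: splits are generated once in the constructor, and each parallel subtask claims them round-robin in run(), which is also what lets snapshotState/restoreState match checkpoint marks back to their splits by position. A condensed, self-contained sketch of just that mapping (the class and method names below are illustrative, not part of this commit):

import java.util.ArrayList;
import java.util.List;

final class RoundRobinSplitAssignment {
  // Mirrors UnboundedSourceWrapper#run(): split i belongs to subtask (i % numSubtasks).
  // Because the splits are generated once, up front, every parallel subtask derives a
  // consistent view of the assignment.
  static <SplitT> List<SplitT> localSplits(
      List<SplitT> allSplits, int subtaskIndex, int numSubtasks) {
    List<SplitT> local = new ArrayList<>();
    for (int i = 0; i < allSplits.size(); i++) {
      if (i % numSubtasks == subtaskIndex) {
        local.add(allSplits.get(i));
      }
    }
    return local;
  }
}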

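The snapshot format is simply the checkpoint coder applied to the list of (source, checkpoint mark) pairs, so a restore round-trip looks like the following sketch (only snapshotState, restoreState, and the wrapper constructor come from this commit; the surrounding wiring is illustrative, as in UnboundedSourceWrapperTest#testRestore):

// take a snapshot of a running wrapper (Flink's Checkpointed<byte[]> contract)
byte[] snapshot = wrapper.snapshotState(checkpointId, checkpointTimestamp);

// a brand-new wrapper over an equivalent source resumes from that snapshot;
// run() then re-creates its readers from the decoded (source, checkpoint mark) pairs
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> restored =
    new UnboundedSourceWrapper<>(options, new TestCountingSource(20), 1);
restored.restoreState(snapshot);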