Repository: beam
Updated Branches:
  refs/heads/master 5c612272d -> d1d85dfc7
[BEAM-1177] Input DStream "bundles" are now in serialized form and include relevant metadata.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a49fbcac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a49fbcac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a49fbcac

Branch: refs/heads/master
Commit: a49fbcacaf972a7b919d1544c5e3c83389fc3291
Parents: 5c61227
Author: Sela <[email protected]>
Authored: Sun Dec 18 14:36:53 2016 +0200
Committer: Jean-Baptiste Onofré <[email protected]>
Committed: Wed Jan 4 13:05:34 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/spark/io/MicrobatchSource.java |   4 +-
 .../runners/spark/io/SparkUnboundedSource.java  | 127 +++++++++++++------
 .../spark/stateful/StateSpecFunctions.java      |  37 ++++--
 3 files changed, 121 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a49fbcac/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index f42cca3..ff818a1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -258,8 +258,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
       return (CheckpointMarkT) reader.getCheckpointMark();
     }
 
-    public long getNumRecordsRead() {
-      return recordsRead;
+    public Instant getWatermark() {
+      return reader.getWatermark();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a49fbcac/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index 394b023..f03dc8c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -18,16 +18,20 @@
 package org.apache.beam.runners.spark.io;
 
+import java.io.Serializable;
 import java.util.Collections;
-import java.util.Iterator;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.StateSpec;
@@ -39,6 +43,10 @@ import org.apache.spark.streaming.api.java.JavaPairInputDStream$;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.dstream.DStream;
 import org.apache.spark.streaming.scheduler.StreamInputInfo;
+import org.joda.time.Instant;
+
+import scala.Tuple2;
+import scala.runtime.BoxedUnit;
 
 /**
@@ -75,20 +83,39 @@ public class SparkUnboundedSource {
 
     // call mapWithState to read from a checkpointable sources.
     JavaMapWithStateDStream<Source<T>, CheckpointMarkT, byte[],
-        Iterator<WindowedValue<T>>> mapWithStateDStream = inputDStream.mapWithState(
+        Tuple2<Iterable<byte[]>, Metadata>> mapWithStateDStream = inputDStream.mapWithState(
         StateSpec.function(StateSpecFunctions.<T, CheckpointMarkT>mapSourceFunction(rc)));
 
     // set checkpoint duration for read stream, if set.
     checkpointStream(mapWithStateDStream, options);
-    // flatmap and report read elements. Use the inputDStream's id to tie between the reported
-    // info and the inputDStream it originated from.
-    int id = inputDStream.inputDStream().id();
-    ReportingFlatMappedDStream<WindowedValue<T>> reportingFlatMappedDStream =
-        new ReportingFlatMappedDStream<>(mapWithStateDStream.dstream(), id,
-            getSourceName(source, id));
+    // cache since checkpointing is less frequent.
+    mapWithStateDStream.cache();
 
-    return JavaDStream.fromDStream(reportingFlatMappedDStream,
-        JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
+    // report the number of input elements for this InputDStream to the InputInfoTracker.
+    int id = inputDStream.inputDStream().id();
+    JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(
+        new Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata>() {
+          @Override
+          public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+            return t2._2();
+          }
+        });
+
+    // register the ReportingDStream op.
+    new ReportingDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register();
+
+    // output the actual (deserialized) stream.
+    WindowedValue.FullWindowedValueCoder<T> coder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getDefaultOutputCoder(),
+            GlobalWindow.Coder.INSTANCE);
+    return mapWithStateDStream.flatMap(
+        new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() {
+          @Override
+          public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+            return t2._1();
+          }
+        }).map(CoderHelpers.fromByteFunction(coder));
   }
 
   private static <T> String getSourceName(Source<T> source, int id) {
@@ -111,20 +138,20 @@ public class SparkUnboundedSource {
   }
 
   /**
-   * A flatMap DStream function that "flattens" the Iterators read by the
-   * {@link MicrobatchSource.Reader}s, while reporting the properties of the read to the
-   * {@link org.apache.spark.streaming.scheduler.InputInfoTracker} for RateControl purposes
-   * and visibility.
+   * A DStream function that reports the properties of the read to the
+   * {@link org.apache.spark.streaming.scheduler.InputInfoTracker}
+   * for RateControl purposes and visibility.
    */
-  private static class ReportingFlatMappedDStream<T> extends DStream<T> {
-    private final DStream<Iterator<T>> parent;
+  private static class ReportingDStream extends DStream<BoxedUnit> {
+    private final DStream<Metadata> parent;
     private final int inputDStreamId;
     private final String sourceName;
 
-    ReportingFlatMappedDStream(DStream<Iterator<T>> parent,
-                               int inputDStreamId,
-                               String sourceName) {
-      super(parent.ssc(), JavaSparkContext$.MODULE$.<T>fakeClassTag());
+    ReportingDStream(
+        DStream<Metadata> parent,
+        int inputDStreamId,
+        String sourceName) {
+      super(parent.ssc(), JavaSparkContext$.MODULE$.<BoxedUnit>fakeClassTag());
       this.parent = parent;
       this.inputDStreamId = inputDStreamId;
       this.sourceName = sourceName;
@@ -142,31 +169,59 @@ public class SparkUnboundedSource {
     }
 
     @Override
-    public scala.Option<RDD<T>> compute(Time validTime) {
+    public scala.Option<RDD<BoxedUnit>> compute(Time validTime) {
       // compute parent.
-      scala.Option<RDD<Iterator<T>>> computedParentRDD = parent.getOrCompute(validTime);
-      // compute this DStream - take single-iterator partitions an flatMap them.
-      if (computedParentRDD.isDefined()) {
-        RDD<T> computedRDD = computedParentRDD.get().toJavaRDD()
-            .flatMap(TranslationUtils.<T>flattenIter()).rdd().cache();
-        // report - for RateEstimator and visibility.
-        report(validTime, computedRDD.count());
-        return scala.Option.apply(computedRDD);
-      } else {
-        report(validTime, 0);
-        return scala.Option.empty();
+      scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime);
+      long count = 0;
+      Instant globalWatermark = new Instant(Long.MIN_VALUE);
+      if (parentRDDOpt.isDefined()) {
+        JavaRDD<Metadata> parentRDD = parentRDDOpt.get().toJavaRDD();
+        for (Metadata metadata: parentRDD.collect()) {
+          count += metadata.getNumRecords();
+          // a monotonically increasing watermark.
+          globalWatermark = globalWatermark.isBefore(metadata.getWatermark())
+              ? metadata.getWatermark() : globalWatermark;
+        }
       }
+      // report - for RateEstimator and visibility.
+      report(validTime, count, globalWatermark);
+      return scala.Option.empty();
     }
 
-    private void report(Time batchTime, long count) {
+    private void report(Time batchTime, long count, Instant watermark) {
       // metadata - #records read and a description.
       scala.collection.immutable.Map<String, Object> metadata =
           new scala.collection.immutable.Map.Map1<String, Object>(
               StreamInputInfo.METADATA_KEY_DESCRIPTION(),
-              String.format("Read %d records from %s for batch time: %s", count, sourceName,
-                  batchTime));
+              String.format(
+                  "Read %d records with observed watermark %s, from %s for batch time: %s",
+                  count,
+                  watermark,
+                  sourceName,
+                  batchTime));
       StreamInputInfo streamInputInfo = new StreamInputInfo(inputDStreamId, count, metadata);
       ssc().scheduler().inputInfoTracker().reportInfo(batchTime, streamInputInfo);
     }
   }
+
+  /**
+   * A metadata holder for an input stream partition.
+   */
+  public static class Metadata implements Serializable {
+    private final long numRecords;
+    private final Instant watermark;
+
+    public Metadata(long numRecords, Instant watermark) {
+      this.numRecords = numRecords;
+      this.watermark = watermark;
+    }
+
+    public long getNumRecords() {
+      return numRecords;
+    }
+
+    public Instant getWatermark() {
+      return watermark;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a49fbcac/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 053f4ac..ffe0ddd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
+import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -39,10 +40,12 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.streaming.State;
 import org.apache.spark.streaming.StateSpec;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.Option;
+import scala.Tuple2;
 import scala.runtime.AbstractFunction3;
 
 /**
@@ -92,14 +95,17 @@ public class StateSpecFunctions {
    */
   public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
       scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, /* CheckpointMarkT */State<byte[]>,
-      Iterator<WindowedValue<T>>> mapSourceFunction(final SparkRuntimeContext runtimeContext) {
+      Tuple2<Iterable<byte[]>, Metadata>> mapSourceFunction(
+          final SparkRuntimeContext runtimeContext) {
 
     return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<byte[]>,
-        Iterator<WindowedValue<T>>>() {
+        Tuple2<Iterable<byte[]>, Metadata>>() {
 
       @Override
-      public Iterator<WindowedValue<T>> apply(Source<T> source, scala.Option<CheckpointMarkT>
-          startCheckpointMark, State<byte[]> state) {
+      public Tuple2<Iterable<byte[]>, Metadata> apply(
+          Source<T> source,
+          scala.Option<CheckpointMarkT> startCheckpointMark,
+          State<byte[]> state) {
         // source as MicrobatchSource
         MicrobatchSource<T, CheckpointMarkT> microbatchSource =
             (MicrobatchSource<T, CheckpointMarkT>) source;
@@ -130,18 +136,25 @@ public class StateSpecFunctions {
           throw new RuntimeException(e);
         }
 
-        // read microbatch.
-        final List<WindowedValue<T>> readValues = new ArrayList<>();
+        // read microbatch as a serialized collection.
+        final List<byte[]> readValues = new ArrayList<>();
+        final Instant watermark;
+        WindowedValue.FullWindowedValueCoder<T> coder =
+            WindowedValue.FullWindowedValueCoder.of(
+                source.getDefaultOutputCoder(),
+                GlobalWindow.Coder.INSTANCE);
         try {
           // measure how long a read takes per-partition.
           Stopwatch stopwatch = Stopwatch.createStarted();
           boolean finished = !reader.start();
           while (!finished) {
-            readValues.add(WindowedValue.of(reader.getCurrent(), reader.getCurrentTimestamp(),
-                GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+            WindowedValue<T> wv = WindowedValue.of(reader.getCurrent(),
+                reader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+            readValues.add(CoderHelpers.toByteArray(wv, coder));
             finished = !reader.advance();
           }
+          watermark = ((MicrobatchSource.Reader) reader).getWatermark();
           // close and checkpoint reader.
           reader.close();
           LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getId(),
@@ -160,7 +173,13 @@ public class StateSpecFunctions {
           throw new RuntimeException("Failed to read from reader.", e);
         }
 
-        return Iterators.unmodifiableIterator(readValues.iterator());
+        Iterable <byte[]> iterable = new Iterable<byte[]>() {
+          @Override
+          public Iterator<byte[]> iterator() {
+            return Iterators.unmodifiableIterator(readValues.iterator());
+          }
+        };
+        return new Tuple2<>(iterable, new Metadata(readValues.size(), watermark));
       }
     };
   }
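
Note (not part of the patch): a minimal, illustrative sketch of the (serialized bundle, metadata) pair shape that mapSourceFunction now emits and ReportingDStream consumes. The class name BundleShapeExample and the literal byte arrays are hypothetical stand-ins for elements encoded with WindowedValue.FullWindowedValueCoder; only the Metadata and Tuple2 usage mirrors the patched code.

import java.util.Arrays;

import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata;
import org.joda.time.Instant;

import scala.Tuple2;

public class BundleShapeExample {
  public static void main(String[] args) {
    // Stand-ins for elements already encoded with a FullWindowedValueCoder (hypothetical bytes).
    Iterable<byte[]> serializedBundle = Arrays.asList(new byte[] {1, 2}, new byte[] {3, 4});
    // Metadata carries the per-partition record count and the reader's watermark.
    Metadata metadata = new Metadata(2, new Instant(0));
    // The pair returned per partition by the mapWithState source function.
    Tuple2<Iterable<byte[]>, Metadata> bundle = new Tuple2<>(serializedBundle, metadata);
    System.out.println(
        "records=" + bundle._2().getNumRecords() + " watermark=" + bundle._2().getWatermark());
  }
}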
