This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 92a104e680bb03a7ba16068ae80f055bbd82ea3a Author: Etienne Chauchot <[email protected]> AuthorDate: Fri Dec 28 10:28:18 2018 +0100 Add ReadSourceTranslatorStreaming --- ...mingSource.java => DatasetSourceStreaming.java} | 2 +- .../streaming/ReadSourceTranslatorStreaming.java | 76 ++++++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java similarity index 99% rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java index 6947b6d..fad68d3 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java @@ -55,7 +55,7 @@ import scala.collection.immutable.Map; * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming * is tagged experimental in spark, this class does not implement {@link ContinuousReadSupport}. 
*/ -public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSupport{ +public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport{ private int numPartitions; private Long bundleSize; diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java new file mode 100644 index 0000000..6066822 --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.spark.structuredstreaming.translation.streaming; + +import java.io.IOException; +import org.apache.beam.runners.core.construction.ReadTranslation; +import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; +import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +class ReadSourceTranslatorStreaming<T> + implements TransformTranslator<PTransform<PBegin, PCollection<T>>> { + + private String SOURCE_PROVIDER_CLASS = DatasetSourceStreaming.class.getCanonicalName(); + + @SuppressWarnings("unchecked") + @Override + public void translateTransform( + PTransform<PBegin, PCollection<T>> transform, TranslationContext context) { + AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> rootTransform = + (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>) + context.getCurrentTransform(); + + UnboundedSource<T, UnboundedSource.CheckpointMark> source; + try { + source = ReadTranslation + .unboundedSourceFromTransform(rootTransform); + } catch (IOException e) { + throw new RuntimeException(e); + } + SparkSession sparkSession = context.getSparkSession(); + + Dataset<Row> rowDataset = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS).load(); + + //TODO pass the source and the translation 
context serialized as string to the DatasetSource + MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() { + @Override public WindowedValue call(Row value) throws Exception { + //there is only one value put in each Row by the InputPartitionReader + return value.<WindowedValue>getAs(0); + } + }; + //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>> + // be created ? + Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class)); + + PCollection<T> output = (PCollection<T>) context.getOutput(); + context.putDatasetRaw(output, dataset); + } +}
