This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 1cea29df3702ef438d8eb5964450a7bafea3c7d5
Author: Etienne Chauchot <[email protected]>
AuthorDate: Wed Jan 2 15:52:46 2019 +0100

    Pass Beam Source and PipelineOptions to the spark DataSource as serialized strings
---
 .../translation/batch/DatasetSourceBatch.java      | 41 ++++++++++++++++------
 .../batch/ReadSourceTranslatorBatch.java           | 16 +++++++--
 2 files changed, 45 insertions(+), 12 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index f4cd885..331e397 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -24,8 +24,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -45,16 +46,38 @@ import org.apache.spark.sql.types.StructType;
  */
 public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
 
+  static final String BEAM_SOURCE_OPTION = "beam-source";
+  static final String DEFAULT_PARALLELISM = "default-parallelism";
+  static final String PIPELINE_OPTIONS = "pipeline-options";
   private int numPartitions;
   private Long bundleSize;
-  private TranslationContext context;
   private BoundedSource<T> source;
+  private SparkPipelineOptions sparkPipelineOptions;
 
-  @Override public DataSourceReader createReader(DataSourceOptions options) {
-    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
+  @SuppressWarnings("unchecked")
+  @Override
+  public DataSourceReader createReader(DataSourceOptions options) {
+    if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
+      throw new RuntimeException("Beam source was not set in DataSource options");
+    }
+    this.source = Base64Serializer
+        .deserializeUnchecked(options.get(BEAM_SOURCE_OPTION).get(), BoundedSource.class);
+
+    if (!options.get(DEFAULT_PARALLELISM).isPresent()){
+      throw new RuntimeException("Spark default parallelism was not set in DataSource options");
+    }
+    if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
+      throw new RuntimeException("Beam source was not set in DataSource options");
+    }
+    this.numPartitions = Integer.valueOf(options.get(DEFAULT_PARALLELISM).get());
     checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
-    this.bundleSize = context.getOptions().getBundleSize();
+    if (!options.get(PIPELINE_OPTIONS).isPresent()){
+      throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
+    }
+    this.sparkPipelineOptions = SerializablePipelineOptions
+        .deserializeFromJson(options.get(PIPELINE_OPTIONS).get()).as(SparkPipelineOptions.class);
+    this.bundleSize = sparkPipelineOptions.getBundleSize();
     return new DatasetReader();
   }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
@@ -62,7 +85,6 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
 
     private Optional<StructType> schema;
     private String checkpointLocation;
-    private DataSourceOptions options;
 
     @Override
     public StructType readSchema() {
@@ -73,13 +95,12 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
     public List<InputPartition<InternalRow>> planInputPartitions() {
       List<InputPartition<InternalRow>> result = new ArrayList<>();
       long desiredSizeBytes;
-      SparkPipelineOptions options = context.getOptions();
       try {
         desiredSizeBytes =
             (bundleSize == null)
-                ? source.getEstimatedSizeBytes(options) / numPartitions
+                ? source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions
                 : bundleSize;
-        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
+        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, sparkPipelineOptions);
         for (BoundedSource<T> source : sources) {
           result.add(
               new InputPartition<InternalRow>() {
@@ -88,7 +109,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
                 public InputPartitionReader<InternalRow> createPartitionReader() {
                   BoundedReader<T> reader = null;
                   try {
-                    reader = source.createReader(options);
+                    reader = source.createReader(sparkPipelineOptions);
                   } catch (IOException e) {
                     throw new RuntimeException(
                         "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index d980a52..50f4915 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -18,7 +18,11 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.runners.core.construction.ReadTranslation;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -54,7 +58,15 @@ class ReadSourceTranslatorBatch<T>
     }
     SparkSession sparkSession = context.getSparkSession();
 
-    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
+    String serializedSource = Base64Serializer.serializeUnchecked(source);
+    Map<String, String> datasetSourceOptions = new HashMap<>();
+    datasetSourceOptions.put(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource);
+    datasetSourceOptions.put(DatasetSourceBatch.DEFAULT_PARALLELISM,
+        String.valueOf(context.getSparkSession().sparkContext().defaultParallelism()));
+    datasetSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
+        SerializablePipelineOptions.serializeToJson(context.getOptions()));
+    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).options(datasetSourceOptions)
+        .load();
     //TODO pass the source and the translation context serialized as string to the DatasetSource
     MapFunction<Row, WindowedValue> func =
         new MapFunction<Row, WindowedValue>() {
@@ -63,7 +75,7 @@ class ReadSourceTranslatorBatch<T>
             return value.<WindowedValue>getAs(0);
          }
        };
-    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
+    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>>
     // be created ?
     Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
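For readers following the change: Spark's DataSourceV2 options are a flat string-to-string map, which is why the commit encodes the Beam BoundedSource as Base64-serialized Java and the PipelineOptions as JSON. Below is a minimal, self-contained sketch of that round trip. The class name SourceOptionsRoundTrip, the use of CountingSource as a stand-in source, and the literal option keys are illustrative assumptions; the serializer calls (Base64Serializer, SerializablePipelineOptions) are the ones the diff above uses.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SourceOptionsRoundTrip {
  public static void main(String[] args) {
    // Example inputs; CountingSource stands in for any Beam BoundedSource.
    BoundedSource<Long> source = CountingSource.upTo(100L);
    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();

    // Translator side: everything that crosses the DataSourceV2 boundary
    // must be a String, so the source is Base64-encoded Java serialization
    // and the pipeline options are serialized to JSON.
    Map<String, String> datasetSourceOptions = new HashMap<>();
    datasetSourceOptions.put("beam-source", Base64Serializer.serializeUnchecked(source));
    datasetSourceOptions.put("pipeline-options",
        SerializablePipelineOptions.serializeToJson(pipelineOptions));

    // DataSource side: rebuild both objects from the strings, as
    // DatasetSourceBatch.createReader does with its DataSourceOptions.
    BoundedSource<?> restoredSource =
        Base64Serializer.deserializeUnchecked(
            datasetSourceOptions.get("beam-source"), BoundedSource.class);
    PipelineOptions restoredOptions =
        SerializablePipelineOptions.deserializeFromJson(
            datasetSourceOptions.get("pipeline-options"));

    System.out.println("restored source: " + restoredSource.getClass().getSimpleName());
    System.out.println("restored options present: " + (restoredOptions != null));
  }
}

This also explains the earlier design, which held a TranslationContext field on DatasetSourceBatch: Spark instantiates the DataSource reflectively on the executors, so non-string state could not survive the trip, and string-encoded options are the supported channel.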
