This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit e9ac3c36ca414f5cf014cf787eb67045ce3d4b2a
Author: Etienne Chauchot <[email protected]>
AuthorDate: Wed Dec 19 17:08:58 2018 +0100

    Experiment over using spark Catalog to pass in Beam Source through spark Table
---
 .../batch/ReadSourceTranslatorBatch.java  |  12 +-
 .../translation/io/DatasetSource.java     | 191 ++++++++++++++++++++-
 2 files changed, 193 insertions(+), 10 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 2c1aa93..0b828fb 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -30,11 +30,16 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Catalog;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
 import org.apache.spark.sql.streaming.DataStreamReader;
 
 class ReadSourceTranslatorBatch<T>
@@ -58,9 +63,12 @@ class ReadSourceTranslatorBatch<T>
       throw new RuntimeException(e);
     }
     SparkSession sparkSession = context.getSparkSession();
-    Dataset<Row> rowDataset = sparkSession.readStream().format(providerClassName).load();
+
+    DataStreamReader dataStreamReader = sparkSession.readStream().format(providerClassName);
+    Dataset<Row> rowDataset = dataStreamReader.load();
+
     //TODO initialize source : how, to get a reference to the DatasetSource instance that spark
-    // instantiates to be able to call DatasetSource.initialize()
+    // instantiates to be able to call DatasetSource.initialize(). How to pass in a DatasetCatalog?
     MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() {
       @Override public WindowedValue<T> call(Row value) throws Exception {
         //there is only one value put in each Row by the InputPartitionReader
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index d23ecf3..deacdf4 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -28,7 +28,16 @@ import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalog.Catalog;
+import org.apache.spark.sql.catalog.Column;
+import org.apache.spark.sql.catalog.Database;
+import org.apache.spark.sql.catalog.Function;
+import org.apache.spark.sql.catalog.Table;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -39,6 +48,8 @@ import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.storage.StorageLevel;
+import scala.collection.immutable.Map;
 
 /**
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
@@ -53,17 +64,12 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
 
   private BoundedSource<T> source;
 
-  public void initialize(TranslationContext context, BoundedSource<T> source){
-    this.context = context;
-    this.source = source;
-    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
-    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
-    this.bundleSize = context.getOptions().getBundleSize();
-  }
-
   @Override public MicroBatchReader createMicroBatchReader(
       Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
+    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+    this.bundleSize = context.getOptions().getBundleSize();
     return new DatasetMicroBatchReader(schema, checkpointLocation, options);
   }
@@ -190,4 +196,173 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
       reader.close();
     }
   }
+
+  private static class DatasetCatalog<T> extends Catalog {
+
+    TranslationContext context;
+    Source<T> source;
+
+    private DatasetCatalog(TranslationContext context, Source<T> source) {
+      this.context = context;
+      this.source = source;
+    }
+
+    @Override public String currentDatabase() {
+      return null;
+    }
+
+    @Override public void setCurrentDatabase(String dbName) {
+
+    }
+
+    @Override public Dataset<Database> listDatabases() {
+      return null;
+    }
+
+    @Override public Dataset<Table> listTables() {
+      return null;
+    }
+
+    @Override public Dataset<Table> listTables(String dbName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Dataset<Function> listFunctions() {
+      return null;
+    }
+
+    @Override public Dataset<Function> listFunctions(String dbName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Dataset<Column> listColumns(String tableName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Dataset<Column> listColumns(String dbName, String tableName)
+        throws AnalysisException {
+      return null;
+    }
+
+    @Override public Database getDatabase(String dbName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Table getTable(String tableName) throws AnalysisException {
+      return new DatasetTable<>("beam", "beaam", "beam fake table to wire up with Beam sources",
+          null, true, source, context);
+    }
+
+    @Override public Table getTable(String dbName, String tableName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Function getFunction(String functionName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Function getFunction(String dbName, String functionName)
+        throws AnalysisException {
+      return null;
+    }
+
+    @Override public boolean databaseExists(String dbName) {
+      return false;
+    }
+
+    @Override public boolean tableExists(String tableName) {
+      return false;
+    }
+
+    @Override public boolean tableExists(String dbName, String tableName) {
+      return false;
+    }
+
+    @Override public boolean functionExists(String functionName) {
+      return false;
+    }
+
+    @Override public boolean functionExists(String dbName, String functionName) {
+      return false;
+    }
+
+    @Override public Dataset<Row> createTable(String tableName, String path) {
+      return null;
+    }
+
+    @Override public Dataset<Row> createTable(String tableName, String path, String source) {
+      return null;
+    }
+
+    @Override public Dataset<Row> createTable(String tableName, String source,
+        Map<String, String> options) {
+      return null;
+    }
+
+    @Override public Dataset<Row> createTable(String tableName, String source, StructType schema,
+        Map<String, String> options) {
+      return null;
+    }
+
+    @Override public boolean dropTempView(String viewName) {
+      return false;
+    }
+
+    @Override public boolean dropGlobalTempView(String viewName) {
+      return false;
+    }
+
+    @Override public void recoverPartitions(String tableName) {
+
+    }
+
+    @Override public boolean isCached(String tableName) {
+      return false;
+    }
+
+    @Override public void cacheTable(String tableName) {
+
+    }
+
+    @Override public void cacheTable(String tableName, StorageLevel storageLevel) {
+
+    }
+
+    @Override public void uncacheTable(String tableName) {
+
+    }
+
+    @Override public void clearCache() {
+
+    }
+
+    @Override public void refreshTable(String tableName) {
+
+    }
+
+    @Override public void refreshByPath(String path) {
+
+    }
+
+    private static class DatasetTable<T> extends Table {
+
+      private Source<T> source;
+      private TranslationContext context;
+
+      public DatasetTable(String name, String database, String description, String tableType,
+          boolean isTemporary, Source<T> source, TranslationContext context) {
+        super(name, database, description, tableType, isTemporary);
+        this.source = source;
+        this.context = context;
+      }
+
+      private Source<T> getSource() {
+        return source;
+      }
+
+      private TranslationContext getContext() {
+        return context;
+      }
+    }
+  }
 }
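
A possible follow-up on the TODO left in ReadSourceTranslatorBatch (a minimal sketch, not part of this commit): Spark instantiates the DataSourceV2 provider reflectively from format(providerClassName), so the translator never holds the DatasetSource instance it would need to call initialize() on. One workaround is to register the Beam source in a driver-side map and pass its key through the reader options, letting the provider recover the source inside createMicroBatchReader(). The class DatasetSourceRegistry and the option key "beam-source-id" below are hypothetical names, not existing Beam or Spark APIs.

    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.beam.sdk.io.Source;
    import org.apache.spark.sql.sources.v2.DataSourceOptions;

    // Hypothetical registry: the translator registers the source under a generated id,
    // and the reflectively created DatasetSource looks it up from the reader options.
    class DatasetSourceRegistry {

      private static final ConcurrentHashMap<String, Source<?>> SOURCES = new ConcurrentHashMap<>();

      static String register(Source<?> source) {
        String id = UUID.randomUUID().toString();
        SOURCES.put(id, source);
        return id;
      }

      static Source<?> lookup(DataSourceOptions options) {
        // "beam-source-id" is a made-up option key for this sketch.
        return options.get("beam-source-id")
            .map(SOURCES::get)
            .orElseThrow(() -> new IllegalStateException("No Beam source registered for this reader"));
      }
    }

On the translator side the registration would replace the initialize() call, for example:

    String sourceId = DatasetSourceRegistry.register(source);
    Dataset<Row> rowDataset =
        sparkSession.readStream()
            .format(providerClassName)
            .option("beam-source-id", sourceId)
            .load();

Since both the provider and its MicroBatchReader are created on the driver, a static map suffices here; this sidesteps the Catalog/DatasetTable wiring experimented with above rather than answering how a DatasetCatalog could be injected into the SparkSession.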
