This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 758c1ce371e9dc41018fa5c1668cb24cc1751c99 Author: Etienne Chauchot <[email protected]> AuthorDate: Fri Dec 28 10:24:11 2018 +0100 Cleaning --- .../translation/batch/DatasetSourceBatch.java | 3 +- .../streaming/DatasetStreamingSource.java | 172 +-------------------- 2 files changed, 2 insertions(+), 173 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java index 1ad16eb..f4cd885 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java @@ -41,8 +41,7 @@ import org.apache.spark.sql.types.StructType; /** * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming - * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This - * class is just a mix-in. + * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. */ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport { diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java index 8701a83..6947b6d 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java @@ -53,8 +53,7 @@ import scala.collection.immutable.Map; /** * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming - * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This - * class is just a mix-in. + * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. */ public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSupport{ @@ -196,173 +195,4 @@ public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSu reader.close(); } } - - private static class DatasetCatalog<T> extends Catalog { - - TranslationContext context; - Source<T> source; - - private DatasetCatalog(TranslationContext context, Source<T> source) { - this.context = context; - this.source = source; - } - - @Override public String currentDatabase() { - return null; - } - - @Override public void setCurrentDatabase(String dbName) { - - } - - @Override public Dataset<Database> listDatabases() { - return null; - } - - @Override public Dataset<Table> listTables() { - return null; - } - - @Override public Dataset<Table> listTables(String dbName) throws AnalysisException { - return null; - } - - @Override public Dataset<Function> listFunctions() { - return null; - } - - @Override public Dataset<Function> listFunctions(String dbName) throws AnalysisException { - return null; - } - - @Override public Dataset<Column> listColumns(String tableName) throws AnalysisException { - return null; - } - - @Override public Dataset<Column> listColumns(String dbName, String tableName) - throws AnalysisException { - return null; - } - - @Override public Database getDatabase(String dbName) throws AnalysisException { - return null; - } - - @Override public Table getTable(String tableName) throws AnalysisException { - return new DatasetTable<>("beam", "beaam", "beam fake table to wire up with Beam sources", - null, true, source, context); - } - - @Override public Table getTable(String dbName, String tableName) throws AnalysisException { - return null; - } - - @Override public Function getFunction(String functionName) throws AnalysisException { - return null; - } - - @Override public Function getFunction(String dbName, String functionName) - throws AnalysisException { - return null; - } - - @Override public boolean databaseExists(String dbName) { - return false; - } - - @Override public boolean tableExists(String tableName) { - return false; - } - - @Override public boolean tableExists(String dbName, String tableName) { - return false; - } - - @Override public boolean functionExists(String functionName) { - return false; - } - - @Override public boolean functionExists(String dbName, String functionName) { - return false; - } - - @Override public Dataset<Row> createTable(String tableName, String path) { - return null; - } - - @Override public Dataset<Row> createTable(String tableName, String path, String source) { - return null; - } - - @Override public Dataset<Row> createTable(String tableName, String source, - Map<String, String> options) { - return null; - } - - @Override public Dataset<Row> createTable(String tableName, String source, StructType schema, - Map<String, String> options) { - return null; - } - - @Override public boolean dropTempView(String viewName) { - return false; - } - - @Override public boolean dropGlobalTempView(String viewName) { - return false; - } - - @Override public void recoverPartitions(String tableName) { - - } - - @Override public boolean isCached(String tableName) { - return false; - } - - @Override public void cacheTable(String tableName) { - - } - - @Override public void cacheTable(String tableName, StorageLevel storageLevel) { - - } - - @Override public void uncacheTable(String tableName) { - - } - - @Override public void clearCache() { - - } - - @Override public void refreshTable(String tableName) { - - } - - @Override public void refreshByPath(String path) { - - } - - private static class DatasetTable<T> extends Table { - - private Source<T> source; - private TranslationContext context; - - public DatasetTable(String name, String database, String description, String tableType, - boolean isTemporary, Source<T> source, TranslationContext context) { - super(name, database, description, tableType, isTemporary); - this.source = source; - this.context = context; - } - - private Source<T> getSource() { - return source; - } - - private TranslationContext getContext() { - return context; - } - } - } }
