This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new f8a3d584c Lazy system tables (#4606)
f8a3d584c is described below

commit f8a3d584c8a392574347ebab97b26c07b054e93a
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Dec 13 20:37:40 2022 +0000

    Lazy system tables (#4606)
---
 datafusion/core/src/catalog/information_schema.rs | 521 ++++++++++++----------
 datafusion/core/src/datasource/mod.rs             |   1 +
 datafusion/core/src/datasource/streaming.rs       |  93 ++++
 datafusion/core/src/physical_plan/mod.rs          |   1 +
 datafusion/core/src/physical_plan/streaming.rs    | 124 +++++
 5 files changed, 516 insertions(+), 224 deletions(-)

diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs
index 957cac53a..f7ef6b93d 100644
--- a/datafusion/core/src/catalog/information_schema.rs
+++ b/datafusion/core/src/catalog/information_schema.rs
@@ -20,29 +20,31 @@
 //! [Information Schema]<https://en.wikipedia.org/wiki/Information_schema>
 
 use std::{
-    any,
+    any::Any,
     sync::{Arc, Weak},
 };
 
-use parking_lot::RwLock;
-
 use arrow::{
     array::{StringBuilder, UInt64Builder},
-    datatypes::{DataType, Field, Schema},
+    datatypes::{DataType, Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
+use parking_lot::RwLock;
+
 use datafusion_common::Result;
 
-use crate::datasource::{MemTable, TableProvider};
+use crate::config::ConfigOptions;
+use crate::datasource::streaming::{PartitionStream, StreamingTable};
+use crate::datasource::TableProvider;
 use crate::logical_expr::TableType;
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::SendableRecordBatchStream;
 
 use super::{
     catalog::{CatalogList, CatalogProvider},
     schema::SchemaProvider,
 };
 
-use crate::config::ConfigOptions;
-
 const INFORMATION_SCHEMA: &str = "information_schema";
 const TABLES: &str = "tables";
 const VIEWS: &str = "views";
@@ -73,7 +75,7 @@ impl CatalogWithInformationSchema {
 }
 
 impl CatalogProvider for CatalogWithInformationSchema {
-    fn as_any(&self) -> &dyn any::Any {
+    fn as_any(&self) -> &dyn Any {
         self
     }
 
@@ -90,8 +92,10 @@ impl CatalogProvider for CatalogWithInformationSchema {
             Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
                 Weak::upgrade(&self.config_options).map(|config_options| {
                     Arc::new(InformationSchemaProvider {
-                        catalog_list,
-                        config_options,
+                        config: InformationSchemaConfig {
+                            catalog_list,
+                            config_options,
+                        },
                     }) as Arc<dyn SchemaProvider>
                 })
             })
@@ -117,15 +121,19 @@ impl CatalogProvider for CatalogWithInformationSchema {
 /// providers, they will appear the next time the `information_schema`
 /// table is queried.
 struct InformationSchemaProvider {
+    config: InformationSchemaConfig,
+}
+
+#[derive(Clone)]
+struct InformationSchemaConfig {
     catalog_list: Arc<dyn CatalogList>,
     config_options: Arc<RwLock<ConfigOptions>>,
 }
 
-impl InformationSchemaProvider {
+impl InformationSchemaConfig {
     /// Construct the `information_schema.tables` virtual table
-    fn make_tables(&self) -> Arc<dyn TableProvider> {
+    fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
         // create a mem table with the names of tables
-        let mut builder = InformationSchemaTablesBuilder::new();
 
         for catalog_name in self.catalog_list.catalog_names() {
             let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
@@ -161,15 +169,9 @@ impl InformationSchemaProvider {
                 TableType::View,
             );
         }
-
-        let mem_table: MemTable = builder.into();
-
-        Arc::new(mem_table)
     }
 
-    fn make_views(&self) -> Arc<dyn TableProvider> {
-        let mut builder = InformationSchemaViewBuilder::new();
-
+    fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
         for catalog_name in self.catalog_list.catalog_names() {
             let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
 
@@ -188,15 +190,10 @@ impl InformationSchemaProvider {
                 }
             }
         }
-
-        let mem_table: MemTable = builder.into();
-        Arc::new(mem_table)
     }
 
     /// Construct the `information_schema.columns` virtual table
-    fn make_columns(&self) -> Arc<dyn TableProvider> {
-        let mut builder = InformationSchemaColumnsBuilder::new();
-
+    fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
         for catalog_name in self.catalog_list.catalog_names() {
             let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
 
@@ -220,47 +217,47 @@ impl InformationSchemaProvider {
                 }
             }
         }
-
-        let mem_table: MemTable = builder.into();
-
-        Arc::new(mem_table)
     }
 
     /// Construct the `information_schema.df_settings` virtual table
-    fn make_df_settings(&self) -> Arc<dyn TableProvider> {
-        let mut builder = InformationSchemaDfSettingsBuilder::new();
-
+    fn make_df_settings(&self, builder: &mut InformationSchemaDfSettingsBuilder) {
         for (name, setting) in self.config_options.read().options() {
             builder.add_setting(name, setting.to_string());
         }
-
-        let mem_table: MemTable = builder.into();
-
-        Arc::new(mem_table)
     }
 }
 
 impl SchemaProvider for InformationSchemaProvider {
-    fn as_any(&self) -> &(dyn any::Any + 'static) {
+    fn as_any(&self) -> &(dyn Any + 'static) {
         self
     }
 
     fn table_names(&self) -> Vec<String> {
-        vec![TABLES.to_string(), VIEWS.to_string(), COLUMNS.to_string()]
+        vec![
+            TABLES.to_string(),
+            VIEWS.to_string(),
+            COLUMNS.to_string(),
+            DF_SETTINGS.to_string(),
+        ]
     }
 
     fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
-        if name.eq_ignore_ascii_case("tables") {
-            Some(self.make_tables())
+        let config = self.config.clone();
+        let table: Arc<dyn PartitionStream> = if name.eq_ignore_ascii_case("tables") {
+            Arc::new(InformationSchemaTables::new(config))
         } else if name.eq_ignore_ascii_case("columns") {
-            Some(self.make_columns())
+            Arc::new(InformationSchemaColumns::new(config))
         } else if name.eq_ignore_ascii_case("views") {
-            Some(self.make_views())
+            Arc::new(InformationSchemaViews::new(config))
         } else if name.eq_ignore_ascii_case("df_settings") {
-            Some(self.make_df_settings())
+            Arc::new(InformationSchemaDfSettings::new(config))
         } else {
-            None
-        }
+            return None;
+        };
+
+        Some(Arc::new(
+            StreamingTable::try_new(table.schema().clone(), vec![table]).unwrap(),
+        ))
     }
 
     fn table_exist(&self, name: &str) -> bool {
@@ -268,10 +265,58 @@ impl SchemaProvider for InformationSchemaProvider {
     }
 }
 
+struct InformationSchemaTables {
+    schema: SchemaRef,
+    config: InformationSchemaConfig,
+}
+
+impl InformationSchemaTables {
+    fn new(config: InformationSchemaConfig) -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("table_catalog", DataType::Utf8, false),
+            Field::new("table_schema", DataType::Utf8, false),
+            Field::new("table_name", DataType::Utf8, false),
+            Field::new("table_type", DataType::Utf8, false),
+        ]));
+
+        Self { schema, config }
+    }
+
+    fn builder(&self) -> InformationSchemaTablesBuilder {
+        InformationSchemaTablesBuilder {
+            catalog_names: StringBuilder::new(),
+            schema_names: StringBuilder::new(),
+            table_names: StringBuilder::new(),
+            table_types: StringBuilder::new(),
+            schema: self.schema.clone(),
+        }
+    }
+}
+
+impl PartitionStream for InformationSchemaTables {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    fn execute(&self) -> SendableRecordBatchStream {
+        let mut builder = self.builder();
+        let config = self.config.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            // TODO: Stream this
+            futures::stream::once(async move {
+                config.make_tables(&mut builder);
+                Ok(builder.finish())
+            }),
+        ))
+    }
+}
+
 /// Builds the `information_schema.TABLE` table row by row
 ///
 /// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
 struct InformationSchemaTablesBuilder {
+    schema: SchemaRef,
     catalog_names: StringBuilder,
     schema_names: StringBuilder,
     table_names: StringBuilder,
@@ -279,15 +324,6 @@ struct InformationSchemaTablesBuilder {
 }
 
 impl InformationSchemaTablesBuilder {
-    fn new() -> Self {
-        Self {
-            catalog_names: StringBuilder::new(),
-            schema_names: StringBuilder::new(),
-            table_names: StringBuilder::new(),
-            table_types: StringBuilder::new(),
-        }
-    }
-
     fn add_table(
         &mut self,
         catalog_name: impl AsRef<str>,
@@ -305,37 +341,65 @@ impl InformationSchemaTablesBuilder {
             TableType::Temporary => "LOCAL TEMPORARY",
         });
     }
+
+    fn finish(&mut self) -> RecordBatch {
+        RecordBatch::try_new(
+            self.schema.clone(),
+            vec![
+                Arc::new(self.catalog_names.finish()),
+                Arc::new(self.schema_names.finish()),
+                Arc::new(self.table_names.finish()),
+                Arc::new(self.table_types.finish()),
+            ],
+        )
+        .unwrap()
+    }
+}
+
+struct InformationSchemaViews {
+    schema: SchemaRef,
+    config: InformationSchemaConfig,
 }
 
-impl From<InformationSchemaTablesBuilder> for MemTable {
-    fn from(value: InformationSchemaTablesBuilder) -> MemTable {
-        let schema = Schema::new(vec![
+impl InformationSchemaViews {
+    fn new(config: InformationSchemaConfig) -> Self {
+        let schema = Arc::new(Schema::new(vec![
             Field::new("table_catalog", DataType::Utf8, false),
             Field::new("table_schema", DataType::Utf8, false),
             Field::new("table_name", DataType::Utf8, false),
-            Field::new("table_type", DataType::Utf8, false),
-        ]);
-
-        let InformationSchemaTablesBuilder {
-            mut catalog_names,
-            mut schema_names,
-            mut table_names,
-            mut table_types,
-        } = value;
-
-        let schema = Arc::new(schema);
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![
-                Arc::new(catalog_names.finish()),
-                Arc::new(schema_names.finish()),
-                Arc::new(table_names.finish()),
-                Arc::new(table_types.finish()),
-            ],
-        )
-        .unwrap();
+            Field::new("definition", DataType::Utf8, true),
+        ]));
+
+        Self { schema, config }
+    }
 
-        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+    fn builder(&self) -> InformationSchemaViewBuilder {
+        InformationSchemaViewBuilder {
+            catalog_names: StringBuilder::new(),
+            schema_names: StringBuilder::new(),
+            table_names: StringBuilder::new(),
+            definitions: StringBuilder::new(),
+            schema: self.schema.clone(),
+        }
+    }
+}
+
+impl PartitionStream for InformationSchemaViews {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    fn execute(&self) -> SendableRecordBatchStream {
+        let mut builder = self.builder();
+        let config = self.config.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            // TODO: Stream this
+            futures::stream::once(async move {
+                config.make_views(&mut builder);
+                Ok(builder.finish())
+            }),
+        ))
     }
 }
 
@@ -343,6 +407,7 @@ impl From<InformationSchemaTablesBuilder> for MemTable {
 ///
 /// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
 struct InformationSchemaViewBuilder {
+    schema: SchemaRef,
     catalog_names: StringBuilder,
     schema_names: StringBuilder,
     table_names: StringBuilder,
@@ -350,15 +415,6 @@ struct InformationSchemaViewBuilder {
 }
 
 impl InformationSchemaViewBuilder {
-    fn new() -> Self {
-        Self {
-            catalog_names: StringBuilder::new(),
-            schema_names: StringBuilder::new(),
-            table_names: StringBuilder::new(),
-            definitions: StringBuilder::new(),
-        }
-    }
-
     fn add_view(
         &mut self,
         catalog_name: impl AsRef<str>,
@@ -372,68 +428,56 @@ impl InformationSchemaViewBuilder {
         self.table_names.append_value(table_name.as_ref());
         self.definitions.append_option(definition.as_ref());
     }
-}
 
-impl From<InformationSchemaViewBuilder> for MemTable {
-    fn from(value: InformationSchemaViewBuilder) -> Self {
-        let schema = Schema::new(vec![
-            Field::new("table_catalog", DataType::Utf8, false),
-            Field::new("table_schema", DataType::Utf8, false),
-            Field::new("table_name", DataType::Utf8, false),
-            Field::new("definition", DataType::Utf8, true),
-        ]);
-
-        let InformationSchemaViewBuilder {
-            mut catalog_names,
-            mut schema_names,
-            mut table_names,
-            mut definitions,
-        } = value;
-
-        let schema = Arc::new(schema);
-        let batch = RecordBatch::try_new(
-            schema.clone(),
+    fn finish(&mut self) -> RecordBatch {
+        RecordBatch::try_new(
+            self.schema.clone(),
             vec![
-                Arc::new(catalog_names.finish()),
-                Arc::new(schema_names.finish()),
-                Arc::new(table_names.finish()),
-                Arc::new(definitions.finish()),
+                Arc::new(self.catalog_names.finish()),
+                Arc::new(self.schema_names.finish()),
+                Arc::new(self.table_names.finish()),
+                Arc::new(self.definitions.finish()),
             ],
         )
-        .unwrap();
-
-        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+        .unwrap()
     }
 }
 
-/// Builds the `information_schema.COLUMNS` table row by row
-///
-/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
-struct InformationSchemaColumnsBuilder {
-    catalog_names: StringBuilder,
-    schema_names: StringBuilder,
-    table_names: StringBuilder,
-    column_names: StringBuilder,
-    ordinal_positions: UInt64Builder,
-    column_defaults: StringBuilder,
-    is_nullables: StringBuilder,
-    data_types: StringBuilder,
-    character_maximum_lengths: UInt64Builder,
-    character_octet_lengths: UInt64Builder,
-    numeric_precisions: UInt64Builder,
-    numeric_precision_radixes: UInt64Builder,
-    numeric_scales: UInt64Builder,
-    datetime_precisions: UInt64Builder,
-    interval_types: StringBuilder,
+struct InformationSchemaColumns {
+    schema: SchemaRef,
+    config: InformationSchemaConfig,
 }
 
-impl InformationSchemaColumnsBuilder {
-    fn new() -> Self {
+impl InformationSchemaColumns {
+    fn new(config: InformationSchemaConfig) -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("table_catalog", DataType::Utf8, false),
+            Field::new("table_schema", DataType::Utf8, false),
+            Field::new("table_name", DataType::Utf8, false),
+            Field::new("column_name", DataType::Utf8, false),
+            Field::new("ordinal_position", DataType::UInt64, false),
+            Field::new("column_default", DataType::Utf8, true),
+            Field::new("is_nullable", DataType::Utf8, false),
+            Field::new("data_type", DataType::Utf8, false),
+            Field::new("character_maximum_length", DataType::UInt64, true),
+            Field::new("character_octet_length", DataType::UInt64, true),
+            Field::new("numeric_precision", DataType::UInt64, true),
+            Field::new("numeric_precision_radix", DataType::UInt64, true),
+            Field::new("numeric_scale", DataType::UInt64, true),
+            Field::new("datetime_precision", DataType::UInt64, true),
+            Field::new("interval_type", DataType::Utf8, true),
+        ]));
+
+        Self { schema, config }
+    }
+
+    fn builder(&self) -> InformationSchemaColumnsBuilder {
         // StringBuilder requires providing an initial capacity, so
         // pick 10 here arbitrarily as this is not performance
         // critical code and the number of tables is unavailable here.
         let default_capacity = 10;
-        Self {
+
+        InformationSchemaColumnsBuilder {
             catalog_names: StringBuilder::new(),
             schema_names: StringBuilder::new(),
             table_names: StringBuilder::new(),
@@ -449,9 +493,53 @@ impl InformationSchemaColumnsBuilder {
             numeric_scales: UInt64Builder::with_capacity(default_capacity),
             datetime_precisions: UInt64Builder::with_capacity(default_capacity),
             interval_types: StringBuilder::new(),
+            schema: self.schema.clone(),
         }
     }
+}
+
+impl PartitionStream for InformationSchemaColumns {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    fn execute(&self) -> SendableRecordBatchStream {
+        let mut builder = self.builder();
+        let config = self.config.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            // TODO: Stream this
+            futures::stream::once(async move {
+                config.make_columns(&mut builder);
+                Ok(builder.finish())
+            }),
+        ))
+    }
+}
+
+/// Builds the `information_schema.COLUMNS` table row by row
+///
+/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
+struct InformationSchemaColumnsBuilder {
+    schema: SchemaRef,
+    catalog_names: StringBuilder,
+    schema_names: StringBuilder,
+    table_names: StringBuilder,
+    column_names: StringBuilder,
+    ordinal_positions: UInt64Builder,
+    column_defaults: StringBuilder,
+    is_nullables: StringBuilder,
+    data_types: StringBuilder,
+    character_maximum_lengths: UInt64Builder,
+    character_octet_lengths: UInt64Builder,
+    numeric_precisions: UInt64Builder,
+    numeric_precision_radixes: UInt64Builder,
+    numeric_scales: UInt64Builder,
+    datetime_precisions: UInt64Builder,
+    interval_types: StringBuilder,
+}
 
+impl InformationSchemaColumnsBuilder {
     #[allow(clippy::too_many_arguments)]
     fn add_column(
         &mut self,
@@ -547,111 +635,96 @@ impl InformationSchemaColumnsBuilder {
         self.datetime_precisions.append_option(None);
         self.interval_types.append_null();
     }
-}
 
-impl From<InformationSchemaColumnsBuilder> for MemTable {
-    fn from(value: InformationSchemaColumnsBuilder) -> MemTable {
-        let schema = Schema::new(vec![
-            Field::new("table_catalog", DataType::Utf8, false),
-            Field::new("table_schema", DataType::Utf8, false),
-            Field::new("table_name", DataType::Utf8, false),
-            Field::new("column_name", DataType::Utf8, false),
-            Field::new("ordinal_position", DataType::UInt64, false),
-            Field::new("column_default", DataType::Utf8, true),
-            Field::new("is_nullable", DataType::Utf8, false),
-            Field::new("data_type", DataType::Utf8, false),
-            Field::new("character_maximum_length", DataType::UInt64, true),
-            Field::new("character_octet_length", DataType::UInt64, true),
-            Field::new("numeric_precision", DataType::UInt64, true),
-            Field::new("numeric_precision_radix", DataType::UInt64, true),
-            Field::new("numeric_scale", DataType::UInt64, true),
-            Field::new("datetime_precision", DataType::UInt64, true),
-            Field::new("interval_type", DataType::Utf8, true),
-        ]);
-
-        let InformationSchemaColumnsBuilder {
-            mut catalog_names,
-            mut schema_names,
-            mut table_names,
-            mut column_names,
-            mut ordinal_positions,
-            mut column_defaults,
-            mut is_nullables,
-            mut data_types,
-            mut character_maximum_lengths,
-            mut character_octet_lengths,
-            mut numeric_precisions,
-            mut numeric_precision_radixes,
-            mut numeric_scales,
-            mut datetime_precisions,
-            mut interval_types,
-        } = value;
-
-        let schema = Arc::new(schema);
-        let batch = RecordBatch::try_new(
-            schema.clone(),
+    fn finish(&mut self) -> RecordBatch {
+        RecordBatch::try_new(
+            self.schema.clone(),
             vec![
-                Arc::new(catalog_names.finish()),
-                Arc::new(schema_names.finish()),
-                Arc::new(table_names.finish()),
-                Arc::new(column_names.finish()),
-                Arc::new(ordinal_positions.finish()),
-                Arc::new(column_defaults.finish()),
-                Arc::new(is_nullables.finish()),
-                Arc::new(data_types.finish()),
-                Arc::new(character_maximum_lengths.finish()),
-                Arc::new(character_octet_lengths.finish()),
-                Arc::new(numeric_precisions.finish()),
-                Arc::new(numeric_precision_radixes.finish()),
-                Arc::new(numeric_scales.finish()),
-                Arc::new(datetime_precisions.finish()),
-                Arc::new(interval_types.finish()),
+                Arc::new(self.catalog_names.finish()),
+                Arc::new(self.schema_names.finish()),
+                Arc::new(self.table_names.finish()),
+                Arc::new(self.column_names.finish()),
+                Arc::new(self.ordinal_positions.finish()),
+                Arc::new(self.column_defaults.finish()),
+                Arc::new(self.is_nullables.finish()),
+                Arc::new(self.data_types.finish()),
+                Arc::new(self.character_maximum_lengths.finish()),
+                Arc::new(self.character_octet_lengths.finish()),
+                Arc::new(self.numeric_precisions.finish()),
+                Arc::new(self.numeric_precision_radixes.finish()),
+                Arc::new(self.numeric_scales.finish()),
+                Arc::new(self.datetime_precisions.finish()),
+                Arc::new(self.interval_types.finish()),
             ],
         )
-        .unwrap();
-
-        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+        .unwrap()
     }
 }
 
-struct InformationSchemaDfSettingsBuilder {
-    names: StringBuilder,
-    settings: StringBuilder,
+struct InformationSchemaDfSettings {
+    schema: SchemaRef,
+    config: InformationSchemaConfig,
 }
 
-impl InformationSchemaDfSettingsBuilder {
-    fn new() -> Self {
-        Self {
+impl InformationSchemaDfSettings {
+    fn new(config: InformationSchemaConfig) -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("name", DataType::Utf8, false),
+            Field::new("setting", DataType::Utf8, false),
+        ]));
+
+        Self { schema, config }
+    }
+
+    fn builder(&self) -> InformationSchemaDfSettingsBuilder {
+        InformationSchemaDfSettingsBuilder {
             names: StringBuilder::new(),
             settings: StringBuilder::new(),
+            schema: self.schema.clone(),
         }
     }
+}
 
-    fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>) {
-        self.names.append_value(name.as_ref());
-        self.settings.append_value(setting.as_ref());
+impl PartitionStream for InformationSchemaDfSettings {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    fn execute(&self) -> SendableRecordBatchStream {
+        let mut builder = self.builder();
+        let config = self.config.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            // TODO: Stream this
+            futures::stream::once(async move {
+                // create a mem table with the names of tables
+                config.make_df_settings(&mut builder);
+                Ok(builder.finish())
+            }),
+        ))
     }
 }
 
-impl From<InformationSchemaDfSettingsBuilder> for MemTable {
-    fn from(value: InformationSchemaDfSettingsBuilder) -> MemTable {
-        let schema = Schema::new(vec![
-            Field::new("name", DataType::Utf8, false),
-            Field::new("setting", DataType::Utf8, false),
-        ]);
+struct InformationSchemaDfSettingsBuilder {
+    schema: SchemaRef,
+    names: StringBuilder,
+    settings: StringBuilder,
+}
 
-        let InformationSchemaDfSettingsBuilder {
-            mut names,
-            mut settings,
-        } = value;
+impl InformationSchemaDfSettingsBuilder {
+    fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>) {
+        self.names.append_value(name.as_ref());
+        self.settings.append_value(setting.as_ref());
+    }
 
-        let schema = Arc::new(schema);
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(names.finish()), Arc::new(settings.finish())],
+    fn finish(&mut self) -> RecordBatch {
+        RecordBatch::try_new(
+            self.schema.clone(),
+            vec![
+                Arc::new(self.names.finish()),
+                Arc::new(self.settings.finish()),
+            ],
         )
-        .unwrap();
-
-        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+        .unwrap()
     }
 }
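
The net effect of the information_schema.rs changes above is that each
system table is now assembled on demand, inside a record batch stream,
when it is scanned, instead of being materialized as a MemTable when the
provider is constructed. A minimal usage sketch, not part of this commit,
assuming the datafusion and tokio crates of this release line:

use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Enable the information_schema provider for this session
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::with_config(config);

    // Register a table *after* the context (and hence the lazy
    // information_schema provider) has been created
    ctx.sql("CREATE TABLE t AS VALUES (1), (2)").await?;

    // Because the system tables are built at scan time, the newly
    // registered table is already visible
    ctx.sql("SELECT table_name FROM information_schema.tables")
        .await?
        .show()
        .await?;

    Ok(())
}
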
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index fc3e8f2d2..8610607b6 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -26,6 +26,7 @@ pub mod listing;
 pub mod listing_table_factory;
 pub mod memory;
 pub mod object_store;
+pub mod streaming;
 pub mod view;
 
 use futures::Stream;
diff --git a/datafusion/core/src/datasource/streaming.rs b/datafusion/core/src/datasource/streaming.rs
new file mode 100644
index 000000000..88de34efa
--- /dev/null
+++ b/datafusion/core/src/datasource/streaming.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A simplified [`TableProvider`] for streaming partitioned datasets
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::{Expr, TableType};
+
+use crate::datasource::TableProvider;
+use crate::execution::context::SessionState;
+use crate::physical_plan::streaming::StreamingTableExec;
+use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
+
+/// A partition that can be converted into a [`SendableRecordBatchStream`]
+pub trait PartitionStream: Send + Sync {
+    /// Returns the schema of this partition
+    fn schema(&self) -> &SchemaRef;
+
+    /// Returns a stream yielding this partitions values
+    fn execute(&self) -> SendableRecordBatchStream;
+}
+
+/// A [`TableProvider`] that streams a set of [`PartitionStream`]
+pub struct StreamingTable {
+    schema: SchemaRef,
+    partitions: Vec<Arc<dyn PartitionStream>>,
+}
+
+impl StreamingTable {
+    /// Try to create a new [`StreamingTable`] returning an error if the schema is incorrect
+    pub fn try_new(
+        schema: SchemaRef,
+        partitions: Vec<Arc<dyn PartitionStream>>,
+    ) -> Result<Self> {
+        if !partitions.iter().all(|x| schema.contains(x.schema())) {
+            return Err(DataFusionError::Plan(
+                "Mismatch between schema and batches".to_string(),
+            ));
+        }
+
+        Ok(Self { schema, partitions })
+    }
+}
+
+#[async_trait]
+impl TableProvider for StreamingTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    async fn scan(
+        &self,
+        _ctx: &SessionState,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // TODO: push limit down
+        Ok(Arc::new(StreamingTableExec::try_new(
+            self.schema.clone(),
+            self.partitions.clone(),
+            projection,
+        )?))
+    }
+}
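
For illustration, a hedged sketch of a custom implementation of the
PartitionStream trait introduced above. ExamplePartition, its
single-batch contents, and the example_table helper are assumptions for
the sketch, not code from this commit; like the information_schema
tables, it defers all work until the stream is first polled. Assumes the
arrow, datafusion, and futures crates:

use std::sync::Arc;

use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::streaming::{PartitionStream, StreamingTable};
use datafusion::error::Result;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;

/// A hypothetical partition that yields a single batch of numbers
struct ExamplePartition {
    schema: SchemaRef,
}

impl PartitionStream for ExamplePartition {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self) -> SendableRecordBatchStream {
        let schema = self.schema.clone();
        // Nothing is computed here; the async block below only runs
        // once the returned stream is polled
        Box::pin(RecordBatchStreamAdapter::new(
            schema.clone(),
            futures::stream::once(async move {
                let values: ArrayRef = Arc::new(UInt64Array::from(vec![1, 2, 3]));
                Ok(RecordBatch::try_new(schema, vec![values]).unwrap())
            }),
        ))
    }
}

/// Wrap the partition in a StreamingTable that can be registered in a catalog
fn example_table() -> Result<StreamingTable> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::UInt64,
        false,
    )]));
    let partition: Arc<dyn PartitionStream> = Arc::new(ExamplePartition {
        schema: schema.clone(),
    });
    StreamingTable::try_new(schema, vec![partition])
}
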
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 2b4cd63c6..aa365ea45 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -648,6 +648,7 @@ pub mod repartition;
 pub mod rewrite;
 pub mod sorts;
 pub mod stream;
+pub mod streaming;
 pub mod udaf;
 pub mod union;
 pub mod values;
diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs
new file mode 100644
index 000000000..a6ab51bb1
--- /dev/null
+++ b/datafusion/core/src/physical_plan/streaming.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for streaming [`PartitionStream`]
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use futures::stream::StreamExt;
+
+use datafusion_common::{DataFusionError, Result, Statistics};
+use datafusion_physical_expr::PhysicalSortExpr;
+
+use crate::datasource::streaming::PartitionStream;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
+
+/// An [`ExecutionPlan`] for [`PartitionStream`]
+pub struct StreamingTableExec {
+    partitions: Vec<Arc<dyn PartitionStream>>,
+    projection: Option<Arc<[usize]>>,
+    projected_schema: SchemaRef,
+}
+
+impl StreamingTableExec {
+    /// Try to create a new [`StreamingTableExec`] returning an error if the schema is incorrect
+    pub fn try_new(
+        schema: SchemaRef,
+        partitions: Vec<Arc<dyn PartitionStream>>,
+        projection: Option<&Vec<usize>>,
+    ) -> Result<Self> {
+        if !partitions.iter().all(|x| schema.contains(x.schema())) {
+            return Err(DataFusionError::Plan(
+                "Mismatch between schema and batches".to_string(),
+            ));
+        }
+
+        let projected_schema = match projection {
+            Some(p) => Arc::new(schema.project(p)?),
+            None => schema,
+        };
+
+        Ok(Self {
+            partitions,
+            projected_schema,
+            projection: projection.cloned().map(Into::into),
+        })
+    }
+}
+
+impl std::fmt::Debug for StreamingTableExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("LazyMemTableExec").finish_non_exhaustive()
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for StreamingTableExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(self.partitions.len())
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Err(DataFusionError::Internal(format!(
+            "Children cannot be replaced in {:?}",
+            self
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = self.partitions[partition].execute();
+        Ok(match self.projection.clone() {
+            Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
+                self.projected_schema.clone(),
+                stream.map(move |x| x.and_then(|b| b.project(projection.as_ref()))),
+            )),
+            None => stream,
+        })
+    }
+
+    fn statistics(&self) -> Statistics {
+        Default::default()
+    }
+}
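
In the execute() method above, a projection is applied by mapping every
batch through RecordBatch::project as it flows past, rather than by
materializing a projected copy of the data. A small standalone sketch of
that per-batch call, using only the arrow crate; the schema and column
names here are hypothetical:

use std::sync::Arc;

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["x", "y"])),
        ],
    )?;

    // Keep only column index 1 ("b"), as StreamingTableExec does for
    // each batch when a projection is present
    let projected = batch.project(&[1])?;
    assert_eq!(projected.num_columns(), 1);
    assert_eq!(projected.schema().field(0).name(), "b");
    Ok(())
}
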
