This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new f8a3d584c Lazy system tables (#4606)
f8a3d584c is described below
commit f8a3d584c8a392574347ebab97b26c07b054e93a
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Dec 13 20:37:40 2022 +0000
Lazy system tables (#4606)
---
datafusion/core/src/catalog/information_schema.rs | 521 ++++++++++++----------
datafusion/core/src/datasource/mod.rs | 1 +
datafusion/core/src/datasource/streaming.rs | 93 ++++
datafusion/core/src/physical_plan/mod.rs | 1 +
datafusion/core/src/physical_plan/streaming.rs | 124 +++++
5 files changed, 516 insertions(+), 224 deletions(-)
diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs
index 957cac53a..f7ef6b93d 100644
--- a/datafusion/core/src/catalog/information_schema.rs
+++ b/datafusion/core/src/catalog/information_schema.rs
@@ -20,29 +20,31 @@
//! Information Schema]<https://en.wikipedia.org/wiki/Information_schema>
use std::{
- any,
+ any::Any,
sync::{Arc, Weak},
};
-use parking_lot::RwLock;
-
use arrow::{
array::{StringBuilder, UInt64Builder},
- datatypes::{DataType, Field, Schema},
+ datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
+use parking_lot::RwLock;
+
use datafusion_common::Result;
-use crate::datasource::{MemTable, TableProvider};
+use crate::config::ConfigOptions;
+use crate::datasource::streaming::{PartitionStream, StreamingTable};
+use crate::datasource::TableProvider;
use crate::logical_expr::TableType;
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::SendableRecordBatchStream;
use super::{
catalog::{CatalogList, CatalogProvider},
schema::SchemaProvider,
};
-use crate::config::ConfigOptions;
-
const INFORMATION_SCHEMA: &str = "information_schema";
const TABLES: &str = "tables";
const VIEWS: &str = "views";
@@ -73,7 +75,7 @@ impl CatalogWithInformationSchema {
}
impl CatalogProvider for CatalogWithInformationSchema {
- fn as_any(&self) -> &dyn any::Any {
+ fn as_any(&self) -> &dyn Any {
self
}
@@ -90,8 +92,10 @@ impl CatalogProvider for CatalogWithInformationSchema {
Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
Weak::upgrade(&self.config_options).map(|config_options| {
Arc::new(InformationSchemaProvider {
- catalog_list,
- config_options,
+ config: InformationSchemaConfig {
+ catalog_list,
+ config_options,
+ },
}) as Arc<dyn SchemaProvider>
})
})
@@ -117,15 +121,19 @@ impl CatalogProvider for CatalogWithInformationSchema {
/// providers, they will appear the next time the `information_schema`
/// table is queried.
struct InformationSchemaProvider {
+ config: InformationSchemaConfig,
+}
+
+#[derive(Clone)]
+struct InformationSchemaConfig {
catalog_list: Arc<dyn CatalogList>,
config_options: Arc<RwLock<ConfigOptions>>,
}
-impl InformationSchemaProvider {
+impl InformationSchemaConfig {
/// Construct the `information_schema.tables` virtual table
- fn make_tables(&self) -> Arc<dyn TableProvider> {
+ fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
// create a mem table with the names of tables
- let mut builder = InformationSchemaTablesBuilder::new();
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
@@ -161,15 +169,9 @@ impl InformationSchemaProvider {
TableType::View,
);
}
-
- let mem_table: MemTable = builder.into();
-
- Arc::new(mem_table)
}
- fn make_views(&self) -> Arc<dyn TableProvider> {
- let mut builder = InformationSchemaViewBuilder::new();
-
+ fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
@@ -188,15 +190,10 @@ impl InformationSchemaProvider {
}
}
}
-
- let mem_table: MemTable = builder.into();
- Arc::new(mem_table)
}
/// Construct the `information_schema.columns` virtual table
- fn make_columns(&self) -> Arc<dyn TableProvider> {
- let mut builder = InformationSchemaColumnsBuilder::new();
-
+ fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
@@ -220,47 +217,47 @@ impl InformationSchemaProvider {
}
}
}
-
- let mem_table: MemTable = builder.into();
-
- Arc::new(mem_table)
}
/// Construct the `information_schema.df_settings` virtual table
- fn make_df_settings(&self) -> Arc<dyn TableProvider> {
- let mut builder = InformationSchemaDfSettingsBuilder::new();
-
+ fn make_df_settings(&self, builder: &mut
InformationSchemaDfSettingsBuilder) {
for (name, setting) in self.config_options.read().options() {
builder.add_setting(name, setting.to_string());
}
-
- let mem_table: MemTable = builder.into();
-
- Arc::new(mem_table)
}
}
impl SchemaProvider for InformationSchemaProvider {
- fn as_any(&self) -> &(dyn any::Any + 'static) {
+ fn as_any(&self) -> &(dyn Any + 'static) {
self
}
fn table_names(&self) -> Vec<String> {
- vec![TABLES.to_string(), VIEWS.to_string(), COLUMNS.to_string()]
+ vec![
+ TABLES.to_string(),
+ VIEWS.to_string(),
+ COLUMNS.to_string(),
+ DF_SETTINGS.to_string(),
+ ]
}
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
- if name.eq_ignore_ascii_case("tables") {
- Some(self.make_tables())
+ let config = self.config.clone();
+ let table: Arc<dyn PartitionStream> = if
name.eq_ignore_ascii_case("tables") {
+ Arc::new(InformationSchemaTables::new(config))
} else if name.eq_ignore_ascii_case("columns") {
- Some(self.make_columns())
+ Arc::new(InformationSchemaColumns::new(config))
} else if name.eq_ignore_ascii_case("views") {
- Some(self.make_views())
+ Arc::new(InformationSchemaViews::new(config))
} else if name.eq_ignore_ascii_case("df_settings") {
- Some(self.make_df_settings())
+ Arc::new(InformationSchemaDfSettings::new(config))
} else {
- None
- }
+ return None;
+ };
+
+ Some(Arc::new(
+ StreamingTable::try_new(table.schema().clone(),
vec![table]).unwrap(),
+ ))
}
fn table_exist(&self, name: &str) -> bool {
@@ -268,10 +265,58 @@ impl SchemaProvider for InformationSchemaProvider {
}
}
+struct InformationSchemaTables {
+ schema: SchemaRef,
+ config: InformationSchemaConfig,
+}
+
+impl InformationSchemaTables {
+ fn new(config: InformationSchemaConfig) -> Self {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("table_catalog", DataType::Utf8, false),
+ Field::new("table_schema", DataType::Utf8, false),
+ Field::new("table_name", DataType::Utf8, false),
+ Field::new("table_type", DataType::Utf8, false),
+ ]));
+
+ Self { schema, config }
+ }
+
+ fn builder(&self) -> InformationSchemaTablesBuilder {
+ InformationSchemaTablesBuilder {
+ catalog_names: StringBuilder::new(),
+ schema_names: StringBuilder::new(),
+ table_names: StringBuilder::new(),
+ table_types: StringBuilder::new(),
+ schema: self.schema.clone(),
+ }
+ }
+}
+
+impl PartitionStream for InformationSchemaTables {
+ fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+
+ fn execute(&self) -> SendableRecordBatchStream {
+ let mut builder = self.builder();
+ let config = self.config.clone();
+ Box::pin(RecordBatchStreamAdapter::new(
+ self.schema.clone(),
+ // TODO: Stream this
+ futures::stream::once(async move {
+ config.make_tables(&mut builder);
+ Ok(builder.finish())
+ }),
+ ))
+ }
+}
+
/// Builds the `information_schema.TABLE` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaTablesBuilder {
+ schema: SchemaRef,
catalog_names: StringBuilder,
schema_names: StringBuilder,
table_names: StringBuilder,
@@ -279,15 +324,6 @@ struct InformationSchemaTablesBuilder {
}
impl InformationSchemaTablesBuilder {
- fn new() -> Self {
- Self {
- catalog_names: StringBuilder::new(),
- schema_names: StringBuilder::new(),
- table_names: StringBuilder::new(),
- table_types: StringBuilder::new(),
- }
- }
-
fn add_table(
&mut self,
catalog_name: impl AsRef<str>,
@@ -305,37 +341,65 @@ impl InformationSchemaTablesBuilder {
TableType::Temporary => "LOCAL TEMPORARY",
});
}
+
+ fn finish(&mut self) -> RecordBatch {
+ RecordBatch::try_new(
+ self.schema.clone(),
+ vec![
+ Arc::new(self.catalog_names.finish()),
+ Arc::new(self.schema_names.finish()),
+ Arc::new(self.table_names.finish()),
+ Arc::new(self.table_types.finish()),
+ ],
+ )
+ .unwrap()
+ }
+}
+
+struct InformationSchemaViews {
+ schema: SchemaRef,
+ config: InformationSchemaConfig,
}
-impl From<InformationSchemaTablesBuilder> for MemTable {
- fn from(value: InformationSchemaTablesBuilder) -> MemTable {
- let schema = Schema::new(vec![
+impl InformationSchemaViews {
+ fn new(config: InformationSchemaConfig) -> Self {
+ let schema = Arc::new(Schema::new(vec![
Field::new("table_catalog", DataType::Utf8, false),
Field::new("table_schema", DataType::Utf8, false),
Field::new("table_name", DataType::Utf8, false),
- Field::new("table_type", DataType::Utf8, false),
- ]);
-
- let InformationSchemaTablesBuilder {
- mut catalog_names,
- mut schema_names,
- mut table_names,
- mut table_types,
- } = value;
-
- let schema = Arc::new(schema);
- let batch = RecordBatch::try_new(
- schema.clone(),
- vec![
- Arc::new(catalog_names.finish()),
- Arc::new(schema_names.finish()),
- Arc::new(table_names.finish()),
- Arc::new(table_types.finish()),
- ],
- )
- .unwrap();
+ Field::new("definition", DataType::Utf8, true),
+ ]));
+
+ Self { schema, config }
+ }
- MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+ fn builder(&self) -> InformationSchemaViewBuilder {
+ InformationSchemaViewBuilder {
+ catalog_names: StringBuilder::new(),
+ schema_names: StringBuilder::new(),
+ table_names: StringBuilder::new(),
+ definitions: StringBuilder::new(),
+ schema: self.schema.clone(),
+ }
+ }
+}
+
+impl PartitionStream for InformationSchemaViews {
+ fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+
+ fn execute(&self) -> SendableRecordBatchStream {
+ let mut builder = self.builder();
+ let config = self.config.clone();
+ Box::pin(RecordBatchStreamAdapter::new(
+ self.schema.clone(),
+ // TODO: Stream this
+ futures::stream::once(async move {
+ config.make_views(&mut builder);
+ Ok(builder.finish())
+ }),
+ ))
}
}
@@ -343,6 +407,7 @@ impl From<InformationSchemaTablesBuilder> for MemTable {
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaViewBuilder {
+ schema: SchemaRef,
catalog_names: StringBuilder,
schema_names: StringBuilder,
table_names: StringBuilder,
@@ -350,15 +415,6 @@ struct InformationSchemaViewBuilder {
}
impl InformationSchemaViewBuilder {
- fn new() -> Self {
- Self {
- catalog_names: StringBuilder::new(),
- schema_names: StringBuilder::new(),
- table_names: StringBuilder::new(),
- definitions: StringBuilder::new(),
- }
- }
-
fn add_view(
&mut self,
catalog_name: impl AsRef<str>,
@@ -372,68 +428,56 @@ impl InformationSchemaViewBuilder {
self.table_names.append_value(table_name.as_ref());
self.definitions.append_option(definition.as_ref());
}
-}
-impl From<InformationSchemaViewBuilder> for MemTable {
- fn from(value: InformationSchemaViewBuilder) -> Self {
- let schema = Schema::new(vec![
- Field::new("table_catalog", DataType::Utf8, false),
- Field::new("table_schema", DataType::Utf8, false),
- Field::new("table_name", DataType::Utf8, false),
- Field::new("definition", DataType::Utf8, true),
- ]);
-
- let InformationSchemaViewBuilder {
- mut catalog_names,
- mut schema_names,
- mut table_names,
- mut definitions,
- } = value;
-
- let schema = Arc::new(schema);
- let batch = RecordBatch::try_new(
- schema.clone(),
+ fn finish(&mut self) -> RecordBatch {
+ RecordBatch::try_new(
+ self.schema.clone(),
vec![
- Arc::new(catalog_names.finish()),
- Arc::new(schema_names.finish()),
- Arc::new(table_names.finish()),
- Arc::new(definitions.finish()),
+ Arc::new(self.catalog_names.finish()),
+ Arc::new(self.schema_names.finish()),
+ Arc::new(self.table_names.finish()),
+ Arc::new(self.definitions.finish()),
],
)
- .unwrap();
-
- MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+ .unwrap()
}
}
-/// Builds the `information_schema.COLUMNS` table row by row
-///
-/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
-struct InformationSchemaColumnsBuilder {
- catalog_names: StringBuilder,
- schema_names: StringBuilder,
- table_names: StringBuilder,
- column_names: StringBuilder,
- ordinal_positions: UInt64Builder,
- column_defaults: StringBuilder,
- is_nullables: StringBuilder,
- data_types: StringBuilder,
- character_maximum_lengths: UInt64Builder,
- character_octet_lengths: UInt64Builder,
- numeric_precisions: UInt64Builder,
- numeric_precision_radixes: UInt64Builder,
- numeric_scales: UInt64Builder,
- datetime_precisions: UInt64Builder,
- interval_types: StringBuilder,
+struct InformationSchemaColumns {
+ schema: SchemaRef,
+ config: InformationSchemaConfig,
}
-impl InformationSchemaColumnsBuilder {
- fn new() -> Self {
+impl InformationSchemaColumns {
+ fn new(config: InformationSchemaConfig) -> Self {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("table_catalog", DataType::Utf8, false),
+ Field::new("table_schema", DataType::Utf8, false),
+ Field::new("table_name", DataType::Utf8, false),
+ Field::new("column_name", DataType::Utf8, false),
+ Field::new("ordinal_position", DataType::UInt64, false),
+ Field::new("column_default", DataType::Utf8, true),
+ Field::new("is_nullable", DataType::Utf8, false),
+ Field::new("data_type", DataType::Utf8, false),
+ Field::new("character_maximum_length", DataType::UInt64, true),
+ Field::new("character_octet_length", DataType::UInt64, true),
+ Field::new("numeric_precision", DataType::UInt64, true),
+ Field::new("numeric_precision_radix", DataType::UInt64, true),
+ Field::new("numeric_scale", DataType::UInt64, true),
+ Field::new("datetime_precision", DataType::UInt64, true),
+ Field::new("interval_type", DataType::Utf8, true),
+ ]));
+
+ Self { schema, config }
+ }
+
+ fn builder(&self) -> InformationSchemaColumnsBuilder {
// StringBuilder requires providing an initial capacity, so
// pick 10 here arbitrarily as this is not performance
// critical code and the number of tables is unavailable here.
let default_capacity = 10;
- Self {
+
+ InformationSchemaColumnsBuilder {
catalog_names: StringBuilder::new(),
schema_names: StringBuilder::new(),
table_names: StringBuilder::new(),
@@ -449,9 +493,53 @@ impl InformationSchemaColumnsBuilder {
numeric_scales: UInt64Builder::with_capacity(default_capacity),
datetime_precisions:
UInt64Builder::with_capacity(default_capacity),
interval_types: StringBuilder::new(),
+ schema: self.schema.clone(),
}
}
+}
+
+impl PartitionStream for InformationSchemaColumns {
+ fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+
+ fn execute(&self) -> SendableRecordBatchStream {
+ let mut builder = self.builder();
+ let config = self.config.clone();
+ Box::pin(RecordBatchStreamAdapter::new(
+ self.schema.clone(),
+ // TODO: Stream this
+ futures::stream::once(async move {
+ config.make_columns(&mut builder);
+ Ok(builder.finish())
+ }),
+ ))
+ }
+}
+
+/// Builds the `information_schema.COLUMNS` table row by row
+///
+/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
+struct InformationSchemaColumnsBuilder {
+ schema: SchemaRef,
+ catalog_names: StringBuilder,
+ schema_names: StringBuilder,
+ table_names: StringBuilder,
+ column_names: StringBuilder,
+ ordinal_positions: UInt64Builder,
+ column_defaults: StringBuilder,
+ is_nullables: StringBuilder,
+ data_types: StringBuilder,
+ character_maximum_lengths: UInt64Builder,
+ character_octet_lengths: UInt64Builder,
+ numeric_precisions: UInt64Builder,
+ numeric_precision_radixes: UInt64Builder,
+ numeric_scales: UInt64Builder,
+ datetime_precisions: UInt64Builder,
+ interval_types: StringBuilder,
+}
+impl InformationSchemaColumnsBuilder {
#[allow(clippy::too_many_arguments)]
fn add_column(
&mut self,
@@ -547,111 +635,96 @@ impl InformationSchemaColumnsBuilder {
self.datetime_precisions.append_option(None);
self.interval_types.append_null();
}
-}
-impl From<InformationSchemaColumnsBuilder> for MemTable {
- fn from(value: InformationSchemaColumnsBuilder) -> MemTable {
- let schema = Schema::new(vec![
- Field::new("table_catalog", DataType::Utf8, false),
- Field::new("table_schema", DataType::Utf8, false),
- Field::new("table_name", DataType::Utf8, false),
- Field::new("column_name", DataType::Utf8, false),
- Field::new("ordinal_position", DataType::UInt64, false),
- Field::new("column_default", DataType::Utf8, true),
- Field::new("is_nullable", DataType::Utf8, false),
- Field::new("data_type", DataType::Utf8, false),
- Field::new("character_maximum_length", DataType::UInt64, true),
- Field::new("character_octet_length", DataType::UInt64, true),
- Field::new("numeric_precision", DataType::UInt64, true),
- Field::new("numeric_precision_radix", DataType::UInt64, true),
- Field::new("numeric_scale", DataType::UInt64, true),
- Field::new("datetime_precision", DataType::UInt64, true),
- Field::new("interval_type", DataType::Utf8, true),
- ]);
-
- let InformationSchemaColumnsBuilder {
- mut catalog_names,
- mut schema_names,
- mut table_names,
- mut column_names,
- mut ordinal_positions,
- mut column_defaults,
- mut is_nullables,
- mut data_types,
- mut character_maximum_lengths,
- mut character_octet_lengths,
- mut numeric_precisions,
- mut numeric_precision_radixes,
- mut numeric_scales,
- mut datetime_precisions,
- mut interval_types,
- } = value;
-
- let schema = Arc::new(schema);
- let batch = RecordBatch::try_new(
- schema.clone(),
+ fn finish(&mut self) -> RecordBatch {
+ RecordBatch::try_new(
+ self.schema.clone(),
vec![
- Arc::new(catalog_names.finish()),
- Arc::new(schema_names.finish()),
- Arc::new(table_names.finish()),
- Arc::new(column_names.finish()),
- Arc::new(ordinal_positions.finish()),
- Arc::new(column_defaults.finish()),
- Arc::new(is_nullables.finish()),
- Arc::new(data_types.finish()),
- Arc::new(character_maximum_lengths.finish()),
- Arc::new(character_octet_lengths.finish()),
- Arc::new(numeric_precisions.finish()),
- Arc::new(numeric_precision_radixes.finish()),
- Arc::new(numeric_scales.finish()),
- Arc::new(datetime_precisions.finish()),
- Arc::new(interval_types.finish()),
+ Arc::new(self.catalog_names.finish()),
+ Arc::new(self.schema_names.finish()),
+ Arc::new(self.table_names.finish()),
+ Arc::new(self.column_names.finish()),
+ Arc::new(self.ordinal_positions.finish()),
+ Arc::new(self.column_defaults.finish()),
+ Arc::new(self.is_nullables.finish()),
+ Arc::new(self.data_types.finish()),
+ Arc::new(self.character_maximum_lengths.finish()),
+ Arc::new(self.character_octet_lengths.finish()),
+ Arc::new(self.numeric_precisions.finish()),
+ Arc::new(self.numeric_precision_radixes.finish()),
+ Arc::new(self.numeric_scales.finish()),
+ Arc::new(self.datetime_precisions.finish()),
+ Arc::new(self.interval_types.finish()),
],
)
- .unwrap();
-
- MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+ .unwrap()
}
}
-struct InformationSchemaDfSettingsBuilder {
- names: StringBuilder,
- settings: StringBuilder,
+struct InformationSchemaDfSettings {
+ schema: SchemaRef,
+ config: InformationSchemaConfig,
}
-impl InformationSchemaDfSettingsBuilder {
- fn new() -> Self {
- Self {
+impl InformationSchemaDfSettings {
+ fn new(config: InformationSchemaConfig) -> Self {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("name", DataType::Utf8, false),
+ Field::new("setting", DataType::Utf8, false),
+ ]));
+
+ Self { schema, config }
+ }
+
+ fn builder(&self) -> InformationSchemaDfSettingsBuilder {
+ InformationSchemaDfSettingsBuilder {
names: StringBuilder::new(),
settings: StringBuilder::new(),
+ schema: self.schema.clone(),
}
}
+}
- fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>)
{
- self.names.append_value(name.as_ref());
- self.settings.append_value(setting.as_ref());
+impl PartitionStream for InformationSchemaDfSettings {
+ fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+
+ fn execute(&self) -> SendableRecordBatchStream {
+ let mut builder = self.builder();
+ let config = self.config.clone();
+ Box::pin(RecordBatchStreamAdapter::new(
+ self.schema.clone(),
+ // TODO: Stream this
+ futures::stream::once(async move {
+ // populate the builder with the current configuration settings
+ config.make_df_settings(&mut builder);
+ Ok(builder.finish())
+ }),
+ ))
}
}
-impl From<InformationSchemaDfSettingsBuilder> for MemTable {
- fn from(value: InformationSchemaDfSettingsBuilder) -> MemTable {
- let schema = Schema::new(vec![
- Field::new("name", DataType::Utf8, false),
- Field::new("setting", DataType::Utf8, false),
- ]);
+struct InformationSchemaDfSettingsBuilder {
+ schema: SchemaRef,
+ names: StringBuilder,
+ settings: StringBuilder,
+}
- let InformationSchemaDfSettingsBuilder {
- mut names,
- mut settings,
- } = value;
+impl InformationSchemaDfSettingsBuilder {
+ fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>)
{
+ self.names.append_value(name.as_ref());
+ self.settings.append_value(setting.as_ref());
+ }
- let schema = Arc::new(schema);
- let batch = RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(names.finish()), Arc::new(settings.finish())],
+ fn finish(&mut self) -> RecordBatch {
+ RecordBatch::try_new(
+ self.schema.clone(),
+ vec![
+ Arc::new(self.names.finish()),
+ Arc::new(self.settings.finish()),
+ ],
)
- .unwrap();
-
- MemTable::try_new(schema, vec![vec![batch]]).unwrap()
+ .unwrap()
}
}
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index fc3e8f2d2..8610607b6 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -26,6 +26,7 @@ pub mod listing;
pub mod listing_table_factory;
pub mod memory;
pub mod object_store;
+pub mod streaming;
pub mod view;
use futures::Stream;
diff --git a/datafusion/core/src/datasource/streaming.rs b/datafusion/core/src/datasource/streaming.rs
new file mode 100644
index 000000000..88de34efa
--- /dev/null
+++ b/datafusion/core/src/datasource/streaming.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A simplified [`TableProvider`] for streaming partitioned datasets
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::{Expr, TableType};
+
+use crate::datasource::TableProvider;
+use crate::execution::context::SessionState;
+use crate::physical_plan::streaming::StreamingTableExec;
+use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
+
+/// A partition that can be converted into a [`SendableRecordBatchStream`]
+pub trait PartitionStream: Send + Sync {
+ /// Returns the schema of this partition
+ fn schema(&self) -> &SchemaRef;
+
+ /// Returns a stream yielding this partition's values
+ fn execute(&self) -> SendableRecordBatchStream;
+}
+
+/// A [`TableProvider`] that streams a set of [`PartitionStream`]
+pub struct StreamingTable {
+ schema: SchemaRef,
+ partitions: Vec<Arc<dyn PartitionStream>>,
+}
+
+impl StreamingTable {
+ /// Try to create a new [`StreamingTable`] returning an error if the schema is incorrect
+ pub fn try_new(
+ schema: SchemaRef,
+ partitions: Vec<Arc<dyn PartitionStream>>,
+ ) -> Result<Self> {
+ if !partitions.iter().all(|x| schema.contains(x.schema())) {
+ return Err(DataFusionError::Plan(
+ "Mismatch between schema and batches".to_string(),
+ ));
+ }
+
+ Ok(Self { schema, partitions })
+ }
+}
+
+#[async_trait]
+impl TableProvider for StreamingTable {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::View
+ }
+
+ async fn scan(
+ &self,
+ _ctx: &SessionState,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // TODO: push limit down
+ Ok(Arc::new(StreamingTableExec::try_new(
+ self.schema.clone(),
+ self.partitions.clone(),
+ projection,
+ )?))
+ }
+}
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 2b4cd63c6..aa365ea45 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -648,6 +648,7 @@ pub mod repartition;
pub mod rewrite;
pub mod sorts;
pub mod stream;
+pub mod streaming;
pub mod udaf;
pub mod union;
pub mod values;
diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs
new file mode 100644
index 000000000..a6ab51bb1
--- /dev/null
+++ b/datafusion/core/src/physical_plan/streaming.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for streaming [`PartitionStream`]
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use futures::stream::StreamExt;
+
+use datafusion_common::{DataFusionError, Result, Statistics};
+use datafusion_physical_expr::PhysicalSortExpr;
+
+use crate::datasource::streaming::PartitionStream;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::{ExecutionPlan, Partitioning,
SendableRecordBatchStream};
+
+/// An [`ExecutionPlan`] for [`PartitionStream`]
+pub struct StreamingTableExec {
+ partitions: Vec<Arc<dyn PartitionStream>>,
+ projection: Option<Arc<[usize]>>,
+ projected_schema: SchemaRef,
+}
+
+impl StreamingTableExec {
+ /// Try to create a new [`StreamingTableExec`] returning an error if the schema is incorrect
+ pub fn try_new(
+ schema: SchemaRef,
+ partitions: Vec<Arc<dyn PartitionStream>>,
+ projection: Option<&Vec<usize>>,
+ ) -> Result<Self> {
+ if !partitions.iter().all(|x| schema.contains(x.schema())) {
+ return Err(DataFusionError::Plan(
+ "Mismatch between schema and batches".to_string(),
+ ));
+ }
+
+ let projected_schema = match projection {
+ Some(p) => Arc::new(schema.project(p)?),
+ None => schema,
+ };
+
+ Ok(Self {
+ partitions,
+ projected_schema,
+ projection: projection.cloned().map(Into::into),
+ })
+ }
+}
+
+impl std::fmt::Debug for StreamingTableExec {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("LazyMemTableExec").finish_non_exhaustive()
+ }
+}
+
+#[async_trait]
+impl ExecutionPlan for StreamingTableExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.projected_schema.clone()
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(self.partitions.len())
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Err(DataFusionError::Internal(format!(
+ "Children cannot be replaced in {:?}",
+ self
+ )))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ _context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ let stream = self.partitions[partition].execute();
+ Ok(match self.projection.clone() {
+ Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
+ self.projected_schema.clone(),
+ stream.map(move |x| x.and_then(|b|
b.project(projection.as_ref()))),
+ )),
+ None => stream,
+ })
+ }
+
+ fn statistics(&self) -> Statistics {
+ Default::default()
+ }
+}