This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 66aead7f51 Allow DynamicFileCatalog support to query partitioned file
(#12683)
66aead7f51 is described below
commit 66aead7f51624d532ed9b3d011c5378d3643f624
Author: Jax Liu <[email protected]>
AuthorDate: Thu Oct 3 06:04:50 2024 +0800
Allow DynamicFileCatalog support to query partitioned file (#12683)
* support querying partitioned tables via the dynamic file catalog
* cargo clippy
* split partition inference into a separate function
---
datafusion/core/src/datasource/dynamic_file.rs | 13 +-
datafusion/core/src/datasource/listing/table.rs | 36 ++++-
.../sqllogictest/test_files/dynamic_file.slt | 167 ++++++++++++++++++++-
3 files changed, 208 insertions(+), 8 deletions(-)
diff --git a/datafusion/core/src/datasource/dynamic_file.rs
b/datafusion/core/src/datasource/dynamic_file.rs
index 3c409af297..6654d0871c 100644
--- a/datafusion/core/src/datasource/dynamic_file.rs
+++ b/datafusion/core/src/datasource/dynamic_file.rs
@@ -69,11 +69,18 @@ impl UrlTableFactory for DynamicListTableFactory {
.ok_or_else(|| plan_datafusion_err!("get current SessionStore
error"))?;
match ListingTableConfig::new(table_url.clone())
- .infer(state)
+ .infer_options(state)
.await
{
- Ok(cfg) => ListingTable::try_new(cfg)
- .map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>)),
+ Ok(cfg) => {
+ let cfg = cfg
+ .infer_partitions_from_path(state)
+ .await?
+ .infer_schema(state)
+ .await?;
+ ListingTable::try_new(cfg)
+ .map(|table| Some(Arc::new(table) as Arc<dyn
TableProvider>))
+ }
Err(_) => Ok(None),
}
}
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 3eb8eed9de..a9c6aec175 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -33,7 +33,7 @@ use crate::datasource::{
};
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
@@ -192,6 +192,38 @@ impl ListingTableConfig {
pub async fn infer(self, state: &SessionState) -> Result<Self> {
self.infer_options(state).await?.infer_schema(state).await
}
+
+ /// Infer the partition columns from the path. Requires `self.options` to
be set prior to using.
+ pub async fn infer_partitions_from_path(self, state: &SessionState) ->
Result<Self> {
+ match self.options {
+ Some(options) => {
+ let Some(url) = self.table_paths.first() else {
+ return config_err!("No table path found");
+ };
+ let partitions = options
+ .infer_partitions(state, url)
+ .await?
+ .into_iter()
+ .map(|col_name| {
+ (
+ col_name,
+ DataType::Dictionary(
+ Box::new(DataType::UInt16),
+ Box::new(DataType::Utf8),
+ ),
+ )
+ })
+ .collect::<Vec<_>>();
+ let options = options.with_table_partition_cols(partitions);
+ Ok(Self {
+ table_paths: self.table_paths,
+ file_schema: self.file_schema,
+ options: Some(options),
+ })
+ }
+ None => config_err!("No `ListingOptions` set for inferring
schema"),
+ }
+ }
}
/// Options for creating a [`ListingTable`]
@@ -505,7 +537,7 @@ impl ListingOptions {
/// Infer the partitioning at the given path on the provided object store.
/// For performance reasons, it doesn't read all the files on disk
/// and therefore may fail to detect invalid partitioning.
- async fn infer_partitions(
+ pub(crate) async fn infer_partitions(
&self,
state: &SessionState,
table_path: &ListingTableUrl,
diff --git a/datafusion/sqllogictest/test_files/dynamic_file.slt
b/datafusion/sqllogictest/test_files/dynamic_file.slt
index e177fd3de2..69f9a43ad4 100644
--- a/datafusion/sqllogictest/test_files/dynamic_file.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_file.slt
@@ -25,9 +25,170 @@ SELECT * FROM
'../core/tests/data/partitioned_table_arrow/part=123' ORDER BY f0;
1 foo true
2 bar false
-# dynamic file query doesn't support partitioned table
-statement error DataFusion error: Error during planning: table
'datafusion.public.../core/tests/data/partitioned_table_arrow' not found
-SELECT * FROM '../core/tests/data/partitioned_table_arrow' ORDER BY f0;
+# Read partitioned file
+statement ok
+CREATE TABLE src_table_1 (
+ int_col INT,
+ string_col TEXT,
+ bigint_col BIGINT,
+ partition_col INT
+) AS VALUES
+(1, 'aaa', 100, 1),
+(2, 'bbb', 200, 1),
+(3, 'ccc', 300, 1),
+(4, 'ddd', 400, 1);
+
+statement ok
+CREATE TABLE src_table_2 (
+ int_col INT,
+ string_col TEXT,
+ bigint_col BIGINT,
+ partition_col INT
+) AS VALUES
+(5, 'eee', 500, 2),
+(6, 'fff', 600, 2),
+(7, 'ggg', 700, 2),
+(8, 'hhh', 800, 2);
+
+# Read partitioned csv file
+
+query I
+COPY src_table_1 TO 'test_files/scratch/dynamic_file/csv_partitions'
+STORED AS CSV
+PARTITIONED BY (partition_col);
+----
+4
+
+query I
+COPY src_table_2 TO 'test_files/scratch/dynamic_file/csv_partitions'
+STORED AS CSV
+PARTITIONED BY (partition_col);
+----
+4
+
+query ITIT rowsort
+SELECT int_col, string_col, bigint_col, partition_col FROM
'test_files/scratch/dynamic_file/csv_partitions';
+----
+1 aaa 100 1
+2 bbb 200 1
+3 ccc 300 1
+4 ddd 400 1
+5 eee 500 2
+6 fff 600 2
+7 ggg 700 2
+8 hhh 800 2
+
+# Read partitioned json file
+
+query I
+COPY src_table_1 TO 'test_files/scratch/dynamic_file/json_partitions'
+STORED AS JSON
+PARTITIONED BY (partition_col);
+----
+4
+
+query I
+COPY src_table_2 TO 'test_files/scratch/dynamic_file/json_partitions'
+STORED AS JSON
+PARTITIONED BY (partition_col);
+----
+4
+
+query ITIT rowsort
+SELECT int_col, string_col, bigint_col, partition_col FROM
'test_files/scratch/dynamic_file/json_partitions';
+----
+1 aaa 100 1
+2 bbb 200 1
+3 ccc 300 1
+4 ddd 400 1
+5 eee 500 2
+6 fff 600 2
+7 ggg 700 2
+8 hhh 800 2
+
+# Read partitioned arrow file
+
+query I
+COPY src_table_1 TO 'test_files/scratch/dynamic_file/arrow_partitions'
+STORED AS ARROW
+PARTITIONED BY (partition_col);
+----
+4
+
+query I
+COPY src_table_2 TO 'test_files/scratch/dynamic_file/arrow_partitions'
+STORED AS ARROW
+PARTITIONED BY (partition_col);
+----
+4
+
+query ITIT rowsort
+SELECT int_col, string_col, bigint_col, partition_col FROM
'test_files/scratch/dynamic_file/arrow_partitions';
+----
+1 aaa 100 1
+2 bbb 200 1
+3 ccc 300 1
+4 ddd 400 1
+5 eee 500 2
+6 fff 600 2
+7 ggg 700 2
+8 hhh 800 2
+
+# Read partitioned parquet file
+
+query I
+COPY src_table_1 TO 'test_files/scratch/dynamic_file/parquet_partitions'
+STORED AS PARQUET
+PARTITIONED BY (partition_col);
+----
+4
+
+query I
+COPY src_table_2 TO 'test_files/scratch/dynamic_file/parquet_partitions'
+STORED AS PARQUET
+PARTITIONED BY (partition_col);
+----
+4
+
+query ITIT rowsort
+select * from 'test_files/scratch/dynamic_file/parquet_partitions';
+----
+1 aaa 100 1
+2 bbb 200 1
+3 ccc 300 1
+4 ddd 400 1
+5 eee 500 2
+6 fff 600 2
+7 ggg 700 2
+8 hhh 800 2
+
+# Read partitioned parquet file with multiple partition columns
+
+query I
+COPY src_table_1 TO 'test_files/scratch/dynamic_file/nested_partition'
+STORED AS PARQUET
+PARTITIONED BY (partition_col, string_col);
+----
+4
+
+query I
+COPY src_table_2 TO 'test_files/scratch/dynamic_file/nested_partition'
+STORED AS PARQUET
+PARTITIONED BY (partition_col, string_col);
+----
+4
+
+query IITT rowsort
+select * from 'test_files/scratch/dynamic_file/nested_partition';
+----
+1 100 1 aaa
+2 200 1 bbb
+3 300 1 ccc
+4 400 1 ddd
+5 500 2 eee
+6 600 2 fff
+7 700 2 ggg
+8 800 2 hhh
# read avro file
query IT
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]