This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a29f2bec21 Validate partitions columns in `CREATE EXTERNAL TABLE` if
table already exists. (#9912)
a29f2bec21 is described below
commit a29f2bec21f92ef56b80b110c93761004d8043f4
Author: Mohamed Abdeen <[email protected]>
AuthorDate: Fri Apr 5 20:43:36 2024 +0200
Validate partitions columns in `CREATE EXTERNAL TABLE` if table already
exists. (#9912)
* prevent panic
* initial version, bad code
* some error handling
* Some slt tests
* docs and minor refactors
* cleaning up
* fix tests
* clear err message for single-file partitioned tables
* typo
* test invalid/mixed partitions on disk
* ensure order in error msg for testing
---
datafusion/core/src/datasource/listing/table.rs | 107 +++++++++++++++++++++
.../core/src/datasource/listing_table_factory.rs | 2 +
.../datasource/physical_plan/file_scan_config.rs | 12 ++-
datafusion/core/src/execution/context/mod.rs | 2 +-
.../test_files/create_external_table.slt | 93 +++++++++++++++++-
5 files changed, 212 insertions(+), 4 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 5ef7b6241b..6625abd650 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -61,6 +61,7 @@ use datafusion_physical_expr::{
use async_trait::async_trait;
use futures::{future, stream, StreamExt, TryStreamExt};
+use itertools::Itertools;
use object_store::ObjectStore;
/// Configuration for creating a [`ListingTable`]
@@ -438,6 +439,112 @@ impl ListingOptions {
self.format.infer_schema(state, &store, &files).await
}
+
+ /// Infers the partition columns stored in `LOCATION` and compares
+ /// them with the columns provided in `PARTITIONED BY` to help prevent
+ /// accidental corruption of partitioned tables.
+ ///
+ /// Allows specifying partial partitions: the declared columns only need to be a prefix of the inferred ones.
+ pub async fn validate_partitions(
+ &self,
+ state: &SessionState,
+ table_path: &ListingTableUrl,
+ ) -> Result<()> {
+ if self.table_partition_cols.is_empty() { // no partition columns declared, nothing to validate
+ return Ok(());
+ }
+
+ if !table_path.is_collection() {
+ return plan_err!(
+ "Can't create a partitioned table backed by a single file, \
+ perhaps the URL is missing a trailing slash?"
+ );
+ }
+
+ let inferred = self.infer_partitions(state, table_path).await?;
+
+ // no partition keys were inferred from disk, so there is nothing to compare against
+ if inferred.is_empty() {
+ return Ok(());
+ }
+
+ let table_partition_names = self
+ .table_partition_cols
+ .iter()
+ .map(|(col_name, _)| col_name.clone())
+ .collect_vec();
+
+ if inferred.len() < table_partition_names.len() { // more columns declared than were inferred
+ return plan_err!(
+ "Inferred partitions to be {:?}, but got {:?}",
+ inferred,
+ table_partition_names
+ );
+ }
+
+ // the declared columns must match a prefix of the inferred ones, in order;
+ for (idx, col) in table_partition_names.iter().enumerate() {
+ if &inferred[idx] != col {
+ return plan_err!(
+ "Inferred partitions to be {:?}, but got {:?}",
+ inferred,
+ table_partition_names
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Infers the partition keys at the given path, using the object store registered for it.
+ /// For performance reasons, only the first 10 listed files are inspected,
+ /// so it may fail to detect invalid partitioning.
+ async fn infer_partitions(
+ &self,
+ state: &SessionState,
+ table_path: &ListingTableUrl,
+ ) -> Result<Vec<String>> {
+ let store = state.runtime_env().object_store(table_path)?;
+
+ // only use the first 10 listed files for inference.
+ // This can fail to detect inconsistent partition keys;
+ // a DFS traversal of the store could help here.
+ let files: Vec<_> = table_path
+ .list_all_files(state, store.as_ref(), &self.file_extension)
+ .await?
+ .take(10)
+ .try_collect()
+ .await?;
+
+ let stripped_path_parts = files.iter().map(|file| {
+ table_path
+ .strip_prefix(&file.location)
+ .unwrap()
+ .collect_vec()
+ });
+
+ let partition_keys = stripped_path_parts
+ .map(|path_parts| {
+ path_parts
+ .into_iter()
+ .rev()
+ .skip(1) // get parents only; skip the file itself
+ .rev()
+ .map(|s| s.split('=').take(1).collect()) // keep only the text before the first `=`
+ .collect_vec()
+ })
+ .collect_vec();
+
+ match partition_keys.into_iter().all_equal_value() {
+ Ok(v) => Ok(v), // every inspected file has the same partition keys
+ Err(None) => Ok(vec![]), // no files were listed, so nothing can be inferred
+ Err(Some(diff)) => { // two inspected files disagree on their partition keys
+ let mut sorted_diff = [diff.0, diff.1];
+ sorted_diff.sort(); // keep the error message order deterministic for tests
+ plan_err!("Found mixed partition values on disk {:?}",
sorted_diff)
+ }
+ }
+ }
}
/// Reads data from one or more files via an
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs
b/datafusion/core/src/datasource/listing_table_factory.rs
index cbadf163ce..1a0eb34d12 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -137,6 +137,8 @@ impl TableProviderFactory for ListingTableFactory {
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(cmd.order_exprs.clone());
+ options.validate_partitions(state, &table_path).await?;
+
let resolved_schema = match provided_schema {
None => options.infer_schema(state, &table_path).await?,
Some(s) => s,
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 370ca91a0b..1ea411cb6f 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -32,7 +32,7 @@ use arrow::datatypes::{ArrowNativeType, UInt16Type};
use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::stats::Precision;
-use datafusion_common::{exec_err, ColumnStatistics, Statistics};
+use datafusion_common::{exec_err, ColumnStatistics, DataFusionError,
Statistics};
use datafusion_physical_expr::LexOrdering;
use log::warn;
@@ -256,9 +256,17 @@ impl PartitionColumnProjector {
file_batch.columns().len()
);
}
+
let mut cols = file_batch.columns().to_vec();
for &(pidx, sidx) in &self.projected_partition_indexes {
- let mut partition_value = Cow::Borrowed(&partition_values[pidx]);
+ let p_value =
+ partition_values
+ .get(pidx)
+ .ok_or(DataFusionError::Execution(
+ "Invalid partitioning found on disk".to_string(),
+ ))?;
+
+ let mut partition_value = Cow::Borrowed(p_value);
// check if user forgot to dict-encode the partition value
let field = self.projected_schema.field(sidx);
diff --git a/datafusion/core/src/execution/context/mod.rs
b/datafusion/core/src/execution/context/mod.rs
index 1a582be301..31a474bd21 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1113,7 +1113,7 @@ impl SessionContext {
table_ref: impl Into<TableReference>,
provider: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
- let table_ref = table_ref.into();
+ let table_ref: TableReference = table_ref.into();
let table = table_ref.table().to_owned();
self.state
.read()
diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt
b/datafusion/sqllogictest/test_files/create_external_table.slt
index a200217af6..8aeeb06c19 100644
--- a/datafusion/sqllogictest/test_files/create_external_table.slt
+++ b/datafusion/sqllogictest/test_files/create_external_table.slt
@@ -113,4 +113,95 @@ statement error DataFusion error: Invalid or Unsupported
Configuration: Config v
CREATE EXTERNAL TABLE csv_table (column1 int)
STORED AS CSV
LOCATION 'foo.csv'
-OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
\ No newline at end of file
+OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
+
+# Partitioned table on a single file
+query error DataFusion error: Error during planning: Can't create a
partitioned table backed by a single file, perhaps the URL is missing a
trailing slash\?
+CREATE EXTERNAL TABLE single_file_partition(c1 int)
+PARTITIONED BY (p2 string, p1 string)
+STORED AS CSV
+LOCATION 'foo.csv';
+
+# Wrong partition order error
+
+statement ok
+CREATE EXTERNAL TABLE partitioned (c1 int)
+PARTITIONED BY (p1 string, p2 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+query ITT
+INSERT INTO partitioned VALUES (1, 'x', 'y');
+----
+1
+
+query error DataFusion error: Error during planning: Inferred partitions to be
\["p1", "p2"\], but got \["p2", "p1"\]
+CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int)
+PARTITIONED BY (p2 string, p1 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+statement error DataFusion error: Error during planning: Inferred partitions
to be \["p1", "p2"\], but got \["p2"\]
+CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int)
+PARTITIONED BY (p2 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+# But allows partial partition selection
+
+statement ok
+CREATE EXTERNAL TABLE partial_partitioned (c1 int)
+PARTITIONED BY (p1 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+query IT
+SELECT * FROM partial_partitioned;
+----
+1 x
+
+statement ok
+CREATE EXTERNAL TABLE inner_partition (c1 int)
+PARTITIONED BY (p2 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/p1=x/';
+
+query IT
+SELECT * FROM inner_partition;
+----
+1 y
+
+# Simulate manual creation of invalid (mixed) partitions on disk
+
+statement ok
+CREATE EXTERNAL TABLE test(name string)
+PARTITIONED BY (year string, month string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';
+
+statement ok
+-- passes the partition check since the previous statement didn't write to disk
+CREATE EXTERNAL TABLE test2(name string)
+PARTITIONED BY (month string, year string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';
+
+query TTT
+-- creates year -> month partitions
+INSERT INTO test VALUES('name', '2024', '03');
+----
+1
+
+query TTT
+-- creates month -> year partitions.
+-- now the table has both partitions (year -> month and month -> year)
+INSERT INTO test2 VALUES('name', '2024', '03');
+----
+1
+
+statement error DataFusion error: Error during planning: Found mixed partition
values on disk \[\["month", "year"\], \["year", "month"\]\]
+-- fails to infer as partitions are not consistent
+CREATE EXTERNAL TABLE test3(name string)
+PARTITIONED BY (month string, year string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';