This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a29f2bec21 Validate partitions columns in `CREATE EXTERNAL TABLE` if table already exists. (#9912)
a29f2bec21 is described below

commit a29f2bec21f92ef56b80b110c93761004d8043f4
Author: Mohamed Abdeen <[email protected]>
AuthorDate: Fri Apr 5 20:43:36 2024 +0200

    Validate partitions columns in `CREATE EXTERNAL TABLE` if table already exists. (#9912)
    
    * prevent panic
    
    * initial version, bad code
    
    * some error handling
    
    * Some slt tests
    
    * docs and minor refactors
    
    * cleaning up
    
    * fix tests
    
    * clear err message for single-file partitioned tables
    
    * typo
    
    * test invalid/mixed partitions on disk
    
    * ensure order in error msg for testing
---
 datafusion/core/src/datasource/listing/table.rs    | 107 +++++++++++++++++++++
 .../core/src/datasource/listing_table_factory.rs   |   2 +
 .../datasource/physical_plan/file_scan_config.rs   |  12 ++-
 datafusion/core/src/execution/context/mod.rs       |   2 +-
 .../test_files/create_external_table.slt           |  93 +++++++++++++++++-
 5 files changed, 212 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 5ef7b6241b..6625abd650 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -61,6 +61,7 @@ use datafusion_physical_expr::{
 
 use async_trait::async_trait;
 use futures::{future, stream, StreamExt, TryStreamExt};
+use itertools::Itertools;
 use object_store::ObjectStore;
 
 /// Configuration for creating a [`ListingTable`]
@@ -438,6 +439,112 @@ impl ListingOptions {
 
         self.format.infer_schema(state, &store, &files).await
     }
+
+    /// Checks the columns given in `PARTITIONED BY` against the partition
+    /// columns inferred from the files already stored in `LOCATION`, to help
+    /// prevent accidental corruption of partitioned tables.
+    ///
+    /// The declared columns only have to be a leading prefix of the inferred
+    /// ones, which allows specifying partial partitions.
+    ///
+    /// # Errors
+    ///
+    /// Returns a planning error when partition columns are declared but
+    /// `table_path` is not a collection, or when the declared columns are
+    /// not a prefix of the inferred ones. Errors from `infer_partitions`
+    /// are propagated unchanged.
+    pub async fn validate_partitions(
+        &self,
+        state: &SessionState,
+        table_path: &ListingTableUrl,
+    ) -> Result<()> {
+        // nothing to validate when no partition columns were declared
+        if self.table_partition_cols.is_empty() {
+            return Ok(());
+        }
+
+        if !table_path.is_collection() {
+            return plan_err!(
+                "Can't create a partitioned table backed by a single file, \
+                perhaps the URL is missing a trailing slash?"
+            );
+        }
+
+        let on_disk = self.infer_partitions(state, table_path).await?;
+
+        // no partitioned files found on disk
+        if on_disk.is_empty() {
+            return Ok(());
+        }
+
+        let declared: Vec<String> = self
+            .table_partition_cols
+            .iter()
+            .map(|(name, _)| name.clone())
+            .collect();
+
+        // match prefix to allow creating tables with partial partitions;
+        // `starts_with` is also false when more columns are declared than
+        // were inferred
+        if on_disk.starts_with(&declared) {
+            Ok(())
+        } else {
+            plan_err!(
+                "Inferred partitions to be {:?}, but got {:?}",
+                on_disk,
+                declared
+            )
+        }
+    }
+
+    /// Infer the partitioning at the given path on the provided object store.
+    /// For performance reasons, it doesn't read all the files on disk
+    /// and therefore may fail to detect invalid partitioning.
+    ///
+    /// For each sampled file, the path segments between `table_path` and the
+    /// file name are taken as partition directories, and the text before the
+    /// first `=` of each segment is used as the partition key.
+    ///
+    /// Returns the keys shared by all sampled files, or an empty vector when
+    /// no files were listed.
+    ///
+    /// # Errors
+    ///
+    /// Returns a planning error if the sampled files do not all have the same
+    /// partition keys, or if a listed file is not located under `table_path`.
+    /// Errors from the object store lookup and the file listing are
+    /// propagated.
+    async fn infer_partitions(
+        &self,
+        state: &SessionState,
+        table_path: &ListingTableUrl,
+    ) -> Result<Vec<String>> {
+        let store = state.runtime_env().object_store(table_path)?;
+
+        // only use 10 files for inference
+        // This can fail to detect inconsistent partition keys
+        // A DFS traversal approach of the store can help here
+        let files: Vec<_> = table_path
+            .list_all_files(state, store.as_ref(), &self.file_extension)
+            .await?
+            .take(10)
+            .try_collect()
+            .await?;
+
+        let partition_keys = files
+            .iter()
+            .map(|file| match table_path.strip_prefix(&file.location) {
+                Some(path_parts) => {
+                    let mut keys: Vec<String> = path_parts
+                        .map(|part| part.split('=').take(1).collect())
+                        .collect();
+                    // get parents only; drop the file itself
+                    keys.pop();
+                    Ok(keys)
+                }
+                // report an error instead of panicking on an unexpected listing
+                None => plan_err!(
+                    "Listed file {} is not located under the table path",
+                    file.location
+                ),
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        match partition_keys.into_iter().all_equal_value() {
+            Ok(v) => Ok(v),
+            // no files were listed
+            Err(None) => Ok(vec![]),
+            Err(Some(diff)) => {
+                // sort so the error message is deterministic for testing
+                let mut sorted_diff = [diff.0, diff.1];
+                sorted_diff.sort();
+                plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
+            }
+        }
+    }
 }
 
 /// Reads data from one or more files via an
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index cbadf163ce..1a0eb34d12 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -137,6 +137,8 @@ impl TableProviderFactory for ListingTableFactory {
             .with_table_partition_cols(table_partition_cols)
             .with_file_sort_order(cmd.order_exprs.clone());
 
+        options.validate_partitions(state, &table_path).await?;
+
         let resolved_schema = match provided_schema {
             None => options.infer_schema(state, &table_path).await?,
             Some(s) => s,
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 370ca91a0b..1ea411cb6f 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -32,7 +32,7 @@ use arrow::datatypes::{ArrowNativeType, UInt16Type};
 use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::stats::Precision;
-use datafusion_common::{exec_err, ColumnStatistics, Statistics};
+use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
 use datafusion_physical_expr::LexOrdering;
 
 use log::warn;
@@ -256,9 +256,17 @@ impl PartitionColumnProjector {
                 file_batch.columns().len()
             );
         }
+
         let mut cols = file_batch.columns().to_vec();
         for &(pidx, sidx) in &self.projected_partition_indexes {
-            let mut partition_value = Cow::Borrowed(&partition_values[pidx]);
+            let p_value =
+                partition_values
+                    .get(pidx)
+                    .ok_or(DataFusionError::Execution(
+                        "Invalid partitioning found on disk".to_string(),
+                    ))?;
+
+            let mut partition_value = Cow::Borrowed(p_value);
 
             // check if user forgot to dict-encode the partition value
             let field = self.projected_schema.field(sidx);
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 1a582be301..31a474bd21 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1113,7 +1113,7 @@ impl SessionContext {
         table_ref: impl Into<TableReference>,
         provider: Arc<dyn TableProvider>,
     ) -> Result<Option<Arc<dyn TableProvider>>> {
-        let table_ref = table_ref.into();
+        let table_ref: TableReference = table_ref.into();
         let table = table_ref.table().to_owned();
         self.state
             .read()
diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt
index a200217af6..8aeeb06c19 100644
--- a/datafusion/sqllogictest/test_files/create_external_table.slt
+++ b/datafusion/sqllogictest/test_files/create_external_table.slt
@@ -113,4 +113,95 @@ statement error DataFusion error: Invalid or Unsupported Configuration: Config v
 CREATE EXTERNAL TABLE csv_table (column1 int)
 STORED AS CSV
 LOCATION 'foo.csv'
-OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
\ No newline at end of file
+OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
+
+# Partitioned table on a single file
+query error DataFusion error: Error during planning: Can't create a partitioned table backed by a single file, perhaps the URL is missing a trailing slash\?
+CREATE EXTERNAL TABLE single_file_partition(c1 int)
+PARTITIONED BY (p2 string, p1 string)
+STORED AS CSV
+LOCATION 'foo.csv';
+
+# Wrong partition order error
+
+statement ok
+CREATE EXTERNAL TABLE partitioned (c1 int)
+PARTITIONED BY (p1 string, p2 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+query ITT
+INSERT INTO partitioned VALUES (1, 'x', 'y');
+----
+1
+
+query error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2", "p1"\]
+CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int)
+PARTITIONED BY (p2 string, p1 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+statement error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2"\]
+CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int)
+PARTITIONED BY (p2 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+# But allows partial partition selection
+
+statement ok
+CREATE EXTERNAL TABLE partial_partitioned (c1 int)
+PARTITIONED BY (p1 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';
+
+query IT
+SELECT * FROM partial_partitioned;
+----
+1 x
+
+statement ok
+CREATE EXTERNAL TABLE inner_partition (c1 int)
+PARTITIONED BY (p2 string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/bad_partitioning/p1=x/';
+
+query IT
+SELECT * FROM inner_partition;
+----
+1 y
+
+# Simulate manual creation of invalid (mixed) partitions on disk
+
+statement ok
+CREATE EXTERNAL TABLE test(name string)
+PARTITIONED BY (year string, month string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';
+
+statement ok
+-- passes the partition check since the previous statement didn't write to disk
+CREATE EXTERNAL TABLE test2(name string)
+PARTITIONED BY (month string, year string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';
+
+query TTT
+-- creates year -> month partitions
+INSERT INTO test VALUES('name', '2024', '03');
+----
+1
+
+query TTT
+-- creates month -> year partitions.
+-- now the table has both partitions (year -> month and month -> year)
+INSERT INTO test2 VALUES('name', '2024', '03');
+----
+1
+
+statement error DataFusion error: Error during planning: Found mixed partition values on disk \[\["month", "year"\], \["year", "month"\]\]
+-- fails to infer as partitions are not consistent
+CREATE EXTERNAL TABLE test3(name string)
+PARTITIONED BY (month string, year string)
+STORED AS parquet
+LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';

Reply via email to