MohamedAbdeen21 commented on code in PR #9912:
URL: https://github.com/apache/arrow-datafusion/pull/9912#discussion_r1548582519


##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -438,6 +439,111 @@ impl ListingOptions {
 
         self.format.infer_schema(state, &store, &files).await
     }
+
+    /// Infers the partition columns stored in `LOCATION` and compares
+    /// them with the columns provided in `PARTITIONED BY` to help prevent
+    /// accidental corruption of partitioned tables.
+    ///
+    /// Allows specifying partial partitions.
+    pub async fn validate_partitions(
+        &self,
+        state: &SessionState,
+        table_path: &ListingTableUrl,
+    ) -> Result<()> {
+        if self.table_partition_cols.is_empty() {
+            return Ok(());
+        }
+
+        if !table_path.is_collection() {
+            return plan_err!(
+                "Can't create a partitioned table backed by a single file, \
+                perhaps the URL is missing a trailing slash?"
+            );
+        }
+
+        let inferred = self.infer_partitions(state, table_path).await?;
+
+        // no partitioned files found on disk
+        if inferred.is_empty() {
+            return Ok(());
+        }
+
+        let table_partition_names = self
+            .table_partition_cols
+            .iter()
+            .map(|(col_name, _)| col_name.clone())
+            .collect_vec();
+
+        if inferred.len() < table_partition_names.len() {
+            return plan_err!(
+                "Inferred partitions to be {:?}, but got {:?}",
+                inferred,
+                table_partition_names
+            );
+        }
+
+        // match prefix to allow creating a table using
+        // some of the partition keys
+        for (idx, col) in table_partition_names.iter().enumerate() {
+            if &inferred[idx] != col {
+                return plan_err!(
+                    "Inferred partitions to be {:?}, but got {:?}",
+                    inferred,
+                    table_partition_names
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Infer the partitioning at the given path on the provided object store.
+    /// For performance reasons, it doesn't read all the files on disk
+    /// and therefore may fail to detect invalid partitioning.
+    async fn infer_partitions(
+        &self,
+        state: &SessionState,
+        table_path: &ListingTableUrl,
+    ) -> Result<Vec<String>> {
+        let store = state.runtime_env().object_store(table_path)?;
+
+        // only use 10 files for inference
+        // This can fail to detect inconsistent partition keys
+        // A DFS traversal approach of the store can help here
+        let files: Vec<_> = table_path
+            .list_all_files(state, store.as_ref(), &self.file_extension)
+            .await?
+            .take(10)
+            .try_collect()
+            .await?;
+
+        let stripped_path_parts = files.iter().map(|file| {
+            table_path
+                .strip_prefix(&file.location)
+                .unwrap()
+                .collect_vec()
+        });
+
+        let partition_keys = stripped_path_parts
+            .map(|path_parts| {
+                path_parts
+                    .into_iter()
+                    .rev()
+                    .skip(1) // get parents only; skip the file itself
+                    .rev()
+                    .map(|s| s.split('=').take(1).collect())
+                    .collect_vec()
+            })
+            .collect_vec();
+
+        match partition_keys.into_iter().all_equal_value() {
+            Ok(v) => Ok(v),
+            Err(None) => Ok(vec![]),
+            Err(Some(diff)) => {
+                plan_err!("Found mixed partition values on disk {:?}", diff)

Review Comment:
   That's a behavior we briefly discussed in the issue. I generally dislike the idea of having two otherwise identical tables under the same path with different partitioning, as it can lead to confusion.
   
   I think the only cases in which this error is triggered are:
   
   1. The partition directories were created manually.
   
   2. Something like this:
   
   ```sql
   -- 'tmp/' is an empty/new directory
   CREATE EXTERNAL TABLE test(name string)
   PARTITIONED BY (year string, month string)
   STORED AS parquet LOCATION 'tmp/';
   
   -- passes the check, since the previous CREATE doesn't write anything to disk
   CREATE EXTERNAL TABLE test2(name string)
   PARTITIONED BY (month string, year string)
   STORED AS parquet LOCATION 'tmp/';
   
   -- creates year=2024/month=03/ partitions
   INSERT INTO test VALUES('name', '2024', '03');
   
   -- creates month=03/year=2024/ partitions; the directory now has both layouts
   INSERT INTO test2 VALUES('name', '03', '2024');
   
   -- fails: the partition layouts found on disk are inconsistent
   CREATE EXTERNAL TABLE test3(name string)
   PARTITIONED BY (month string, year string)
   STORED AS parquet LOCATION 'tmp/';
   ```
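   
   For illustration, here is a minimal standalone sketch of why the last statement fails. It mirrors the idea behind `infer_partitions`: drop the file name, keep the key before `=`, and require every sampled file to agree. It uses only `std` instead of `itertools`. `partition_keys`, `infer_keys`, and the file paths are hypothetical and are not DataFusion APIs. The `max_files` parameter stands in for the `take(10)` sampling, so the sketch also shows how a small sample can miss a mixed layout:
   
   ```rust
   /// Hypothetical helper: keys of the directory segments in a path relative to
   /// the table root, e.g. "year=2024/month=03/f.parquet" -> ["year", "month"].
   fn partition_keys(relative_path: &str) -> Vec<String> {
       let parts: Vec<&str> = relative_path.split('/').filter(|p| !p.is_empty()).collect();
       // Every segment except the last one (the file itself) is a directory.
       let dirs = &parts[..parts.len().saturating_sub(1)];
       dirs.iter()
           .map(|dir| dir.split('=').next().unwrap_or("").to_string())
           .collect()
   }

   /// Hypothetical helper: infer keys from at most `max_files` paths and
   /// reject the sample if the files disagree on the partition layout.
   fn infer_keys(paths: &[&str], max_files: usize) -> Result<Vec<String>, String> {
       let sampled: Vec<Vec<String>> = paths
           .iter()
           .take(max_files)
           .map(|path| partition_keys(path))
           .collect();
       match sampled.first() {
           // No files on disk: nothing to infer.
           None => Ok(vec![]),
           Some(first) => match sampled.iter().find(|keys| *keys != first) {
               Some(other) => Err(format!(
                   "mixed partition layouts on disk: {:?} vs {:?}",
                   first, other
               )),
               None => Ok(first.clone()),
           },
       }
   }

   fn main() {
       // One file per layout, shaped like the output of the two INSERTs above.
       let paths = [
           "year=2024/month=03/part-0.parquet",
           "month=03/year=2024/part-0.parquet",
       ];
       // Both files are sampled, so the mixed layout is rejected.
       assert!(infer_keys(&paths, 10).is_err());
       // A sample that only sees the first file misses the inconsistency.
       assert_eq!(infer_keys(&paths, 1).unwrap(), vec!["year", "month"]);
       // An empty directory infers no partitions at all.
       assert!(infer_keys(&[], 10).unwrap().is_empty());
   }
   ```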
   
   I can add this sequence of queries as a test, but I find the pattern very unusual and unlikely to occur in practice.
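   
   For the "Allows specifying partial partitions" behavior in `validate_partitions`, here is a minimal standalone sketch of the prefix rule. It uses a hypothetical `validate_prefix` helper written with plain `std` and is not the PR's code. The declared `PARTITIONED BY` columns must match the start of the inferred keys, in order:
   
   ```rust
   /// Hypothetical helper: accept `declared` only if it is an in-order prefix
   /// of the partition keys inferred from disk.
   fn validate_prefix(inferred: &[String], declared: &[&str]) -> Result<(), String> {
       // Nothing on disk yet, or no partitions declared: nothing to compare.
       if inferred.is_empty() || declared.is_empty() {
           return Ok(());
       }
       // Declaring more columns than exist on disk can never be a prefix.
       let is_prefix = declared.len() <= inferred.len()
           && declared.iter().zip(inferred).all(|(d, i)| *d == i.as_str());
       if is_prefix {
           Ok(())
       } else {
           Err(format!(
               "Inferred partitions to be {:?}, but got {:?}",
               inferred, declared
           ))
       }
   }

   fn main() {
       let inferred = vec!["year".to_string(), "month".to_string()];
       assert!(validate_prefix(&inferred, &["year", "month"]).is_ok()); // full match
       assert!(validate_prefix(&inferred, &["year"]).is_ok()); // partial prefix allowed
       assert!(validate_prefix(&inferred, &["month", "year"]).is_err()); // wrong order
       assert!(validate_prefix(&inferred, &["year", "month", "day"]).is_err()); // too many
   }
   ```
   
   With `year=.../month=...` directories on disk, `PARTITIONED BY (year string)` passes this rule and `PARTITIONED BY (month string, year string)` does not.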



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.