Jefffrey commented on code in PR #9912:
URL: https://github.com/apache/arrow-datafusion/pull/9912#discussion_r1553267681


##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -438,6 +439,112 @@ impl ListingOptions {
 
         self.format.infer_schema(state, &store, &files).await
     }
+
+    /// Checks the columns declared in `PARTITIONED BY` against the partition
+    /// columns inferred from the files stored in `LOCATION`, to help prevent
+    /// accidental corruption of partitioned tables.
+    ///
+    /// The declared columns only have to form a prefix of the inferred ones,
+    /// so specifying partial partitions is allowed.
+    ///
+    /// Returns `Ok(())` when no partition columns are declared, when nothing
+    /// could be inferred, or when the declared columns are a prefix of the
+    /// inferred ones. Returns a planning error when `table_path` is not a
+    /// collection or when the declared columns do not match.
+    pub async fn validate_partitions(
+        &self,
+        state: &SessionState,
+        table_path: &ListingTableUrl,
+    ) -> Result<()> {
+        // nothing was declared, so there is nothing to validate
+        if self.table_partition_cols.is_empty() {
+            return Ok(());
+        }
+
+        if !table_path.is_collection() {
+            return plan_err!(
+                "Can't create a partitioned table backed by a single file, \
+                perhaps the URL is missing a trailing slash?"
+            );
+        }
+
+        let inferred = self.infer_partitions(state, table_path).await?;
+
+        // no partitions could be inferred from the files at this location
+        if inferred.is_empty() {
+            return Ok(());
+        }
+
+        let declared: Vec<_> = self
+            .table_partition_cols
+            .iter()
+            .map(|(name, _)| name.clone())
+            .collect();
+
+        // Prefix match allows creating tables with partial partitions.
+        // `starts_with` is false both when more columns are declared than
+        // were inferred and when any declared column differs in position.
+        if !inferred.starts_with(&declared) {
+            return plan_err!(
+                "Inferred partitions to be {:?}, but got {:?}",
+                inferred,
+                declared
+            );
+        }
+
+        Ok(())
+    }
+
+    /// Infer the partitioning at the given path on the provided object store.
+    /// For performance reasons, it doesn't read all the files on disk
+    /// and therefore may fail to detect invalid partitioning.
+    async fn infer_partitions(
+        &self,
+        state: &SessionState,
+        table_path: &ListingTableUrl,
+    ) -> Result<Vec<String>> {
+        let store = state.runtime_env().object_store(table_path)?;
+
+        // only use 10 files for inference
+        // This can fail to detect inconsistent partition keys
+        // A DFS traversal approach of the store can help here
+        let files: Vec<_> = table_path
+            .list_all_files(state, store.as_ref(), &self.file_extension)
+            .await?
+            .take(10)
+            .try_collect()
+            .await?;
+
+        let stripped_path_parts = files.iter().map(|file| {
+            table_path
+                .strip_prefix(&file.location)
+                .unwrap()
+                .collect_vec()
+        });
+
+        let partition_keys = stripped_path_parts
+            .map(|path_parts| {
+                path_parts
+                    .into_iter()
+                    .rev()
+                    .skip(1) // get parents only; skip the file itself
+                    .rev()
+                    .map(|s| s.split('=').take(1).collect())

Review Comment:
   Is the partition column name containing a `=` a cheeky edge case we need to 
consider here?
   
   Edit: related 
https://github.com/apache/arrow-datafusion/issues/9269#issuecomment-2039365128



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to