MohamedAbdeen21 commented on code in PR #9912:
URL: https://github.com/apache/arrow-datafusion/pull/9912#discussion_r1553553054
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -438,6 +439,112 @@ impl ListingOptions {
self.format.infer_schema(state, &store, &files).await
}
+
+ /// Infers the partition columns stored in `LOCATION` and compares
+ /// them with the columns provided in `PARTITIONED BY` to help prevent
+ /// accidental corruption of partitioned tables.
+ ///
+ /// Allows specifying partial partitions: the declared `PARTITIONED BY`
+ /// columns must be a prefix (in order) of the partition columns inferred
+ /// from the files found under `table_path`.
+ ///
+ /// # Errors
+ ///
+ /// Returns a plan error if `table_path` refers to a single file rather
+ /// than a directory, or if the declared partition columns do not form a
+ /// prefix of the inferred ones.
+ pub async fn validate_partitions(
+ &self,
+ state: &SessionState,
+ table_path: &ListingTableUrl,
+ ) -> Result<()> {
+ // Nothing to validate for an unpartitioned table.
+ if self.table_partition_cols.is_empty() {
+ return Ok(());
+ }
+
+ // A partitioned table must be backed by a directory (a URL ending in
+ // '/'), never by a single file.
+ if !table_path.is_collection() {
+ return plan_err!(
+ "Can't create a partitioned table backed by a single file, \
+ perhaps the URL is missing a trailing slash?"
+ );
+ }
+
+ // Sample-based inference; see `infer_partitions` for its limits.
+ let inferred = self.infer_partitions(state, table_path).await?;
+
+ // no partitioned files found on disk
+ if inferred.is_empty() {
+ return Ok(());
+ }
+
+ // Names only: the declared types in `table_partition_cols` are not
+ // checked here, just the column names.
+ let table_partition_names = self
+ .table_partition_cols
+ .iter()
+ .map(|(col_name, _)| col_name.clone())
+ .collect_vec();
+
+ // More declared columns than inferred on disk can never match.
+ if inferred.len() < table_partition_names.len() {
+ return plan_err!(
+ "Inferred partitions to be {:?}, but got {:?}",
+ inferred,
+ table_partition_names
+ );
+ }
+
+ // match prefix to allow creating tables with partial partitions
+ for (idx, col) in table_partition_names.iter().enumerate() {
+ if &inferred[idx] != col {
+ return plan_err!(
+ "Inferred partitions to be {:?}, but got {:?}",
+ inferred,
+ table_partition_names
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Infer the partitioning at the given path on the provided object store.
+ /// For performance reasons, it doesn't read all the files on disk
+ /// and therefore may fail to detect invalid partitioning.
+ async fn infer_partitions(
+ &self,
+ state: &SessionState,
+ table_path: &ListingTableUrl,
+ ) -> Result<Vec<String>> {
+ let store = state.runtime_env().object_store(table_path)?;
+
+ // only use 10 files for inference
+ // This can fail to detect inconsistent partition keys
+ // A DFS traversal approach of the store can help here
+ let files: Vec<_> = table_path
+ .list_all_files(state, store.as_ref(), &self.file_extension)
+ .await?
+ .take(10)
+ .try_collect()
+ .await?;
+
+ let stripped_path_parts = files.iter().map(|file| {
+ table_path
+ .strip_prefix(&file.location)
+ .unwrap()
+ .collect_vec()
+ });
+
+ let partition_keys = stripped_path_parts
+ .map(|path_parts| {
+ path_parts
+ .into_iter()
+ .rev()
+ .skip(1) // get parents only; skip the file itself
+ .rev()
+ .map(|s| s.split('=').take(1).collect())
Review Comment:
We do need to consider this. Now that you mention it, I realize that I never
actually thought about how these column names are written to disk. I'll have to
take a look at how other systems do it before suggesting anything.
After looking at the linked issue, I think this should be addressed in a
follow-up since it may require changing other parts of DataFusion first.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]