alamb commented on code in PR #9912:
URL: https://github.com/apache/arrow-datafusion/pull/9912#discussion_r1548559994
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -438,6 +439,111 @@ impl ListingOptions {
self.format.infer_schema(state, &store, &files).await
}
+
+ /// Infers the partition columns stored in `LOCATION` and compares
+ /// them with the columns provided in `PARTITIONED BY` to help prevent
+ /// accidental corruption of partitioned tables.
+ ///
+ /// Allows specifying partial partitions: the declared columns only need to match a prefix of the inferred ones. Returns a plan error on a mismatch, or when partition columns are declared but `table_path` is not a collection.
+ pub async fn validate_partitions(
+ &self,
+ state: &SessionState,
+ table_path: &ListingTableUrl,
+ ) -> Result<()> {
+ if self.table_partition_cols.is_empty() {
+ return Ok(());
+ }
+
+ if !table_path.is_collection() {
+ return plan_err!(
+ "Can't create a partitioned table backed by a single file, \
+ perhaps the URL is missing a trailing slash?"
+ );
+ }
+
+ let inferred = self.infer_partitions(state, table_path).await?;
+
+ // no partition columns were inferred from disk, so there is nothing to compare against
+ if inferred.is_empty() {
+ return Ok(());
+ }
+
+ let table_partition_names = self
+ .table_partition_cols
+ .iter()
+ .map(|(col_name, _)| col_name.clone())
+ .collect_vec();
+
+ if inferred.len() < table_partition_names.len() {
+ return plan_err!(
+ "Inferred partitions to be {:?}, but got {:?}",
+ inferred,
+ table_partition_names
+ );
+ }
+
+ // match on a prefix to allow creating a table using
+ // only some of the leading partition keys
+ for (idx, col) in table_partition_names.iter().enumerate() {
+ if &inferred[idx] != col {
+ return plan_err!(
+ "Inferred partitions to be {:?}, but got {:?}",
+ inferred,
+ table_partition_names
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Infer the partitioning at the given path on the provided object store.
+ /// For performance reasons, it doesn't read all the files on disk
Review Comment:
💯
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -438,6 +439,111 @@ impl ListingOptions {
self.format.infer_schema(state, &store, &files).await
}
+
+ /// Infers the partition columns stored in `LOCATION` and compares
+ /// them with the columns provided in `PARTITIONED BY` to help prevent
+ /// accidental corruption of partitioned tables.
+ ///
+ /// Allows specifying partial partitions: the declared columns only need to match a prefix of the inferred ones. Returns a plan error on a mismatch, or when partition columns are declared but `table_path` is not a collection.
+ pub async fn validate_partitions(
+ &self,
+ state: &SessionState,
+ table_path: &ListingTableUrl,
+ ) -> Result<()> {
+ if self.table_partition_cols.is_empty() {
+ return Ok(());
+ }
+
+ if !table_path.is_collection() {
+ return plan_err!(
+ "Can't create a partitioned table backed by a single file, \
+ perhaps the URL is missing a trailing slash?"
+ );
+ }
+
+ let inferred = self.infer_partitions(state, table_path).await?;
+
+ // no partition columns were inferred from disk, so there is nothing to compare against
+ if inferred.is_empty() {
+ return Ok(());
+ }
+
+ let table_partition_names = self
+ .table_partition_cols
+ .iter()
+ .map(|(col_name, _)| col_name.clone())
+ .collect_vec();
+
+ if inferred.len() < table_partition_names.len() {
+ return plan_err!(
+ "Inferred partitions to be {:?}, but got {:?}",
+ inferred,
+ table_partition_names
+ );
+ }
+
+ // match on a prefix to allow creating a table using
+ // only some of the leading partition keys
+ for (idx, col) in table_partition_names.iter().enumerate() {
+ if &inferred[idx] != col {
+ return plan_err!(
+ "Inferred partitions to be {:?}, but got {:?}",
+ inferred,
+ table_partition_names
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Infer the partitioning at the given path on the provided object store.
+ /// For performance reasons, it doesn't read all the files on disk
+ /// and therefore may fail to detect invalid partitioning.
+ async fn infer_partitions(
+ &self,
+ state: &SessionState,
+ table_path: &ListingTableUrl,
+ ) -> Result<Vec<String>> {
+ let store = state.runtime_env().object_store(table_path)?;
+
+ // only use 10 files for inference
+ // This can fail to detect inconsistent partition keys
+ // A DFS traversal approach of the store can help here
+ let files: Vec<_> = table_path
+ .list_all_files(state, store.as_ref(), &self.file_extension)
+ .await?
+ .take(10)
+ .try_collect()
+ .await?;
+
+ let stripped_path_parts = files.iter().map(|file| {
+ table_path
+ .strip_prefix(&file.location)
+ .unwrap()
+ .collect_vec()
+ });
+
+ let partition_keys = stripped_path_parts
+ .map(|path_parts| {
+ path_parts
+ .into_iter()
+ .rev()
+ .skip(1) // get parents only; skip the file itself
+ .rev()
+ .map(|s| s.split('=').take(1).collect())
+ .collect_vec()
+ })
+ .collect_vec();
+
+ match partition_keys.into_iter().all_equal_value() {
+ Ok(v) => Ok(v),
+ Err(None) => Ok(vec![]),
+ Err(Some(diff)) => {
+ plan_err!("Found mixed partition values on disk {:?}", diff)
Review Comment:
Can you please add test coverage for this error condition as well (for
example, maybe it is looking for directories like `x=1` and `y=1`)? Or maybe the
other directory should be ignored if it is not the specified partitioning (i.e.
maybe we shouldn't have this check at all)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]