metesynnada commented on code in PR #5584:
URL: https://github.com/apache/arrow-datafusion/pull/5584#discussion_r1137479350
##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -707,26 +709,58 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
     }
 }
-/// Executes a query and writes the results to a partitioned Parquet file.
-pub async fn plan_to_parquet(
-    task_ctx: Arc<TaskContext>,
-    plan: Arc<dyn ExecutionPlan>,
+/// create target folder using different save mode strategy, refer to [`FileWriterSaveMode`] for more details
+fn create_target_folder(
     path: impl AsRef<str>,
-    writer_properties: Option<WriterProperties>,
-) -> Result<()> {
+    save_mode: FileWriterSaveMode,
+) -> Result<Option<std::path::PathBuf>> {
     let path = path.as_ref();
-    // create directory to contain the Parquet files (one per partition)
     let fs_path = std::path::Path::new(path);
+
+    if fs_path.exists() {
+        match save_mode {
+            FileWriterSaveMode::Overwrite => {
+                std::fs::remove_dir_all(fs_path)?;
+            }
+            FileWriterSaveMode::Ignore => {
+                return Ok(None);
+            }
+            FileWriterSaveMode::Append => {
+                return Ok(Some(fs_path.to_path_buf()));
+            }
+            _ => {}
+        };
+    }
+
     if let Err(e) = fs::create_dir(fs_path) {
         return Err(DataFusionError::Execution(format!(
             "Could not create directory {path}: {e:?}"
         )));
     }
+    Ok(Some(fs_path.to_path_buf()))
+}
+
+/// Executes a query and writes the results to a partitioned Parquet file.
+pub async fn plan_to_parquet(
+    task_ctx: Arc<TaskContext>,
+    plan: Arc<dyn ExecutionPlan>,
+    path: impl AsRef<str>,
+    writer_properties: Option<WriterProperties>,
+    save_mode: FileWriterSaveMode,
+) -> Result<()> {
+    // create directory to contain the Parquet files (one per partition)
+    let fs_path = &create_target_folder(path, save_mode)?;
Review Comment:
There is an unnecessary copy/clone when the folder is not overwritten; with a slightly different implementation this part could avoid the clone entirely.
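A minimal sketch of one way to avoid the copy (the helper name `prepare_target_folder` and the `Ok(bool)` contract are illustrative only, assuming the `FileWriterSaveMode` enum added in this PR and DataFusion's `Result`/`DataFusionError` types): borrow the path and only report whether writing should continue, so no `PathBuf` is ever allocated.

```rust
use std::fs;
use std::path::Path;

use datafusion::error::{DataFusionError, Result};
// `FileWriterSaveMode` is the enum introduced in this PR.

/// Prepares the target folder according to `save_mode`.
/// Returns Ok(false) when writing should be skipped (Ignore mode) and
/// Ok(true) otherwise; the caller keeps using its own `&Path`, so nothing
/// is copied or cloned.
fn prepare_target_folder(fs_path: &Path, save_mode: FileWriterSaveMode) -> Result<bool> {
    if fs_path.exists() {
        match save_mode {
            FileWriterSaveMode::Overwrite => fs::remove_dir_all(fs_path)?,
            FileWriterSaveMode::Ignore => return Ok(false),
            // Append reuses the existing directory as-is
            FileWriterSaveMode::Append => return Ok(true),
            _ => {}
        }
    }
    fs::create_dir(fs_path).map_err(|e| {
        DataFusionError::Execution(format!(
            "Could not create directory {}: {e:?}",
            fs_path.display()
        ))
    })?;
    Ok(true)
}
```

`plan_to_parquet` could then build the `Path` once from its `path` argument and simply return `Ok(())` early when the helper yields `Ok(false)`.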
##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -541,6 +541,23 @@ pub(crate) fn get_output_ordering(
     }).unwrap_or_else(|| None)
 }
+/// Defines strategies for the saving behavior in the case when the saved folder already exists
+#[derive(Debug, Clone, Copy)]
+pub enum FileWriterSaveMode {
+    /// Overwrite mode means that when saving a parquet file, if folder already exists, existing data is expected to be overwritten.
+    /// If folder does not exist, the folder will be created. This is default value.
+    Overwrite,
+    /// Append mode means that when saving a parquet file, if folder already exists, new data is expected to be appended on top of the existing.
+    /// If folder does not exist, the folder will be created
+    Append,
Review Comment:
As I understand it, Parquet files are not appended to; we keep the existing directory and write new files with new UUIDs.
This behavior is specific to the Parquet file format and may not carry over to other file formats such as CSV, where appending to an existing file is the expected behavior. Since this structure may be extended to support other file formats in the future, the doc comments should be updated accordingly.
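For illustration, the Append behavior described above might look roughly like this (a sketch only; the helper name, the `uuid` crate dependency, and the file-naming scheme are assumptions, not part of this PR):

```rust
use std::path::{Path, PathBuf};

use uuid::Uuid;

/// "Appending" to a Parquet dataset does not append bytes to an existing
/// file; each write simply adds a brand-new, uniquely named file inside
/// the (possibly pre-existing) target directory.
fn new_partition_file_path(dir: &Path, partition: usize) -> PathBuf {
    // e.g. part-0-3f2c9a4e-1b2d-4c5e-9f6a-7b8c9d0e1f2a.parquet
    dir.join(format!("part-{partition}-{}.parquet", Uuid::new_v4()))
}
```

A CSV writer, by contrast, could legitimately open an existing file in append mode, which is why the doc comments should not describe the behavior in Parquet-only terms.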
##########
datafusion/core/src/dataframe.rs:
##########
@@ -930,10 +932,11 @@ impl DataFrame {
         self,
         path: &str,
         writer_properties: Option<WriterProperties>,
+        save_mode: FileWriterSaveMode,
Review Comment:
Spark made the save mode optional; we could likewise default to `append` mode.
We could also merge `WriterProperties` and `FileWriterSaveMode` into something like a `ParquetWriterOptions` struct, since both are configuration. That would keep the save mode out of the immediate call signature and contribute to the simplicity of the API.
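A rough sketch of what such a combined options struct could look like (the `ParquetWriterOptions` name, field layout, and the `Default` impl defaulting to Append are all illustrative, not part of this PR; the enum is reproduced locally so the snippet stands on its own):

```rust
use parquet::file::properties::WriterProperties;

/// Save modes as proposed in this PR, reproduced here for the sketch.
#[derive(Debug, Clone, Copy)]
pub enum FileWriterSaveMode {
    Overwrite,
    Append,
    // ...remaining variants from the PR
}

impl Default for FileWriterSaveMode {
    fn default() -> Self {
        // Following the suggestion above: default to append-like behavior
        FileWriterSaveMode::Append
    }
}

/// Hypothetical combined configuration for writing Parquet output.
#[derive(Default)]
pub struct ParquetWriterOptions {
    /// Low-level Parquet writer properties, if any
    pub writer_properties: Option<WriterProperties>,
    /// Behavior when the target folder already exists
    pub save_mode: FileWriterSaveMode,
}
```

`DataFrame::write_parquet` could then take a single `ParquetWriterOptions` (or `Option<ParquetWriterOptions>`) argument instead of two separate parameters.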
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]