metesynnada commented on code in PR #5584:
URL: https://github.com/apache/arrow-datafusion/pull/5584#discussion_r1137479350


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -707,26 +709,58 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
     }
 }
 
-/// Executes a query and writes the results to a partitioned Parquet file.
-pub async fn plan_to_parquet(
-    task_ctx: Arc<TaskContext>,
-    plan: Arc<dyn ExecutionPlan>,
+/// create target folder using different save mode strategy, refer to [`FileWriterSaveMode`] for more details
+fn create_target_folder(
     path: impl AsRef<str>,
-    writer_properties: Option<WriterProperties>,
-) -> Result<()> {
+    save_mode: FileWriterSaveMode,
+) -> Result<Option<std::path::PathBuf>> {
     let path = path.as_ref();
-    // create directory to contain the Parquet files (one per partition)
     let fs_path = std::path::Path::new(path);
+
+    if fs_path.exists() {
+        match save_mode {
+            FileWriterSaveMode::Overwrite => {
+                std::fs::remove_dir_all(fs_path)?;
+            }
+            FileWriterSaveMode::Ignore => {
+                return Ok(None);
+            }
+            FileWriterSaveMode::Append => {
+                return Ok(Some(fs_path.to_path_buf()));
+            }
+            _ => {}
+        };
+    }
+
     if let Err(e) = fs::create_dir(fs_path) {
         return Err(DataFusionError::Execution(format!(
             "Could not create directory {path}: {e:?}"
         )));
     }
 
+    Ok(Some(fs_path.to_path_buf()))
+}
+
+/// Executes a query and writes the results to a partitioned Parquet file.
+pub async fn plan_to_parquet(
+    task_ctx: Arc<TaskContext>,
+    plan: Arc<dyn ExecutionPlan>,
+    path: impl AsRef<str>,
+    writer_properties: Option<WriterProperties>,
+    save_mode: FileWriterSaveMode,
+) -> Result<()> {
+    // create directory to contain the Parquet files (one per partition)
+    let fs_path = &create_target_folder(path, save_mode)?;

Review Comment:
   There is an unnecessary copy/clone when the folder is not overwritten. A different implementation could avoid the clone here.
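
   For example, something roughly like the following could avoid the allocation (untested sketch; the name `prepare_target_folder` and the `bool` return value are just illustrative):

   ```rust
   /// Illustrative alternative: borrow the path and only report whether the
   /// write should proceed, so no `PathBuf` is allocated when the existing
   /// folder is reused as-is.
   fn prepare_target_folder(
       fs_path: &std::path::Path,
       save_mode: FileWriterSaveMode,
   ) -> Result<bool> {
       if fs_path.exists() {
           match save_mode {
               // Remove the existing folder and fall through to recreate it.
               FileWriterSaveMode::Overwrite => std::fs::remove_dir_all(fs_path)?,
               // An existing folder means there is nothing to do.
               FileWriterSaveMode::Ignore => return Ok(false),
               // Reuse the existing folder without copying the path.
               FileWriterSaveMode::Append => return Ok(true),
               _ => {}
           }
       }
       std::fs::create_dir(fs_path).map_err(|e| {
           DataFusionError::Execution(format!(
               "Could not create directory {}: {e:?}",
               fs_path.display()
           ))
       })?;
       Ok(true)
   }
   ```

   The caller could then keep `let fs_path = std::path::Path::new(path);` and bail out early with `if !prepare_target_folder(fs_path, save_mode)? { return Ok(()); }`.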



##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -541,6 +541,23 @@ pub(crate) fn get_output_ordering(
         }).unwrap_or_else(|| None)
 }
 
+/// Defines strategies for the saving behavior in the case when the saved folder already exists
+#[derive(Debug, Clone, Copy)]
+pub enum FileWriterSaveMode {
+    /// Overwrite mode means that when saving a parquet file, if folder already exists, existing data is expected to be overwritten.
+    /// If folder does not exist, the folder will be created. This is default value.
+    Overwrite,
+    /// Append mode means that when saving a parquet file, if folder already exists, new data is expected to be appended on top of the existing.
+    /// If folder does not exist, the folder will be created
+    Append,

Review Comment:
   As I understand it, Parquet files are not actually appended to; we keep the directory and write new files with a new uuid.

   This approach is specific to the Parquet file format and may not apply to other file formats like CSV, where appending to an existing file is the expected behavior. Since this structure may be extended to support other file formats in the future, the comments should be updated accordingly.
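
   To make the distinction concrete, "append" here only means adding a new uuid-named file into the held directory, roughly like the sketch below (illustrative names; assumes the `uuid` crate is available):

   ```rust
   use std::path::{Path, PathBuf};

   /// Illustrative only: "append" creates a new uniquely named file inside the
   /// existing directory instead of appending bytes to an existing Parquet file.
   fn append_target_file(dir: &Path, partition: usize) -> PathBuf {
       dir.join(format!("part-{partition}-{}.parquet", uuid::Uuid::new_v4()))
   }
   ```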
   



##########
datafusion/core/src/dataframe.rs:
##########
@@ -930,10 +932,11 @@ impl DataFrame {
         self,
         path: &str,
         writer_properties: Option<WriterProperties>,
+        save_mode: FileWriterSaveMode,

Review Comment:
   Spark made the save mode optional, we can also default to `append` mode. 
Also, we can merge the `WriterProperties` and `FileWriterSaveMode` structs like 
in a `ParquetWriterOptions` struct since they are both configurations, this 
would abstract the save mode from immediate usage and contribute the simplicity.
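
   A rough sketch of what that could look like (untested; names are illustrative):

   ```rust
   /// Illustrative shape for the combined options struct suggested above.
   #[derive(Debug, Clone)]
   pub struct ParquetWriterOptions {
       /// Low-level Parquet writer configuration, if any.
       pub writer_properties: Option<WriterProperties>,
       /// Behavior when the target folder already exists.
       pub save_mode: FileWriterSaveMode,
   }

   impl Default for ParquetWriterOptions {
       fn default() -> Self {
           Self {
               writer_properties: None,
               // Default to `Append`, keeping the save mode optional for callers.
               save_mode: FileWriterSaveMode::Append,
           }
       }
   }
   ```

   `write_parquet` could then take a single options argument, e.g. `df.write_parquet(path, ParquetWriterOptions::default()).await?`.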


