Jefffrey commented on code in PR #17009:
URL: https://github.com/apache/datafusion/pull/17009#discussion_r2366203884


##########
datafusion/core/src/dataframe/parquet.rs:
##########
@@ -82,9 +83,19 @@ impl DataFrame {
                 .build()?
         };
 
+        let path = if file_type.get_ext() != DEFAULT_PARQUET_EXTENSION
+            && options.single_file_output
+        {
+            let mut path = path.to_owned();
+            path.push_str(SINGLE_FILE_EXTENSION);
+            path
+        } else {
+            path.to_owned()
+        };

Review Comment:
   This part confuses me. If I'm not mistaken, the condition
   `file_type.get_ext() != DEFAULT_PARQUET_EXTENSION` will always be true, because:
   
   
https://github.com/apache/datafusion/blob/602475fbac6b5fb78c28406775f8792f9ba5d70f/datafusion/datasource-parquet/src/file_format.rs#L157-L162



##########
datafusion/core/src/execution/context/parquet.rs:
##########
@@ -216,6 +217,83 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn write_multiple_file_parquet_no_extensions() -> Result<()> {
+        let ctx = SessionContext::new();
+        let df = ctx.read_batch(RecordBatch::try_new(
+            Arc::new(Schema::new(vec![
+                Field::new("purchase_id", DataType::Int32, false),
+                Field::new("price", DataType::Float32, false),
+                Field::new("quantity", DataType::Int32, false),
+            ])),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
+                Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 
6.66])),
+                Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])),
+            ],
+        )?)?;
+
+        // Repartition to have the desired.
+        let partitioned_df = df.repartition(Partitioning::RoundRobinBatch(2))?;
+        let tmp_dir = tempdir()?;
+        let path = tmp_dir
+            .path()
+            .join("no_ext_parquet")
+            .to_str()
+            .unwrap()
+            .to_string();
+
+        let options = DataFrameWriteOptions::new();
+
+        partitioned_df.write_parquet(&path, options, None).await?;
+
+        let test_path = std::path::Path::new(&path);
+        assert!(
+            test_path.is_dir(),
+            "No extension and default DataFrameWriteOptons should have yielded 
a dir."
+        );

Review Comment:
   I think we also need to check that multiple parquet files were indeed written.



##########
datafusion/core/src/execution/context/parquet.rs:
##########
@@ -216,6 +217,83 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn write_multiple_file_parquet_no_extensions() -> Result<()> {
+        let ctx = SessionContext::new();
+        let df = ctx.read_batch(RecordBatch::try_new(
+            Arc::new(Schema::new(vec![
+                Field::new("purchase_id", DataType::Int32, false),
+                Field::new("price", DataType::Float32, false),
+                Field::new("quantity", DataType::Int32, false),
+            ])),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
+                Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 
6.66])),
+                Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])),
+            ],
+        )?)?;
+
+        // Repartition to have the desired.
+        let partitioned_df = df.repartition(Partitioning::RoundRobinBatch(2))?;
+        let tmp_dir = tempdir()?;
+        let path = tmp_dir
+            .path()
+            .join("no_ext_parquet")
+            .to_str()
+            .unwrap()
+            .to_string();
+
+        let options = DataFrameWriteOptions::new();
+
+        partitioned_df.write_parquet(&path, options, None).await?;
+
+        let test_path = std::path::Path::new(&path);
+        assert!(
+            test_path.is_dir(),
+            "No extension and default DataFrameWriteOptons should have yielded 
a dir."
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn write_single_file_parquet_no_extensions() -> Result<()> {
+        let ctx = SessionContext::new();
+        let df = ctx.read_batch(RecordBatch::try_new(
+            Arc::new(Schema::new(vec![
+                Field::new("purchase_id", DataType::Int32, false),
+                Field::new("price", DataType::Float32, false),
+                Field::new("quantity", DataType::Int32, false),
+            ])),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
+                Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 
6.66])),
+                Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])),
+            ],
+        )?)?;
+        // Repartition to have
+        let partitioned_df = df.repartition(Partitioning::RoundRobinBatch(2))?;
+        let tmp_dir = tempdir()?;
+        let path = tmp_dir
+            .path()
+            .join("no_ext_parquet")
+            .to_str()
+            .unwrap()
+            .to_string();
+
+        let options = 
DataFrameWriteOptions::new().with_single_file_output(true);
+
+        partitioned_df.write_parquet(&path, options, None).await?;
+
+        let test_path = std::path::Path::new(&path);
+        assert!(
+            test_path.is_file(),
+            "No extension and 
DataFrameWriteOptons::with_single_file_output(true) should have yielded a 
single file."
+        );

Review Comment:
   Similarly, here we need to check that only one file is written, not just
   that a file is written.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to