andygrove commented on code in PR #3531:
URL: https://github.com/apache/datafusion-comet/pull/3531#discussion_r2940647823


##########
spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala:
##########
@@ -569,6 +569,176 @@ class CometParquetWriterSuite extends CometTestBase {
     }
   }
 
+  // Tests for issue #2957: INSERT OVERWRITE DIRECTORY with native writer
+  // Note: INSERT OVERWRITE DIRECTORY uses InsertIntoDataSourceDirCommand which internally
+  // executes InsertIntoHadoopFsRelationCommand. The outer plan shows ExecutedCommandExec,
+  // but the actual write happens in an internal execution which should use Comet's native writer.
+  //
+  // Fix: CometExecRule now auto-converts children to Arrow format when native write is requested.
+  // This enables native writes even when the source is a Spark operator like RangeExec.
+  test("INSERT OVERWRITE DIRECTORY using parquet - basic with RangeExec 
source") {
+    withTempPath { dir =>
+      val outputPath = dir.getAbsolutePath
+
+      withSQLConf(
+        CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+        
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> 
"true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true") {
+        // Note: COMET_SPARK_TO_ARROW_ENABLED is NOT needed - the fix auto-converts RangeExec
+
+        // Create source table using RangeExec
+        spark.range(1, 10).toDF("id").createOrReplaceTempView("source_table")
+
+        // Execute INSERT OVERWRITE DIRECTORY
+        spark.sql(s"""
+          INSERT OVERWRITE DIRECTORY '$outputPath'
+          USING PARQUET
+          SELECT id FROM source_table
+        """)
+
+        // Verify data was written correctly
+        val result = spark.read.parquet(outputPath)
+        assert(result.count() == 9, "INSERT OVERWRITE DIRECTORY should write 9 
rows")
+      }
+    }
+  }
+
+  // Test with Parquet source file (native scan) - this should use native writer
+  test("INSERT OVERWRITE DIRECTORY using parquet - with parquet source") {
+    withTempPath { srcDir =>
+      withTempPath { outDir =>
+        val srcPath = srcDir.getAbsolutePath
+        val outputPath = outDir.getAbsolutePath
+
+        // Create source parquet file
+        spark.range(1, 10).toDF("id").write.parquet(srcPath)
+
+        withSQLConf(
+          CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+          
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> 
"true",
+          CometConf.COMET_EXEC_ENABLED.key -> "true",
+          CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true") {
+
+          // Create table from parquet file
+          spark.read.parquet(srcPath).createOrReplaceTempView("parquet_source")
+
+          // Execute INSERT OVERWRITE DIRECTORY
+          spark.sql(s"""
+            INSERT OVERWRITE DIRECTORY '$outputPath'
+            USING PARQUET
+            SELECT id FROM parquet_source
+          """)
+
+          // Verify data was written correctly
+          val result = spark.read.parquet(outputPath)
+          assert(result.count() == 9, "INSERT OVERWRITE DIRECTORY should write 
9 rows")
+        }
+      }
+    }
+  }
+
+  test("INSERT OVERWRITE DIRECTORY using parquet with repartition hint") {
+    withTempPath { dir =>
+      val outputPath = dir.getAbsolutePath
+
+      withSQLConf(
+        CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+        
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> 
"true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true") {
+
+        // Create source table
+        spark.range(1, 100).toDF("value").createOrReplaceTempView("df")
+
+        // Execute INSERT OVERWRITE DIRECTORY with REPARTITION hint (as in issue #2957)
+        val df = spark.sql(s"""
+          INSERT OVERWRITE DIRECTORY '$outputPath'
+          USING PARQUET
+          SELECT /*+ REPARTITION(3) */ value FROM df
+        """)
+
+        // Verify data was written correctly
+        val result = spark.read.parquet(outputPath)
+        assert(result.count() == 99)
+
+        // Check if native write was used
+        val plan = df.queryExecution.executedPlan
+        val hasNativeWrite = plan.collect { case _: CometNativeWriteExec => 
true }.nonEmpty
+        if (!hasNativeWrite) {
+          logWarning(s"Native write not used for INSERT OVERWRITE 
DIRECTORY:\n${plan.treeString}")
+        }

Review Comment:
   Should this be an assertion rather than a warning?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to