kazantsev-maksim commented on code in PR #4670:
URL: https://github.com/apache/datafusion-comet/pull/4670#discussion_r3435122965


##########
spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala:
##########
@@ -35,8 +35,285 @@ import org.apache.comet.testing.{DataGenOptions, 
FuzzDataGenerator, SchemaGenOpt
 
 class CometParquetWriterSuite extends CometTestBase {
 
+  private val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
+
   import testImplicits._
 
+  test("simple partition parquet write") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> 
"true") {
+      withTempPath { dir =>
+        val table = "test_write_partitions"
+        val outputPath = new File(dir, "output.parquet").getAbsolutePath
+        withTable(table) {
+          sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet")
+          sql(s"INSERT INTO $table VALUES(1, 'a')")
+          sql(s"INSERT INTO $table VALUES(2, 'b')")
+          sql(s"INSERT INTO $table VALUES(3, 'c')")
+          val plan = captureWritePlan(
+            path =>
+              sql(s"SELECT * FROM $table").write
+                .partitionBy("col1")
+                .mode(SaveMode.Overwrite)
+                .parquet(path),
+            outputPath)
+          assertHasCometNativeWriteExec(plan)
+          Seq((1, "a"), (2, "b"), (3, "c")).foreach { case (col1, col2) =>
+            val rows = spark.read
+              .parquet(s"$outputPath/col1=$col1")
+              .collect()
+            assert(rows.length == 1)
+            assert(rows.head.getAs[String]("col2") == col2)
+          }
+        }
+      }
+    }
+  }
+
+  test("default hive partition parquet write") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> 
"true") {
+      withTempPath { dir =>
+        val table = "test_write_partitions"
+        val outputPath = new File(dir, "output.parquet").getAbsolutePath
+        withTable(table) {
+          sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet")
+          sql(s"INSERT INTO $table VALUES(null, 'a')")
+          sql(s"INSERT INTO $table VALUES(null, 'b')")
+          sql(s"INSERT INTO $table VALUES(null, 'c')")
+          val plan = captureWritePlan(
+            path =>
+              sql(s"SELECT * FROM $table").write
+                .partitionBy("col1")
+                .mode(SaveMode.Overwrite)
+                .parquet(path),
+            outputPath)
+          assertHasCometNativeWriteExec(plan)
+          val rows = spark.read
+            .parquet(s"$outputPath/col1=$DEFAULT_PARTITION_NAME")
+            .collect()
+          assert(rows.length == 3)
+          assert(rows.map(_.getAs[String]("col2")).toSeq.sorted == Seq("a", 
"b", "c").sorted)
+        }
+      }
+    }
+  }
+
+  test("multiple partition columns parquet write") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> 
"true") {
+      withTempPath { dir =>
+        val table = "test_write_partitions"
+        val outputPath = new File(dir, "output.parquet").getAbsolutePath
+        withTable(table) {
+          sql(s"CREATE TABLE $table(col1 INT, col2 STRING, col3 STRING) USING 
parquet")
+          sql(s"INSERT INTO $table VALUES(1, 'x', 'a')")
+          sql(s"INSERT INTO $table VALUES(1, 'y', 'b')")
+          sql(s"INSERT INTO $table VALUES(2, 'x', 'c')")
+          val plan = captureWritePlan(
+            path =>
+              sql(s"SELECT * FROM $table").write
+                .partitionBy("col1", "col2")

Review Comment:
   Currently, some tests are failing because Spark adds a `Sort` operator (on the
   partition column) to the physical execution plan of the write:
   
   ```
   Execute InsertIntoHadoopFsRelationCommand file:/Users/tendoo/Desktop/datafusion-comet/spark/target/tmp/spark-9c83c0b5-e1fc-47fd-8247-d76f4a3bce91/output.parquet, false, [col1#199], Parquet, [__partition_columns=["col1"], path=/Users/tendoo/Desktop/datafusion-comet/spark/target/tmp/spark-9c83c0b5-e1fc-47fd-8247-d76f4a3bce91/output.parquet], Overwrite, [col1, col2]
     +- WriteFiles
        +- *(1) Sort [col1#199 ASC NULLS FIRST], false, 0
           +- *(1) Project [empty2null(col1#196) AS col1#199, col2#197]
              +- CometNativeColumnarToRow
                 +- CometNativeScan parquet spark_catalog.default.test_write_partitions[col1#196,col2#197] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to