This is an automated email from the ASF dual-hosted git repository.

liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b644583539 [MINOR] Correct the native write check in VeloxParquetWriteSuite (#11391)
b644583539 is described below

commit b644583539c77e19eeb0477a6a25c6f1768101d3
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jan 30 17:54:59 2026 +0800

    [MINOR] Correct the native write check in VeloxParquetWriteSuite (#11391)
---
 .../execution/VeloxParquetWriteForHiveSuite.scala  | 71 +++++-----------------
 .../sql/execution/VeloxParquetWriteSuite.scala     | 52 ++++++++++------
 .../{BucketWriteUtils.scala => WriteUtils.scala}   | 33 +++++++++-
 3 files changed, 81 insertions(+), 75 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 60ea00be6a..2522e2898d 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
@@ -30,7 +29,6 @@ import org.apache.spark.sql.classic.ClassicTypes._
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.util.QueryExecutionListener
 import org.apache.spark.util.Utils
 
 import org.apache.hadoop.fs.Path
@@ -39,10 +37,7 @@ import org.apache.parquet.hadoop.util.HadoopInputFile
 
 import java.io.File
 
-class VeloxParquetWriteForHiveSuite
-  extends GlutenQueryTest
-  with SQLTestUtils
-  with BucketWriteUtils {
+class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils with WriteUtils {
   private var _spark: SparkSession = _
   import testImplicits._
 
@@ -95,32 +90,6 @@ class VeloxParquetWriteForHiveSuite
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
   }
 
-  private def checkNativeWrite(sqlStr: String, checkNative: Boolean): Unit = {
-    var nativeUsed = false
-    val queryListener = new QueryExecutionListener {
-      override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
-      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
-        if (!nativeUsed) {
-          nativeUsed = if (isSparkVersionGE("3.4")) {
-            qe.executedPlan.find(_.isInstanceOf[ColumnarWriteFilesExec]).isDefined
-          } else {
-            qe.executedPlan.find(_.isInstanceOf[VeloxColumnarToCarrierRowExec]).isDefined
-          }
-        }
-      }
-    }
-    try {
-      spark.listenerManager.register(queryListener)
-      spark.sql(sqlStr)
-      spark.sparkContext.listenerBus.waitUntilEmpty()
-      if (checkNative) {
-        assert(nativeUsed)
-      }
-    } finally {
-      spark.listenerManager.unregister(queryListener)
-    }
-  }
-
   test("test hive static partition write table") {
     withTable("t") {
       spark.sql(
@@ -129,8 +98,7 @@ class VeloxParquetWriteForHiveSuite
       withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
         checkNativeWrite(
           "INSERT OVERWRITE TABLE t partition(c=1, d=2)" +
-            " SELECT 3 as e",
-          checkNative = true)
+            " SELECT 3 as e")
       }
       checkAnswer(spark.table("t"), Row(3, 1, 2))
       checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2")))
@@ -145,8 +113,7 @@ class VeloxParquetWriteForHiveSuite
       withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
         checkNativeWrite(
           "INSERT OVERWRITE TABLE t partition(c=1, d)" +
-            " SELECT 3 as e, 2 as d",
-          checkNative = true)
+            " SELECT 3 as e, 2 as d")
       }
       checkAnswer(spark.table("t"), Row(3, 1, 2))
       checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2")))
@@ -163,8 +130,7 @@ class VeloxParquetWriteForHiveSuite
           "INSERT OVERWRITE TABLE t partition(c=1, d)" +
             " SELECT 3 as e, 2 as d" +
             " UNION ALL" +
-            " SELECT 4 as e, 5 as d",
-          checkNative = true)
+            " SELECT 4 as e, 5 as d")
       }
       checkAnswer(spark.table("t"), Seq(Row(3, 1, 2), Row(4, 1, 5)))
       checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2"), Row("c=1/d=5")))
@@ -185,8 +151,7 @@ class VeloxParquetWriteForHiveSuite
             " UNION ALL" +
             " SELECT 6 as e, 2 as d, 7 as f" + // Partition 0.
             " UNION ALL" +
-            " SELECT 8 as e, 5 as d, 7 as f", // Partition 2.
-          checkNative = true
+            " SELECT 8 as e, 5 as d, 7 as f" // Partition 2.
         )
       }
       checkAnswer(
@@ -213,9 +178,7 @@ class VeloxParquetWriteForHiveSuite
             " (3, 2, 7)," +
             " (4, 5, 9)," +
             " (6, 2, 7)," +
-            " (8, 5, 7)",
-          checkNative = true
-        )
+            " (8, 5, 7)")
       }
       checkAnswer(
         spark.table("t"),
@@ -230,7 +193,7 @@ class VeloxParquetWriteForHiveSuite
     withTable("t") {
       spark.sql("CREATE TABLE t (c int) STORED AS PARQUET")
       withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
-        checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true)
+        checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c")
       }
       checkAnswer(spark.table("t"), Row(1))
     }
@@ -246,15 +209,13 @@ class VeloxParquetWriteForHiveSuite
               s"""
                  |INSERT OVERWRITE DIRECTORY '${f.getCanonicalPath}' STORED AS PARQUET SELECT 1 as c
                  |""".stripMargin,
-              checkNative = false
+              expectNative = false
             )
           } else {
             checkNativeWrite(
               s"""
                  |INSERT OVERWRITE DIRECTORY '${f.getCanonicalPath}' STORED AS PARQUET SELECT 1 as c
-                 |""".stripMargin,
-              checkNative = true
-            )
+                 |""".stripMargin)
           }
           checkAnswer(spark.read.parquet(f.getCanonicalPath), Row(1))
         }
@@ -271,7 +232,7 @@ class VeloxParquetWriteForHiveSuite
   test("native writer support CreateHiveTableAsSelectCommand") {
     withTable("t") {
       withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
-        checkNativeWrite("CREATE TABLE t STORED AS PARQUET AS SELECT 1 as c", checkNative = true)
+        checkNativeWrite("CREATE TABLE t STORED AS PARQUET AS SELECT 1 as c")
       }
       checkAnswer(spark.table("t"), Row(1))
     }
@@ -288,7 +249,7 @@ class VeloxParquetWriteForHiveSuite
               checkNativeWrite(
                 "CREATE TABLE t STORED AS PARQUET TBLPROPERTIES ('parquet.compression'='zstd') " +
                   "AS SELECT 1 as c",
-                checkNative = enableNativeWrite)
+                expectNative = enableNativeWrite)
               val tableDir = new Path(s"${conf.getConf(StaticSQLConf.WAREHOUSE_PATH)}/t")
               val configuration = spark.sessionState.newHadoopConf()
               val files = tableDir
@@ -329,7 +290,7 @@ class VeloxParquetWriteForHiveSuite
               df.write.mode(SaveMode.Overwrite).saveAsTable(source)
 
               withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-                checkNativeWrite(s"INSERT INTO $target SELECT * FROM $source", checkNative = true)
+                checkNativeWrite(s"INSERT INTO $target SELECT * FROM $source")
               }
 
               for (k <- 0 until 5) {
@@ -369,9 +330,7 @@ class VeloxParquetWriteForHiveSuite
               df.write.mode(SaveMode.Overwrite).saveAsTable(source)
 
               // hive relation convert always use dynamic, so it will offload to native.
-              checkNativeWrite(
-                s"INSERT INTO $target PARTITION(k='0') SELECT i, j FROM $source",
-                checkNative = true)
+              checkNativeWrite(s"INSERT INTO $target PARTITION(k='0') SELECT i, j FROM $source")
               val files = tableDir(target)
                 .listFiles()
                 .filterNot(f => f.getName.startsWith(".") || f.getName.startsWith("_"))
@@ -401,7 +360,7 @@ class VeloxParquetWriteForHiveSuite
                 (0 until 50).map(i => (i % 13, i.toString)).toDF("i", "j")
               df.write.mode(SaveMode.Overwrite).saveAsTable(source)
 
-              checkNativeWrite(s"INSERT INTO $target SELECT i, j FROM $source", checkNative = true)
+              checkNativeWrite(s"INSERT INTO $target SELECT i, j FROM $source")
 
               checkAnswer(spark.table(target), df)
             }
@@ -427,7 +386,7 @@ class VeloxParquetWriteForHiveSuite
               checkNativeWrite(
                 "CREATE TABLE t STORED AS PARQUET TBLPROPERTIES ('parquet.compression'='zstd') " +
                   "AS SELECT 1 as c",
-                checkNative = enableNativeWrite)
+                expectNative = enableNativeWrite)
               val tableDir = new Path(s"${conf.getConf(StaticSQLConf.WAREHOUSE_PATH)}/t")
               val configuration = spark.sessionState.newHadoopConf()
               val files = tableDir
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index 4bc20142a1..6c61afb962 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -18,17 +18,16 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.VeloxWholeStageTransformerSuite
-import org.apache.gluten.test.FallbackUtil
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
 import org.apache.spark.util.Utils
 
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
-import org.junit.Assert
 
-class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
+class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteUtils {
   override protected val resourcePath: String = "/tpch-data-parquet"
   override protected val fileFormat: String = "parquet"
 
@@ -74,13 +73,9 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
     withTempPath {
       f =>
         val path = f.getCanonicalPath
-        val testAppender = new LogAppender("native write tracker")
-        withLogAppender(testAppender) {
-          spark.sql("select array(struct(1), null) as var1").write.mode("overwrite").save(path)
-        }
-        assert(
-          !testAppender.loggingEvents.exists(
-            _.getMessage.toString.contains("Use Gluten parquet write for hive")))
+        checkNativeWrite(
+          s"INSERT OVERWRITE DIRECTORY '$path' USING PARQUET SELECT array(struct(1), null) as var1",
+          expectNative = false)
     }
   }
 
@@ -128,18 +123,27 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
       }
   }
 
+  test("test insert into") {
+    withTable("t") {
+      spark.sql("CREATE TABLE t (id INT) USING PARQUET")
+      checkNativeWrite("INSERT INTO t VALUES 1")
+      checkAnswer(spark.sql("SELECT * FROM t"), Row(1))
+    }
+  }
+
   test("test ctas") {
     withTable("velox_ctas") {
       spark
         .range(100)
         .toDF("id")
         .createOrReplaceTempView("ctas_temp")
-      val df = spark.sql("CREATE TABLE velox_ctas USING PARQUET AS SELECT * FROM ctas_temp")
-      Assert.assertTrue(FallbackUtil.hasFallback(df.queryExecution.executedPlan))
+      checkNativeWrite(
+        "CREATE TABLE velox_ctas USING PARQUET AS SELECT * FROM ctas_temp",
+        expectNative = isSparkVersionGE("3.4"))
     }
   }
 
-  test("test parquet dynamic partition write") {
+  test("test insert overwrite dir") {
     withTempPath {
       f =>
         val path = f.getCanonicalPath
@@ -147,8 +151,20 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
           .range(100)
           .selectExpr("id as c1", "id % 7 as p")
           .createOrReplaceTempView("temp")
-        val df = spark.sql(s"INSERT OVERWRITE DIRECTORY '$path' USING PARQUET SELECT * FROM temp")
-        Assert.assertTrue(FallbackUtil.hasFallback(df.queryExecution.executedPlan))
+        checkNativeWrite(s"INSERT OVERWRITE DIRECTORY '$path' USING PARQUET SELECT * FROM temp")
+    }
+  }
+
+  test("test dynamic and static partition write table") {
+    withTable("t") {
+      spark.sql(
+        "CREATE TABLE t (c int, d long, e long)" +
+          " USING PARQUET partitioned by (c, d)")
+      checkNativeWrite(
+        "INSERT OVERWRITE TABLE t partition(c=1, d)" +
+          " SELECT 3 as e, 2 as d")
+      checkAnswer(spark.table("t"), Row(3, 1, 2))
+      checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2")))
     }
   }
 
@@ -158,10 +174,10 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
         .range(100)
         .selectExpr("id as c1", "id % 7 as p")
         .createOrReplaceTempView("bucket_temp")
-      val df = spark.sql(
+      checkNativeWrite(
         "CREATE TABLE bucket USING PARQUET CLUSTERED BY (p) INTO 7 BUCKETS " +
-          "AS SELECT * FROM bucket_temp")
-      Assert.assertTrue(FallbackUtil.hasFallback(df.queryExecution.executedPlan))
+          "AS SELECT * FROM bucket_temp",
+        expectNative = false)
     }
   }
 
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/WriteUtils.scala
similarity index 77%
rename from backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala
rename to backends-velox/src/test/scala/org/apache/spark/sql/execution/WriteUtils.scala
index a9fe8269e9..416ce41f57 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/WriteUtils.scala
@@ -16,20 +16,51 @@
  */
 package org.apache.spark.sql.execution
 
+import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec
+
 import org.apache.spark.sql.{DataFrame, GlutenQueryTest}
 import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, Expression, HiveHash, Literal, Pmod, UnsafeProjection}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.util.QueryExecutionListener
 
 import java.io.File
 
-trait BucketWriteUtils extends GlutenQueryTest with SQLTestUtils {
+trait WriteUtils extends GlutenQueryTest with SQLTestUtils {
 
   def tableDir(table: String): File = {
     val identifier = spark.sessionState.sqlParser.parseTableIdentifier(table)
     new File(spark.sessionState.catalog.defaultTablePath(identifier))
   }
 
+  def checkNativeWrite(sqlStr: String, expectNative: Boolean = true): Unit = {
+    var nativeUsed = false
+    val queryListener = new QueryExecutionListener {
+      override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
+      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+        if (!nativeUsed) {
+          nativeUsed = if (isSparkVersionGE("3.4")) {
+            qe.executedPlan.exists(_.isInstanceOf[ColumnarWriteFilesExec])
+          } else {
+            qe.executedPlan.exists(_.isInstanceOf[VeloxColumnarToCarrierRowExec])
+          }
+        }
+      }
+    }
+    try {
+      spark.listenerManager.register(queryListener)
+      spark.sql(sqlStr)
+      spark.sparkContext.listenerBus.waitUntilEmpty()
+      if (expectNative) {
+        assert(nativeUsed)
+      } else {
+        assert(!nativeUsed)
+      }
+    } finally {
+      spark.listenerManager.unregister(queryListener)
+    }
+  }
+
   protected def testBucketing(
       dataDir: File,
       source: String = "parquet",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to