This is an automated email from the ASF dual-hosted git repository.

yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 75fcad5ae [VL] Improve checkNativeWrite logic in VeloxParquetWriteForHiveSuite (#5496)
75fcad5ae is described below

commit 75fcad5aeedff6dce77d4759aed1d22332b2a72d
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Apr 25 16:14:10 2024 +0800

    [VL] Improve checkNativeWrite logic in VeloxParquetWriteForHiveSuite (#5496)
---
 .../execution/VeloxParquetWriteForHiveSuite.scala  | 83 +++++++++-------------
 1 file changed, 35 insertions(+), 48 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index bb338d530..9597e3110 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -16,17 +16,17 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.gluten.sql.shims.SparkShimLoader
-
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.{GlutenQueryTest, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.util.QueryExecutionListener
 
 class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
   private var _spark: SparkSession = null
@@ -79,24 +79,30 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
       .set("spark.gluten.sql.native.writer.enabled", "true")
   }
 
-  private def checkNativeWrite(sqlStr: String, native: Boolean): Unit = {
-    val testAppender = new LogAppender("native write tracker")
-    withLogAppender(testAppender) {
-      spark.sql(sqlStr)
+  private def checkNativeWrite(sqlStr: String, checkNative: Boolean): Unit = {
+    var nativeUsed = false
+    val queryListener = new QueryExecutionListener {
+      override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
+      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+        if (!nativeUsed) {
+          nativeUsed = if (isSparkVersionGE("3.4")) {
+            qe.executedPlan.find(_.isInstanceOf[VeloxColumnarWriteFilesExec]).isDefined
+          } else {
+            qe.executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined
+          }
+        }
+      }
     }
-    assert(
-      testAppender.loggingEvents.exists(
-        _.getMessage.toString.contains("Use Gluten parquet write for hive")) == native)
-  }
-
-  private def checkNativeStaticPartitionWrite(sqlStr: String, native: Boolean): Unit = {
-    val testAppender = new LogAppender("native write tracker")
-    withLogAppender(testAppender) {
+    try {
+      spark.listenerManager.register(queryListener)
       spark.sql(sqlStr)
+      spark.sparkContext.listenerBus.waitUntilEmpty()
+      if (checkNative) {
+        assert(nativeUsed)
+      }
+    } finally {
+      spark.listenerManager.unregister(queryListener)
     }
-    assert(
-      testAppender.loggingEvents.exists(
-        _.getMessage.toString.contains("Use Gluten partition write for hive")) == native)
   }
 
   test("test hive static partition write table") {
@@ -105,21 +111,10 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
         "CREATE TABLE t (c int, d long, e long)" +
           " STORED AS PARQUET partitioned by (c, d)")
       withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
-        if (
-          SparkShimLoader.getSparkVersion.startsWith("3.4") ||
-          SparkShimLoader.getSparkVersion.startsWith("3.5")
-        ) {
-          checkNativeStaticPartitionWrite(
-            "INSERT OVERWRITE TABLE t partition(c=1, d=2)" +
-              " SELECT 3 as e",
-            native = false)
-        } else {
-          checkNativeStaticPartitionWrite(
-            "INSERT OVERWRITE TABLE t partition(c=1, d=2)" +
-              " SELECT 3 as e",
-            native = true)
-        }
-
+        checkNativeWrite(
+          "INSERT OVERWRITE TABLE t partition(c=1, d=2)" +
+            " SELECT 3 as e",
+          checkNative = true)
       }
       checkAnswer(spark.table("t"), Row(3, 1, 2))
     }
@@ -131,10 +126,10 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
         "CREATE TABLE t (c int, d long, e long)" +
           " STORED AS PARQUET partitioned by (c, d)")
       withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
-        checkNativeStaticPartitionWrite(
+        checkNativeWrite(
           "INSERT OVERWRITE TABLE t partition(c=1, d)" +
             " SELECT 3 as e, 2 as e",
-          native = false)
+          checkNative = false)
       }
       checkAnswer(spark.table("t"), Row(3, 1, 2))
     }
@@ -144,15 +139,11 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
     withTable("t") {
       spark.sql("CREATE TABLE t (c int) STORED AS PARQUET")
       withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
-        if (
-          SparkShimLoader.getSparkVersion.startsWith("3.4") ||
-          SparkShimLoader.getSparkVersion.startsWith("3.5")
-        ) {
-          checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", native = false)
+        if (isSparkVersionGE("3.4")) {
+          checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = false)
         } else {
-          checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", native = true)
+          checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true)
         }
-
       }
       checkAnswer(spark.table("t"), Row(1))
     }
@@ -163,25 +154,21 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
       f =>
         // compatible with Spark3.3 and later
         withSQLConf("spark.sql.hive.convertMetastoreInsertDir" -> "false") {
-          if (
-            SparkShimLoader.getSparkVersion.startsWith("3.4") ||
-            SparkShimLoader.getSparkVersion.startsWith("3.5")
-          ) {
+          if (isSparkVersionGE("3.4")) {
             checkNativeWrite(
               s"""
                  |INSERT OVERWRITE DIRECTORY '${f.getCanonicalPath}' STORED AS PARQUET SELECT 1 as c
                  |""".stripMargin,
-              native = false
+              checkNative = false
             )
           } else {
             checkNativeWrite(
               s"""
                  |INSERT OVERWRITE DIRECTORY '${f.getCanonicalPath}' STORED AS PARQUET SELECT 1 as c
                  |""".stripMargin,
-              native = true
+              checkNative = true
             )
           }
-
           checkAnswer(spark.read.parquet(f.getCanonicalPath), Row(1))
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to