This is an automated email from the ASF dual-hosted git repository.
liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b644583539 [MINOR] Correct the native write check in VeloxParquetWriteSuite (#11391)
b644583539 is described below
commit b644583539c77e19eeb0477a6a25c6f1768101d3
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jan 30 17:54:59 2026 +0800
[MINOR] Correct the native write check in VeloxParquetWriteSuite (#11391)
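
Move the checkNativeWrite helper from VeloxParquetWriteForHiveSuite into
the shared WriteUtils trait (renamed from BucketWriteUtils), rename its
checkNative parameter to expectNative (defaulting to true), and make
expectNative = false assert that the native writer was NOT used instead
of skipping the check. VeloxParquetWriteSuite now mixes in WriteUtils
and verifies native writes through checkNativeWrite rather than
LogAppender output or FallbackUtil.hasFallback.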
---
.../execution/VeloxParquetWriteForHiveSuite.scala | 71 +++++-----------------
.../sql/execution/VeloxParquetWriteSuite.scala | 52 ++++++++++------
.../{BucketWriteUtils.scala => WriteUtils.scala} | 33 +++++++++-
3 files changed, 81 insertions(+), 75 deletions(-)
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 60ea00be6a..2522e2898d 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
@@ -30,7 +29,6 @@ import org.apache.spark.sql.classic.ClassicTypes._
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.util.Utils
import org.apache.hadoop.fs.Path
@@ -39,10 +37,7 @@ import org.apache.parquet.hadoop.util.HadoopInputFile
import java.io.File
-class VeloxParquetWriteForHiveSuite
- extends GlutenQueryTest
- with SQLTestUtils
- with BucketWriteUtils {
+class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils with WriteUtils {
private var _spark: SparkSession = _
import testImplicits._
@@ -95,32 +90,6 @@ class VeloxParquetWriteForHiveSuite
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
}
-  private def checkNativeWrite(sqlStr: String, checkNative: Boolean): Unit = {
-    var nativeUsed = false
-    val queryListener = new QueryExecutionListener {
-      override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
-      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
-        if (!nativeUsed) {
-          nativeUsed = if (isSparkVersionGE("3.4")) {
-            qe.executedPlan.find(_.isInstanceOf[ColumnarWriteFilesExec]).isDefined
-          } else {
-            qe.executedPlan.find(_.isInstanceOf[VeloxColumnarToCarrierRowExec]).isDefined
-          }
-        }
-      }
-    }
-    try {
-      spark.listenerManager.register(queryListener)
-      spark.sql(sqlStr)
-      spark.sparkContext.listenerBus.waitUntilEmpty()
-      if (checkNative) {
-        assert(nativeUsed)
-      }
-    } finally {
-      spark.listenerManager.unregister(queryListener)
-    }
-  }
-
test("test hive static partition write table") {
withTable("t") {
spark.sql(
@@ -129,8 +98,7 @@ class VeloxParquetWriteForHiveSuite
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
checkNativeWrite(
"INSERT OVERWRITE TABLE t partition(c=1, d=2)" +
- " SELECT 3 as e",
- checkNative = true)
+ " SELECT 3 as e")
}
checkAnswer(spark.table("t"), Row(3, 1, 2))
checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2")))
@@ -145,8 +113,7 @@ class VeloxParquetWriteForHiveSuite
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
checkNativeWrite(
"INSERT OVERWRITE TABLE t partition(c=1, d)" +
- " SELECT 3 as e, 2 as d",
- checkNative = true)
+ " SELECT 3 as e, 2 as d")
}
checkAnswer(spark.table("t"), Row(3, 1, 2))
checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2")))
@@ -163,8 +130,7 @@ class VeloxParquetWriteForHiveSuite
"INSERT OVERWRITE TABLE t partition(c=1, d)" +
" SELECT 3 as e, 2 as d" +
" UNION ALL" +
- " SELECT 4 as e, 5 as d",
- checkNative = true)
+ " SELECT 4 as e, 5 as d")
}
checkAnswer(spark.table("t"), Seq(Row(3, 1, 2), Row(4, 1, 5)))
checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2"),
Row("c=1/d=5")))
@@ -185,8 +151,7 @@ class VeloxParquetWriteForHiveSuite
" UNION ALL" +
" SELECT 6 as e, 2 as d, 7 as f" + // Partition 0.
" UNION ALL" +
- " SELECT 8 as e, 5 as d, 7 as f", // Partition 2.
- checkNative = true
+ " SELECT 8 as e, 5 as d, 7 as f" // Partition 2.
)
}
checkAnswer(
@@ -213,9 +178,7 @@ class VeloxParquetWriteForHiveSuite
" (3, 2, 7)," +
" (4, 5, 9)," +
" (6, 2, 7)," +
- " (8, 5, 7)",
- checkNative = true
- )
+ " (8, 5, 7)")
}
checkAnswer(
spark.table("t"),
@@ -230,7 +193,7 @@ class VeloxParquetWriteForHiveSuite
withTable("t") {
spark.sql("CREATE TABLE t (c int) STORED AS PARQUET")
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
- checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative
= true)
+ checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c")
}
checkAnswer(spark.table("t"), Row(1))
}
@@ -246,15 +209,13 @@ class VeloxParquetWriteForHiveSuite
s"""
               |INSERT OVERWRITE DIRECTORY '${f.getCanonicalPath}' STORED AS PARQUET SELECT 1 as c
|""".stripMargin,
- checkNative = false
+ expectNative = false
)
} else {
checkNativeWrite(
s"""
               |INSERT OVERWRITE DIRECTORY '${f.getCanonicalPath}' STORED AS PARQUET SELECT 1 as c
- |""".stripMargin,
- checkNative = true
- )
+ |""".stripMargin)
}
checkAnswer(spark.read.parquet(f.getCanonicalPath), Row(1))
}
@@ -271,7 +232,7 @@ class VeloxParquetWriteForHiveSuite
test("native writer support CreateHiveTableAsSelectCommand") {
withTable("t") {
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
- checkNativeWrite("CREATE TABLE t STORED AS PARQUET AS SELECT 1 as c",
checkNative = true)
+ checkNativeWrite("CREATE TABLE t STORED AS PARQUET AS SELECT 1 as c")
}
checkAnswer(spark.table("t"), Row(1))
}
@@ -288,7 +249,7 @@ class VeloxParquetWriteForHiveSuite
checkNativeWrite(
"CREATE TABLE t STORED AS PARQUET TBLPROPERTIES
('parquet.compression'='zstd') " +
"AS SELECT 1 as c",
- checkNative = enableNativeWrite)
+ expectNative = enableNativeWrite)
       val tableDir = new Path(s"${conf.getConf(StaticSQLConf.WAREHOUSE_PATH)}/t")
val configuration = spark.sessionState.newHadoopConf()
val files = tableDir
@@ -329,7 +290,7 @@ class VeloxParquetWriteForHiveSuite
df.write.mode(SaveMode.Overwrite).saveAsTable(source)
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- checkNativeWrite(s"INSERT INTO $target SELECT * FROM $source",
checkNative = true)
+ checkNativeWrite(s"INSERT INTO $target SELECT * FROM $source")
}
for (k <- 0 until 5) {
@@ -369,9 +330,7 @@ class VeloxParquetWriteForHiveSuite
df.write.mode(SaveMode.Overwrite).saveAsTable(source)
           // hive relation convert always use dynamic, so it will offload to native.
-          checkNativeWrite(
-            s"INSERT INTO $target PARTITION(k='0') SELECT i, j FROM $source",
-            checkNative = true)
+          checkNativeWrite(s"INSERT INTO $target PARTITION(k='0') SELECT i, j FROM $source")
val files = tableDir(target)
.listFiles()
           .filterNot(f => f.getName.startsWith(".") || f.getName.startsWith("_"))
@@ -401,7 +360,7 @@ class VeloxParquetWriteForHiveSuite
(0 until 50).map(i => (i % 13, i.toString)).toDF("i", "j")
df.write.mode(SaveMode.Overwrite).saveAsTable(source)
- checkNativeWrite(s"INSERT INTO $target SELECT i, j FROM
$source", checkNative = true)
+ checkNativeWrite(s"INSERT INTO $target SELECT i, j FROM $source")
checkAnswer(spark.table(target), df)
}
@@ -427,7 +386,7 @@ class VeloxParquetWriteForHiveSuite
checkNativeWrite(
"CREATE TABLE t STORED AS PARQUET TBLPROPERTIES
('parquet.compression'='zstd') " +
"AS SELECT 1 as c",
- checkNative = enableNativeWrite)
+ expectNative = enableNativeWrite)
       val tableDir = new Path(s"${conf.getConf(StaticSQLConf.WAREHOUSE_PATH)}/t")
val configuration = spark.sessionState.newHadoopConf()
val files = tableDir
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index 4bc20142a1..6c61afb962 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -18,17 +18,16 @@ package org.apache.spark.sql.execution
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.VeloxWholeStageTransformerSuite
-import org.apache.gluten.test.FallbackUtil
import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
import org.apache.spark.util.Utils
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
-import org.junit.Assert
-class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
+class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteUtils {
override protected val resourcePath: String = "/tpch-data-parquet"
override protected val fileFormat: String = "parquet"
@@ -74,13 +73,9 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
withTempPath {
f =>
val path = f.getCanonicalPath
-        val testAppender = new LogAppender("native write tracker")
-        withLogAppender(testAppender) {
-          spark.sql("select array(struct(1), null) as var1").write.mode("overwrite").save(path)
-        }
-        assert(
-          !testAppender.loggingEvents.exists(
-            _.getMessage.toString.contains("Use Gluten parquet write for hive")))
+        checkNativeWrite(
+          s"INSERT OVERWRITE DIRECTORY '$path' USING PARQUET SELECT array(struct(1), null) as var1",
+          expectNative = false)
}
}
@@ -128,18 +123,27 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
}
}
+ test("test insert into") {
+ withTable("t") {
+ spark.sql("CREATE TABLE t (id INT) USING PARQUET")
+ checkNativeWrite("INSERT INTO t VALUES 1")
+ checkAnswer(spark.sql("SELECT * FROM t"), Row(1))
+ }
+ }
+
test("test ctas") {
withTable("velox_ctas") {
spark
.range(100)
.toDF("id")
.createOrReplaceTempView("ctas_temp")
- val df = spark.sql("CREATE TABLE velox_ctas USING PARQUET AS SELECT *
FROM ctas_temp")
-
Assert.assertTrue(FallbackUtil.hasFallback(df.queryExecution.executedPlan))
+ checkNativeWrite(
+ "CREATE TABLE velox_ctas USING PARQUET AS SELECT * FROM ctas_temp",
+ expectNative = isSparkVersionGE("3.4"))
}
}
- test("test parquet dynamic partition write") {
+ test("test insert overwrite dir") {
withTempPath {
f =>
val path = f.getCanonicalPath
@@ -147,8 +151,20 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
.range(100)
.selectExpr("id as c1", "id % 7 as p")
.createOrReplaceTempView("temp")
- val df = spark.sql(s"INSERT OVERWRITE DIRECTORY '$path' USING PARQUET
SELECT * FROM temp")
-
Assert.assertTrue(FallbackUtil.hasFallback(df.queryExecution.executedPlan))
+ checkNativeWrite(s"INSERT OVERWRITE DIRECTORY '$path' USING PARQUET
SELECT * FROM temp")
+ }
+ }
+
+ test("test dynamic and static partition write table") {
+ withTable("t") {
+ spark.sql(
+ "CREATE TABLE t (c int, d long, e long)" +
+ " USING PARQUET partitioned by (c, d)")
+ checkNativeWrite(
+ "INSERT OVERWRITE TABLE t partition(c=1, d)" +
+ " SELECT 3 as e, 2 as d")
+ checkAnswer(spark.table("t"), Row(3, 1, 2))
+ checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2")))
}
}
@@ -158,10 +174,10 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
.range(100)
.selectExpr("id as c1", "id % 7 as p")
.createOrReplaceTempView("bucket_temp")
- val df = spark.sql(
+ checkNativeWrite(
"CREATE TABLE bucket USING PARQUET CLUSTERED BY (p) INTO 7 BUCKETS " +
- "AS SELECT * FROM bucket_temp")
-      Assert.assertTrue(FallbackUtil.hasFallback(df.queryExecution.executedPlan))
+ "AS SELECT * FROM bucket_temp",
+ expectNative = false)
}
}
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/WriteUtils.scala
similarity index 77%
rename from backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala
rename to backends-velox/src/test/scala/org/apache/spark/sql/execution/WriteUtils.scala
index a9fe8269e9..416ce41f57 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/WriteUtils.scala
@@ -16,20 +16,51 @@
*/
package org.apache.spark.sql.execution
+import org.apache.gluten.execution.VeloxColumnarToCarrierRowExec
+
import org.apache.spark.sql.{DataFrame, GlutenQueryTest}
 import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, Expression, HiveHash, Literal, Pmod, UnsafeProjection}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.util.QueryExecutionListener
import java.io.File
-trait BucketWriteUtils extends GlutenQueryTest with SQLTestUtils {
+trait WriteUtils extends GlutenQueryTest with SQLTestUtils {
def tableDir(table: String): File = {
val identifier = spark.sessionState.sqlParser.parseTableIdentifier(table)
new File(spark.sessionState.catalog.defaultTablePath(identifier))
}
+  def checkNativeWrite(sqlStr: String, expectNative: Boolean = true): Unit = {
+    var nativeUsed = false
+    val queryListener = new QueryExecutionListener {
+      override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
+      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+        if (!nativeUsed) {
+          nativeUsed = if (isSparkVersionGE("3.4")) {
+            qe.executedPlan.exists(_.isInstanceOf[ColumnarWriteFilesExec])
+          } else {
+            qe.executedPlan.exists(_.isInstanceOf[VeloxColumnarToCarrierRowExec])
+          }
+        }
+      }
+    }
+    try {
+      spark.listenerManager.register(queryListener)
+      spark.sql(sqlStr)
+      spark.sparkContext.listenerBus.waitUntilEmpty()
+      if (expectNative) {
+        assert(nativeUsed)
+      } else {
+        assert(!nativeUsed)
+      }
+    } finally {
+      spark.listenerManager.unregister(queryListener)
+    }
+  }
+
protected def testBucketing(
dataDir: File,
source: String = "parquet",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]