This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b1741c6c23 [VL] Code cleanup for Arrow CSV UTs (#7651)
b1741c6c23 is described below
commit b1741c6c233a9e37ce13a6d8d971c10f30cb2cdd
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Oct 24 10:00:01 2024 +0800
[VL] Code cleanup for Arrow CSV UTs (#7651)
---
.../datasource/v2/ArrowBatchScanExec.scala | 11 +-
.../sql/execution/ArrowFileSourceScanExec.scala | 10 +-
.../spark/sql/execution/BaseArrowScanExec.scala | 28 +++
.../gluten/execution/ArrowCsvScanSuite.scala | 199 +++++++++++++++++
.../gluten/execution/MiscOperatorSuite.scala | 237 +--------------------
5 files changed, 231 insertions(+), 254 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
index 7255bca58f..abd6d96731 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
@@ -16,27 +16,20 @@
*/
package org.apache.gluten.execution.datasource.v2
-import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.connector.read.{Batch, PartitionReaderFactory,
Scan}
+import org.apache.spark.sql.execution.BaseArrowScanExec
import org.apache.spark.sql.execution.datasources.v2.{ArrowBatchScanExecShim,
BatchScanExec}
case class ArrowBatchScanExec(original: BatchScanExec)
extends ArrowBatchScanExecShim(original)
- with GlutenPlan {
+ with BaseArrowScanExec {
@transient lazy val batch: Batch = original.batch
- override protected def batchType0(): Convention.BatchType = {
- ArrowBatches.ArrowJavaBatch
- }
-
override lazy val readerFactory: PartitionReaderFactory =
original.readerFactory
override lazy val inputRDD: RDD[InternalRow] = original.inputRDD
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
index ee19425aeb..85bc682234 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
@@ -16,10 +16,6 @@
*/
package org.apache.spark.sql.execution
-import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -29,7 +25,7 @@ import scala.concurrent.duration.NANOSECONDS
case class ArrowFileSourceScanExec(original: FileSourceScanExec)
extends ArrowFileSourceScanLikeShim(original)
- with GlutenPlan {
+ with BaseArrowScanExec {
lazy val inputRDD: RDD[InternalRow] = original.inputRDD
@@ -41,10 +37,6 @@ case class ArrowFileSourceScanExec(original: FileSourceScanExec)
override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()
- override protected def batchType0(): Convention.BatchType = {
- ArrowBatches.ArrowJavaBatch
- }
-
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
val scanTime = longMetric("scanTime")
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
new file mode 100644
index 0000000000..38a6d1803d
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.columnarbatch.ArrowBatches
+import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
+
+/**
+ * Common parent of the Arrow scan operators (ArrowFileSourceScanExec for V1 sources and
+ * ArrowBatchScanExec for V2 sources). It fixes their columnar output convention to
+ * [[ArrowBatches.ArrowJavaBatch]]; the override is `final` so implementors cannot diverge.
+ */
+trait BaseArrowScanExec extends GlutenPlan {
+  final override protected def batchType0(): Convention.BatchType = ArrowBatches.ArrowJavaBatch
+}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala
new file mode 100644
index 0000000000..116813fcf6
--- /dev/null
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.datasource.ArrowCSVFileFormat
+import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.{ArrowFileSourceScanExec, BaseArrowScanExec, ColumnarToRowExec}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+/** Runs the shared Arrow CSV tests with CSV forced onto the V1 file-source path. */
+class ArrowCsvScanSuiteV1 extends ArrowCsvScanSuite {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.sources.useV1SourceList", "csv")
+  }
+
+  test("csv scan v1") {
+    val scan = runAndFindArrowFileScan("select * from student")
+    assert(scan.original.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat])
+  }
+
+  test("csv scan with schema v1") {
+    val scan = runAndFindArrowFileScan("select * from student_option_schema")
+    // Pattern match instead of a cast so a wrong format is reported as a test failure.
+    scan.original.relation.fileFormat match {
+      case csv: ArrowCSVFileFormat =>
+        assert(!csv.fallback, "ArrowCSVFileFormat unexpectedly reports fallback")
+      case other =>
+        fail(s"Expected ArrowCSVFileFormat but got $other")
+    }
+  }
+
+  /**
+   * Runs `sql` via `runAndCompare`, checks that the executed plan contains a ColumnarToRowExec,
+   * and returns the V1 Arrow file scan from that plan. Fails the test (rather than throwing a
+   * ClassCastException or NoSuchElementException) when no [[ArrowFileSourceScanExec]] is found.
+   */
+  private def runAndFindArrowFileScan(sql: String): ArrowFileSourceScanExec = {
+    val plan = runAndCompare(sql)().queryExecution.executedPlan
+    assert(plan.find(_.isInstanceOf[ColumnarToRowExec]).isDefined)
+    plan
+      .collectFirst { case scan: ArrowFileSourceScanExec => scan }
+      .getOrElse(fail(s"Expected an ArrowFileSourceScanExec in plan:\n$plan"))
+  }
+}
+
+/** Runs the shared Arrow CSV tests with CSV routed through the V2 data source path. */
+class ArrowCsvScanSuiteV2 extends ArrowCsvScanSuite {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.sources.useV1SourceList", "")
+  }
+
+  test("csv scan v2") {
+    val df = runAndCompare("select * from student")()
+    val plan = df.queryExecution.executedPlan
+    assert(plan.find(_.isInstanceOf[ColumnarToRowExec]).isDefined)
+    // The shared tests only look for BaseArrowScanExec, which the V1 ArrowFileSourceScanExec
+    // also extends; assert the V2 operator explicitly so a silent V1 fallback is caught.
+    assert(plan.find(_.isInstanceOf[ArrowBatchScanExec]).isDefined)
+  }
+}
+
+/** Since https://github.com/apache/incubator-gluten/pull/5850. */
+abstract class ArrowCsvScanSuite extends VeloxWholeStageTransformerSuite {
+ override protected val resourcePath: String = "N/A"
+ override protected val fileFormat: String = "N/A"
+
+ protected val rootPath: String = getClass.getResource("/").getPath
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ createCsvTables()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ }
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.sql.files.maxPartitionBytes", "1g")
+ .set("spark.sql.shuffle.partitions", "1")
+ .set("spark.memory.offHeap.size", "2g")
+ .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+ .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+ .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
+ }
+
+ test("csv scan with option string as null") {
+ val df = runAndCompare("select * from student_option_str")()
+ val plan = df.queryExecution.executedPlan
+ assert(plan.find(_.isInstanceOf[ColumnarToRowExec]).isDefined)
+ assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
+ }
+
+ test("csv scan with option delimiter") {
+ val df = runAndCompare("select * from student_option")()
+ val plan = df.queryExecution.executedPlan
+ assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
+ assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
+ }
+
+ test("csv scan with missing columns") {
+ val df =
+ runAndCompare("select languagemissing, language, id_new_col from
student_option_schema_lm")()
+ val plan = df.queryExecution.executedPlan
+ assert(plan.find(s => s.isInstanceOf[VeloxColumnarToRowExec]).isDefined)
+ assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
+ }
+
+ test("csv scan with different name") {
+ val df = runAndCompare("select * from student_option_schema")()
+ val plan = df.queryExecution.executedPlan
+ assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
+ assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
+
+ val df2 = runAndCompare("select * from student_option_schema")()
+ val plan2 = df2.queryExecution.executedPlan
+ assert(plan2.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
+ assert(plan2.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
+ }
+
+ test("csv scan with filter") {
+ val df = runAndCompare("select * from student where Name = 'Peter'")()
+ assert(df.queryExecution.executedPlan.find(s =>
s.isInstanceOf[ColumnarToRowExec]).isEmpty)
+ assert(
+ df.queryExecution.executedPlan
+ .find(s => s.isInstanceOf[BaseArrowScanExec])
+ .isDefined)
+ }
+
+ test("insert into select from csv") {
+ withTable("insert_csv_t") {
+ spark.sql("create table insert_csv_t(Name string, Language string) using
parquet;")
+ runQueryAndCompare("""
+ |insert into insert_csv_t select * from student;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[BaseArrowScanExec]
+ }
+ }
+ }
+
+ test("count(1) on csv scan") {
+ val df = runAndCompare("select count(1) from student")()
+ checkLengthAndPlan(df, 1)
+ }
+
+ private def createCsvTables(): Unit = {
+ spark.read
+ .format("csv")
+ .option("header", "true")
+ .load(rootPath + "/datasource/csv/student.csv")
+ .createOrReplaceTempView("student")
+
+ spark.read
+ .format("csv")
+ .option("header", "true")
+ .load(rootPath + "/datasource/csv/student_option_str.csv")
+ .createOrReplaceTempView("student_option_str")
+
+ spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("delimiter", ";")
+ .load(rootPath + "/datasource/csv/student_option.csv")
+ .createOrReplaceTempView("student_option")
+
+ spark.read
+ .schema(
+ new StructType()
+ .add("id", StringType)
+ .add("name", StringType)
+ .add("language", StringType))
+ .format("csv")
+ .option("header", "true")
+ .load(rootPath + "/datasource/csv/student_option_schema.csv")
+ .createOrReplaceTempView("student_option_schema")
+
+ spark.read
+ .schema(
+ new StructType()
+ .add("id_new_col", IntegerType)
+ .add("name", StringType)
+ .add("language", StringType)
+ .add("languagemissing", StringType))
+ .format("csv")
+ .option("header", "true")
+ .load(rootPath + "/datasource/csv/student_option_schema.csv")
+ .createOrReplaceTempView("student_option_schema_lm")
+ }
+}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 76a46836a1..ba42b57f1b 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -17,8 +17,6 @@
package org.apache.gluten.execution
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.datasource.ArrowCSVFileFormat
-import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
import org.apache.gluten.expression.VeloxDummyExpression
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -29,7 +27,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, DecimalType, IntegerType,
StringType, StructField, StructType}
+import org.apache.spark.sql.types._
import java.util.concurrent.TimeUnit
@@ -701,239 +699,6 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}
- test("csv scan") {
- val df = runAndCompare("select * from student") {
- val filePath = rootPath + "/datasource/csv/student.csv"
- val df = spark.read
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student")
- }
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
- val scan = plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).toList.head
- assert(
- scan
- .asInstanceOf[ArrowFileSourceScanExec]
- .relation
- .fileFormat
- .isInstanceOf[ArrowCSVFileFormat])
- }
-
- test("csv scan with option string as null") {
- val df = runAndCompare("select * from student") {
- val filePath = rootPath + "/datasource/csv/student_option_str.csv"
- // test strings as null
- val df = spark.read
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student")
- }
- val plan = df.queryExecution.executedPlan
- assert(plan.find(_.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
- }
-
- test("csv scan with option delimiter") {
- val df = runAndCompare("select * from student") {
- val filePath = rootPath + "/datasource/csv/student_option.csv"
- val df = spark.read
- .format("csv")
- .option("header", "true")
- .option("delimiter", ";")
- .load(filePath)
- df.createOrReplaceTempView("student")
- }
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
- }
-
- test("csv scan with schema") {
- val df = runAndCompare("select * from student") {
- val filePath = rootPath + "/datasource/csv/student_option_schema.csv"
- val schema = new StructType()
- .add("id", StringType)
- .add("name", StringType)
- .add("language", StringType)
- val df = spark.read
- .schema(schema)
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student")
- }
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- val scan = plan.find(_.isInstanceOf[ArrowFileSourceScanExec])
- assert(scan.isDefined)
- assert(
- !scan.get
- .asInstanceOf[ArrowFileSourceScanExec]
- .original
- .relation
- .fileFormat
- .asInstanceOf[ArrowCSVFileFormat]
- .fallback)
- }
-
- test("csv scan with missing columns") {
- val df = runAndCompare("select languagemissing, language, id_new_col from
student") {
- val filePath = rootPath + "/datasource/csv/student_option_schema.csv"
- val schema = new StructType()
- .add("id_new_col", IntegerType)
- .add("name", StringType)
- .add("language", StringType)
- .add("languagemissing", StringType)
- val df = spark.read
- .schema(schema)
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student")
- }
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[VeloxColumnarToRowExec]).isDefined)
- assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
- }
-
- test("csv scan with different name") {
- val df = runAndCompare("select * from student") {
- val filePath = rootPath + "/datasource/csv/student_option_schema.csv"
- val schema = new StructType()
- .add("id_new_col", StringType)
- .add("name", StringType)
- .add("language", StringType)
- val df = spark.read
- .schema(schema)
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student")
- }
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
-
- val df2 = runAndCompare("select * from student_schema") {
- val filePath = rootPath + "/datasource/csv/student_option_schema.csv"
- val schema = new StructType()
- .add("name", StringType)
- .add("language", StringType)
- val df = spark.read
- .schema(schema)
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student_schema")
- }
- val plan2 = df2.queryExecution.executedPlan
- assert(plan2.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan2.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
- }
-
- test("csv scan with filter") {
- val df = runAndCompare("select * from student where Name = 'Peter'") {
- val filePath = rootPath + "/datasource/csv/student.csv"
- val df = spark.read
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student")
- }
- assert(df.queryExecution.executedPlan.find(s =>
s.isInstanceOf[ColumnarToRowExec]).isEmpty)
- assert(
- df.queryExecution.executedPlan
- .find(s => s.isInstanceOf[ArrowFileSourceScanExec])
- .isDefined)
- }
-
- test("insert into select from csv") {
- withTable("insert_csv_t") {
- val filePath = rootPath + "/datasource/csv/student.csv"
- val df = spark.read
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student")
- spark.sql("create table insert_csv_t(Name string, Language string) using
parquet;")
- runQueryAndCompare("""
- |insert into insert_csv_t select * from student;
- |""".stripMargin) {
- checkGlutenOperatorMatch[ArrowFileSourceScanExec]
- }
- }
- }
-
- test("csv scan datasource v2") {
- withSQLConf("spark.sql.sources.useV1SourceList" -> "") {
- val df = runAndCompare("select * from student") {
- val filePath = rootPath + "/datasource/csv/student.csv"
- val df = spark.read
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student")
- }
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan.find(s => s.isInstanceOf[ArrowBatchScanExec]).isDefined)
- }
- }
-
- test("csv scan datasource v2 with filter") {
- withSQLConf("spark.sql.sources.useV1SourceList" -> "") {
- val df = runAndCompare("select * from student where Name = 'Peter'") {
- val filePath = rootPath + "/datasource/csv/student.csv"
- val df = spark.read
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student")
- }
-
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isEmpty)
- assert(plan.find(s => s.isInstanceOf[ArrowBatchScanExec]).isDefined)
- }
- }
-
- test("csv scan with schema datasource v2") {
- withSQLConf("spark.sql.sources.useV1SourceList" -> "") {
- val df = runAndCompare("select * from student") {
- val filePath = rootPath + "/datasource/csv/student_option_schema.csv"
- val schema = new StructType()
- .add("id", StringType)
- .add("name", StringType)
- .add("language", StringType)
- val df = spark.read
- .schema(schema)
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student")
- }
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan.find(_.isInstanceOf[ArrowBatchScanExec]).isDefined)
- }
- }
-
- test("count(1) on csv scan") {
- val df = runAndCompare("select count(1) from student") {
- val filePath = rootPath + "/datasource/csv/student.csv"
- val df = spark.read
- .format("csv")
- .option("header", "true")
- .load(filePath)
- df.createOrReplaceTempView("student")
- }
- checkLengthAndPlan(df, 1)
- }
-
test("combine small batches before shuffle") {
val minBatchSize = 15
withSQLConf(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]