This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b1741c6c23 [VL] Code cleanup for Arrow CSV UTs (#7651)
b1741c6c23 is described below

commit b1741c6c233a9e37ce13a6d8d971c10f30cb2cdd
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Oct 24 10:00:01 2024 +0800

    [VL] Code cleanup for Arrow CSV UTs (#7651)
---
 .../datasource/v2/ArrowBatchScanExec.scala         |  11 +-
 .../sql/execution/ArrowFileSourceScanExec.scala    |  10 +-
 .../spark/sql/execution/BaseArrowScanExec.scala    |  28 +++
 .../gluten/execution/ArrowCsvScanSuite.scala       | 199 +++++++++++++++++
 .../gluten/execution/MiscOperatorSuite.scala       | 237 +--------------------
 5 files changed, 231 insertions(+), 254 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
index 7255bca58f..abd6d96731 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
@@ -16,27 +16,20 @@
  */
 package org.apache.gluten.execution.datasource.v2
 
-import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.connector.read.{Batch, PartitionReaderFactory, Scan}
+import org.apache.spark.sql.execution.BaseArrowScanExec
 import org.apache.spark.sql.execution.datasources.v2.{ArrowBatchScanExecShim, BatchScanExec}
 
 case class ArrowBatchScanExec(original: BatchScanExec)
   extends ArrowBatchScanExecShim(original)
-  with GlutenPlan {
+  with BaseArrowScanExec {
 
   @transient lazy val batch: Batch = original.batch
 
-  override protected def batchType0(): Convention.BatchType = {
-    ArrowBatches.ArrowJavaBatch
-  }
-
   override lazy val readerFactory: PartitionReaderFactory = original.readerFactory
 
   override lazy val inputRDD: RDD[InternalRow] = original.inputRDD
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
index ee19425aeb..85bc682234 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
@@ -16,10 +16,6 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -29,7 +25,7 @@ import scala.concurrent.duration.NANOSECONDS
 
 case class ArrowFileSourceScanExec(original: FileSourceScanExec)
   extends ArrowFileSourceScanLikeShim(original)
-  with GlutenPlan {
+  with BaseArrowScanExec {
 
   lazy val inputRDD: RDD[InternalRow] = original.inputRDD
 
@@ -41,10 +37,6 @@ case class ArrowFileSourceScanExec(original: FileSourceScanExec)
 
   override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()
 
-  override protected def batchType0(): Convention.BatchType = {
-    ArrowBatches.ArrowJavaBatch
-  }
-
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numOutputRows = longMetric("numOutputRows")
     val scanTime = longMetric("scanTime")
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
new file mode 100644
index 0000000000..38a6d1803d
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.columnarbatch.ArrowBatches
+import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
+
+trait BaseArrowScanExec extends GlutenPlan {
+
+  final override protected def batchType0(): Convention.BatchType = {
+    ArrowBatches.ArrowJavaBatch
+  }
+}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala
new file mode 100644
index 0000000000..116813fcf6
--- /dev/null
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.datasource.ArrowCSVFileFormat
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.{ArrowFileSourceScanExec, BaseArrowScanExec, ColumnarToRowExec}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+class ArrowCsvScanSuiteV1 extends ArrowCsvScanSuite {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.sources.useV1SourceList", "csv")
+  }
+
+  test("csv scan v1") {
+    val df = runAndCompare("select * from student")()
+    val plan = df.queryExecution.executedPlan
+    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
+    assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
+    val scan = plan.find(_.isInstanceOf[BaseArrowScanExec]).toList.head
+    assert(
+      scan
+        .asInstanceOf[ArrowFileSourceScanExec]
+        .relation
+        .fileFormat
+        .isInstanceOf[ArrowCSVFileFormat])
+  }
+
+  test("csv scan with schema v1") {
+    val df = runAndCompare("select * from student_option_schema")()
+    val plan = df.queryExecution.executedPlan
+    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
+    val scan = plan.find(_.isInstanceOf[BaseArrowScanExec])
+    assert(scan.isDefined)
+    assert(
+      !scan.get
+        .asInstanceOf[ArrowFileSourceScanExec]
+        .original
+        .relation
+        .fileFormat
+        .asInstanceOf[ArrowCSVFileFormat]
+        .fallback)
+  }
+}
+
+class ArrowCsvScanSuiteV2 extends ArrowCsvScanSuite {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.sources.useV1SourceList", "")
+  }
+}
+
+/** Since https://github.com/apache/incubator-gluten/pull/5850. */
+abstract class ArrowCsvScanSuite extends VeloxWholeStageTransformerSuite {
+  override protected val resourcePath: String = "N/A"
+  override protected val fileFormat: String = "N/A"
+
+  protected val rootPath: String = getClass.getResource("/").getPath
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    createCsvTables()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+  }
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.sql.files.maxPartitionBytes", "1g")
+      .set("spark.sql.shuffle.partitions", "1")
+      .set("spark.memory.offHeap.size", "2g")
+      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
+  }
+
+  test("csv scan with option string as null") {
+    val df = runAndCompare("select * from student_option_str")()
+    val plan = df.queryExecution.executedPlan
+    assert(plan.find(_.isInstanceOf[ColumnarToRowExec]).isDefined)
+    assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
+  }
+
+  test("csv scan with option delimiter") {
+    val df = runAndCompare("select * from student_option")()
+    val plan = df.queryExecution.executedPlan
+    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
+    assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
+  }
+
+  test("csv scan with missing columns") {
+    val df =
+      runAndCompare("select languagemissing, language, id_new_col from student_option_schema_lm")()
+    val plan = df.queryExecution.executedPlan
+    assert(plan.find(s => s.isInstanceOf[VeloxColumnarToRowExec]).isDefined)
+    assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
+  }
+
+  test("csv scan with different name") {
+    val df = runAndCompare("select * from student_option_schema")()
+    val plan = df.queryExecution.executedPlan
+    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
+    assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
+
+    val df2 = runAndCompare("select * from student_option_schema")()
+    val plan2 = df2.queryExecution.executedPlan
+    assert(plan2.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
+    assert(plan2.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
+  }
+
+  test("csv scan with filter") {
+    val df = runAndCompare("select * from student where Name = 'Peter'")()
+    assert(df.queryExecution.executedPlan.find(s => s.isInstanceOf[ColumnarToRowExec]).isEmpty)
+    assert(
+      df.queryExecution.executedPlan
+        .find(s => s.isInstanceOf[BaseArrowScanExec])
+        .isDefined)
+  }
+
+  test("insert into select from csv") {
+    withTable("insert_csv_t") {
+      spark.sql("create table insert_csv_t(Name string, Language string) using parquet;")
+      runQueryAndCompare("""
+                           |insert into insert_csv_t select * from student;
+                           |""".stripMargin) {
+        checkGlutenOperatorMatch[BaseArrowScanExec]
+      }
+    }
+  }
+
+  test("count(1) on csv scan") {
+    val df = runAndCompare("select count(1) from student")()
+    checkLengthAndPlan(df, 1)
+  }
+
+  private def createCsvTables(): Unit = {
+    spark.read
+      .format("csv")
+      .option("header", "true")
+      .load(rootPath + "/datasource/csv/student.csv")
+      .createOrReplaceTempView("student")
+
+    spark.read
+      .format("csv")
+      .option("header", "true")
+      .load(rootPath + "/datasource/csv/student_option_str.csv")
+      .createOrReplaceTempView("student_option_str")
+
+    spark.read
+      .format("csv")
+      .option("header", "true")
+      .option("delimiter", ";")
+      .load(rootPath + "/datasource/csv/student_option.csv")
+      .createOrReplaceTempView("student_option")
+
+    spark.read
+      .schema(
+        new StructType()
+          .add("id", StringType)
+          .add("name", StringType)
+          .add("language", StringType))
+      .format("csv")
+      .option("header", "true")
+      .load(rootPath + "/datasource/csv/student_option_schema.csv")
+      .createOrReplaceTempView("student_option_schema")
+
+    spark.read
+      .schema(
+        new StructType()
+          .add("id_new_col", IntegerType)
+          .add("name", StringType)
+          .add("language", StringType)
+          .add("languagemissing", StringType))
+      .format("csv")
+      .option("header", "true")
+      .load(rootPath + "/datasource/csv/student_option_schema.csv")
+      .createOrReplaceTempView("student_option_schema_lm")
+  }
+}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 76a46836a1..ba42b57f1b 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -17,8 +17,6 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.datasource.ArrowCSVFileFormat
-import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
 import org.apache.gluten.expression.VeloxDummyExpression
 import org.apache.gluten.sql.shims.SparkShimLoader
 
@@ -29,7 +27,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, DecimalType, IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 
 import java.util.concurrent.TimeUnit
 
@@ -701,239 +699,6 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
     }
   }
 
-  test("csv scan") {
-    val df = runAndCompare("select * from student") {
-      val filePath = rootPath + "/datasource/csv/student.csv"
-      val df = spark.read
-        .format("csv")
-        .option("header", "true")
-        .load(filePath)
-      df.createOrReplaceTempView("student")
-    }
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-    assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
-    val scan = plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).toList.head
-    assert(
-      scan
-        .asInstanceOf[ArrowFileSourceScanExec]
-        .relation
-        .fileFormat
-        .isInstanceOf[ArrowCSVFileFormat])
-  }
-
-  test("csv scan with option string as null") {
-    val df = runAndCompare("select * from student") {
-      val filePath = rootPath + "/datasource/csv/student_option_str.csv"
-      // test strings as null
-      val df = spark.read
-        .format("csv")
-        .option("header", "true")
-        .load(filePath)
-      df.createOrReplaceTempView("student")
-    }
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(_.isInstanceOf[ColumnarToRowExec]).isDefined)
-    assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
-  }
-
-  test("csv scan with option delimiter") {
-    val df = runAndCompare("select * from student") {
-      val filePath = rootPath + "/datasource/csv/student_option.csv"
-      val df = spark.read
-        .format("csv")
-        .option("header", "true")
-        .option("delimiter", ";")
-        .load(filePath)
-      df.createOrReplaceTempView("student")
-    }
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-    assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
-  }
-
-  test("csv scan with schema") {
-    val df = runAndCompare("select * from student") {
-      val filePath = rootPath + "/datasource/csv/student_option_schema.csv"
-      val schema = new StructType()
-        .add("id", StringType)
-        .add("name", StringType)
-        .add("language", StringType)
-      val df = spark.read
-        .schema(schema)
-        .format("csv")
-        .option("header", "true")
-        .load(filePath)
-      df.createOrReplaceTempView("student")
-    }
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-    val scan = plan.find(_.isInstanceOf[ArrowFileSourceScanExec])
-    assert(scan.isDefined)
-    assert(
-      !scan.get
-        .asInstanceOf[ArrowFileSourceScanExec]
-        .original
-        .relation
-        .fileFormat
-        .asInstanceOf[ArrowCSVFileFormat]
-        .fallback)
-  }
-
-  test("csv scan with missing columns") {
-    val df = runAndCompare("select languagemissing, language, id_new_col from student") {
-      val filePath = rootPath + "/datasource/csv/student_option_schema.csv"
-      val schema = new StructType()
-        .add("id_new_col", IntegerType)
-        .add("name", StringType)
-        .add("language", StringType)
-        .add("languagemissing", StringType)
-      val df = spark.read
-        .schema(schema)
-        .format("csv")
-        .option("header", "true")
-        .load(filePath)
-      df.createOrReplaceTempView("student")
-    }
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(s => s.isInstanceOf[VeloxColumnarToRowExec]).isDefined)
-    assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
-  }
-
-  test("csv scan with different name") {
-    val df = runAndCompare("select * from student") {
-      val filePath = rootPath + "/datasource/csv/student_option_schema.csv"
-      val schema = new StructType()
-        .add("id_new_col", StringType)
-        .add("name", StringType)
-        .add("language", StringType)
-      val df = spark.read
-        .schema(schema)
-        .format("csv")
-        .option("header", "true")
-        .load(filePath)
-      df.createOrReplaceTempView("student")
-    }
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-    assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
-
-    val df2 = runAndCompare("select * from student_schema") {
-      val filePath = rootPath + "/datasource/csv/student_option_schema.csv"
-      val schema = new StructType()
-        .add("name", StringType)
-        .add("language", StringType)
-      val df = spark.read
-        .schema(schema)
-        .format("csv")
-        .option("header", "true")
-        .load(filePath)
-      df.createOrReplaceTempView("student_schema")
-    }
-    val plan2 = df2.queryExecution.executedPlan
-    assert(plan2.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-    assert(plan2.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
-  }
-
-  test("csv scan with filter") {
-    val df = runAndCompare("select * from student where Name = 'Peter'") {
-      val filePath = rootPath + "/datasource/csv/student.csv"
-      val df = spark.read
-        .format("csv")
-        .option("header", "true")
-        .load(filePath)
-      df.createOrReplaceTempView("student")
-    }
-    assert(df.queryExecution.executedPlan.find(s => s.isInstanceOf[ColumnarToRowExec]).isEmpty)
-    assert(
-      df.queryExecution.executedPlan
-        .find(s => s.isInstanceOf[ArrowFileSourceScanExec])
-        .isDefined)
-  }
-
-  test("insert into select from csv") {
-    withTable("insert_csv_t") {
-      val filePath = rootPath + "/datasource/csv/student.csv"
-      val df = spark.read
-        .format("csv")
-        .option("header", "true")
-        .load(filePath)
-      df.createOrReplaceTempView("student")
-      spark.sql("create table insert_csv_t(Name string, Language string) using parquet;")
-      runQueryAndCompare("""
-                           |insert into insert_csv_t select * from student;
-                           |""".stripMargin) {
-        checkGlutenOperatorMatch[ArrowFileSourceScanExec]
-      }
-    }
-  }
-
-  test("csv scan datasource v2") {
-    withSQLConf("spark.sql.sources.useV1SourceList" -> "") {
-      val df = runAndCompare("select * from student") {
-        val filePath = rootPath + "/datasource/csv/student.csv"
-        val df = spark.read
-          .format("csv")
-          .option("header", "true")
-          .load(filePath)
-        df.createOrReplaceTempView("student")
-      }
-      val plan = df.queryExecution.executedPlan
-      assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-      assert(plan.find(s => s.isInstanceOf[ArrowBatchScanExec]).isDefined)
-    }
-  }
-
-  test("csv scan datasource v2 with filter") {
-    withSQLConf("spark.sql.sources.useV1SourceList" -> "") {
-      val df = runAndCompare("select * from student where Name = 'Peter'") {
-        val filePath = rootPath + "/datasource/csv/student.csv"
-        val df = spark.read
-          .format("csv")
-          .option("header", "true")
-          .load(filePath)
-        df.createOrReplaceTempView("student")
-      }
-
-      val plan = df.queryExecution.executedPlan
-      assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isEmpty)
-      assert(plan.find(s => s.isInstanceOf[ArrowBatchScanExec]).isDefined)
-    }
-  }
-
-  test("csv scan with schema datasource v2") {
-    withSQLConf("spark.sql.sources.useV1SourceList" -> "") {
-      val df = runAndCompare("select * from student") {
-        val filePath = rootPath + "/datasource/csv/student_option_schema.csv"
-        val schema = new StructType()
-          .add("id", StringType)
-          .add("name", StringType)
-          .add("language", StringType)
-        val df = spark.read
-          .schema(schema)
-          .format("csv")
-          .option("header", "true")
-          .load(filePath)
-        df.createOrReplaceTempView("student")
-      }
-      val plan = df.queryExecution.executedPlan
-      assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-      assert(plan.find(_.isInstanceOf[ArrowBatchScanExec]).isDefined)
-    }
-  }
-
-  test("count(1) on csv scan") {
-    val df = runAndCompare("select count(1) from student") {
-      val filePath = rootPath + "/datasource/csv/student.csv"
-      val df = spark.read
-        .format("csv")
-        .option("header", "true")
-        .load(filePath)
-      df.createOrReplaceTempView("student")
-    }
-    checkLengthAndPlan(df, 1)
-  }
-
   test("combine small batches before shuffle") {
     val minBatchSize = 15
     withSQLConf(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to