This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9cab0bedd8 [GLUTEN-7600][VL] Prepare test case for the removal of workaround code for empty schema batches (#7601)
9cab0bedd8 is described below

commit 9cab0bedd8f54b1ffbfa3d25795f8852267ab9f9
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Oct 21 13:16:42 2024 +0800

    [GLUTEN-7600][VL] Prepare test case for the removal of workaround code for empty schema batches (#7601)
---
 .github/workflows/velox_backend_cache.yml          |  4 +-
 .../execution/ColumnarPartialProjectExec.scala     |  6 +-
 .../gluten/execution/VeloxColumnarToRowExec.scala  | 82 ++++++++++------------
 .../gluten/extension/EmptySchemaWorkaround.scala   |  4 +-
 .../velox/VeloxFormatWriterInjects.scala           |  3 +-
 .../gluten/execution/MiscOperatorSuite.scala       | 12 ++++
 cpp/velox/compute/VeloxRuntime.h                   |  4 +-
 .../gluten/columnarbatch/ColumnarBatches.java      |  6 +-
 .../gluten/extension/GlutenColumnarRule.scala      |  2 -
 9 files changed, 66 insertions(+), 57 deletions(-)

diff --git a/.github/workflows/velox_backend_cache.yml b/.github/workflows/velox_backend_cache.yml
index 41cfcd5477..71a7a5840b 100644
--- a/.github/workflows/velox_backend_cache.yml
+++ b/.github/workflows/velox_backend_cache.yml
@@ -46,7 +46,7 @@ jobs:
           df -a
           yum install ccache -y
           bash dev/ci-velox-buildstatic-centos-7.sh
-      - name: Save ccache
+      - name: Save Ccache
         uses: actions/cache/save@v3
         id: ccache
         with:
@@ -76,7 +76,7 @@ jobs:
           df -a
           bash dev/ci-velox-buildshared-centos-8.sh
           ccache -s
-      - name: Save ccache
+      - name: Save Ccache
         uses: actions/cache/save@v3
         id: ccache
         with:
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
index fc0fc041a4..1c394103db 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
@@ -270,9 +270,11 @@ case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)(
     val proj = MutableProjection.create(replacedAliasUdf, projectAttributes.toSeq)
     val numRows = childData.numRows()
     val start = System.currentTimeMillis()
-    val arrowBatch = if (childData.numCols() == 0 || ColumnarBatches.isHeavyBatch(childData)) {
+    val arrowBatch = if (childData.numCols() == 0) {
       childData
-    } else ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), childData)
+    } else {
+      ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), childData)
+    }
     c2a += System.currentTimeMillis() - start
 
     val schema =
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index 639113efce..0df66da833 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.iterator.Iterators
@@ -26,7 +26,7 @@ import org.apache.gluten.vectorized.NativeColumnarToRowJniWrapper
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
 import org.apache.spark.sql.execution.{BroadcastUtils, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
@@ -140,53 +140,49 @@ object VeloxColumnarToRowExec {
 
         if (batch.numRows == 0) {
           batch.close()
-          Iterator.empty
-        } else if (
-          batch.numCols() > 0 &&
-          !ColumnarBatches.isLightBatch(batch)
-        ) {
-          // Fallback to ColumnarToRow of vanilla Spark.
-          val localOutput = output
-          val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
-          batch.rowIterator().asScala.map(toUnsafe)
-        } else if (output.isEmpty) {
+          return Iterator.empty
+        }
+
+        if (output.isEmpty) {
           numInputBatches += 1
           numOutputRows += batch.numRows()
           val rows = ColumnarBatches.emptyRowIterator(batch.numRows()).asScala
           batch.close()
-          rows
-        } else {
-          val cols = batch.numCols()
-          val rows = batch.numRows()
-          val beforeConvert = System.currentTimeMillis()
-          val batchHandle = ColumnarBatches.getNativeHandle(batch)
-          var info =
-            jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, 0)
-
-          convertTime += (System.currentTimeMillis() - beforeConvert)
-
-          new Iterator[InternalRow] {
-            var rowId = 0
-            var baseLength = 0
-            val row = new UnsafeRow(cols)
-
-            override def hasNext: Boolean = {
-              rowId < rows
-            }
+          return rows
+        }
+
+        VeloxColumnarBatches.checkVeloxBatch(batch)
+
+        val cols = batch.numCols()
+        val rows = batch.numRows()
+        val beforeConvert = System.currentTimeMillis()
+        val batchHandle = ColumnarBatches.getNativeHandle(batch)
+        var info =
+          jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, 0)
+
+        convertTime += (System.currentTimeMillis() - beforeConvert)
+
+        new Iterator[InternalRow] {
+          var rowId = 0
+          var baseLength = 0
+          val row = new UnsafeRow(cols)
+
+          override def hasNext: Boolean = {
+            rowId < rows
+          }
 
-            override def next: UnsafeRow = {
-              if (rowId == baseLength + info.lengths.length) {
-                baseLength += info.lengths.length
-                val before = System.currentTimeMillis()
-                info = jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, rowId)
-                convertTime += (System.currentTimeMillis() - before)
-              }
-              val (offset, length) =
-                (info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
-              row.pointTo(null, info.memoryAddress + offset, length)
-              rowId += 1
-              row
+          override def next: UnsafeRow = {
+            if (rowId == baseLength + info.lengths.length) {
+              baseLength += info.lengths.length
+              val before = System.currentTimeMillis()
+              info = jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, rowId)
+              convertTime += (System.currentTimeMillis() - before)
             }
+            val (offset, length) =
+              (info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
+            row.pointTo(null, info.memoryAddress + offset, length)
+            rowId += 1
+            row
           }
         }
       }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
index 3f34e7fc26..f7d74e378f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
@@ -69,8 +69,8 @@ object EmptySchemaWorkaround {
       case p =>
         if (fallbackOnEmptySchema(p)) {
           if (p.children.exists(_.output.isEmpty)) {
-            // Some backends are not eligible to offload plan with zero-column input.
-            // If any child have empty output, mark the plan and that child as UNSUPPORTED.
+            // Some backends are not capable to offload plan with zero-column input.
+            // If any child has empty output, mark the plan and that child as UNSUPPORTED.
             FallbackTags.add(p, "at least one of its children has empty output")
             p.children.foreach {
               child =>
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index 91dde1c4c6..08b458ad18 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkArrowUtil
 
-import com.google.common.base.Preconditions
 import org.apache.arrow.c.ArrowSchema
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -74,7 +73,7 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
     new OutputWriter {
       override def write(row: InternalRow): Unit = {
         val batch = row.asInstanceOf[FakeRow].batch
-        Preconditions.checkState(ColumnarBatches.isLightBatch(batch))
+        ColumnarBatches.checkOffloaded(batch)
         ColumnarBatches.retain(batch)
         val batchHandle = {
           if (batch.numCols == 0) {
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index b71457555e..ed79db951d 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -922,6 +922,18 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
     }
   }
 
+  test("count(1) on csv scan") {
+    val df = runAndCompare("select count(1) from student") {
+      val filePath = rootPath + "/datasource/csv/student.csv"
+      val df = spark.read
+        .format("csv")
+        .option("header", "true")
+        .load(filePath)
+      df.createOrReplaceTempView("student")
+    }
+    checkLengthAndPlan(df, 1)
+  }
+
   test("combine small batches before shuffle") {
     val minBatchSize = 15
     withSQLConf(
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index bb0fb8cc0e..3511c3731f 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -66,8 +66,6 @@ class VeloxRuntime final : public Runtime {
     return iter->getMetrics(exportNanos);
   }
 
-  std::shared_ptr<VeloxDataSource> createDataSource(const std::string& filePath, std::shared_ptr<arrow::Schema> schema);
-
   std::shared_ptr<ShuffleReader> createShuffleReader(
       std::shared_ptr<arrow::Schema> schema,
       ShuffleReaderOptions options) override;
@@ -78,6 +76,8 @@ class VeloxRuntime final : public Runtime {
 
   void dumpConf(const std::string& path) override;
 
+  std::shared_ptr<VeloxDataSource> createDataSource(const std::string& filePath, std::shared_ptr<arrow::Schema> schema);
+
   std::shared_ptr<const facebook::velox::core::PlanNode> getVeloxPlan() {
     return veloxPlan_;
   }
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index 6b5376d9b2..1529e8f2b6 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -85,7 +85,8 @@ public final class ColumnarBatches {
   }
 
   /** Heavy batch: Data is readable from JVM and formatted as Arrow data. */
-  public static boolean isHeavyBatch(ColumnarBatch batch) {
+  @VisibleForTesting
+  static boolean isHeavyBatch(ColumnarBatch batch) {
     return identifyBatchType(batch) == BatchType.HEAVY;
   }
 
@@ -93,7 +94,8 @@ public final class ColumnarBatches {
    * Light batch: Data is not readable from JVM, a long int handle (which is a pointer usually) is
    * used to bind the batch to a native side implementation.
    */
-  public static boolean isLightBatch(ColumnarBatch batch) {
+  @VisibleForTesting
+  static boolean isLightBatch(ColumnarBatch batch) {
     return identifyBatchType(batch) == BatchType.LIGHT;
   }
 
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
index ce6d50350c..a98610ec87 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
@@ -122,5 +122,3 @@ case class GlutenColumnarRule(
   }
 
 }
-
-object ColumnarOverrides {}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to