This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9cab0bedd8 [GLUTEN-7600][VL] Prepare test case for the removal of workaround code for empty schema batches (#7601)
9cab0bedd8 is described below
commit 9cab0bedd8f54b1ffbfa3d25795f8852267ab9f9
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Oct 21 13:16:42 2024 +0800
[GLUTEN-7600][VL] Prepare test case for the removal of workaround code for empty schema batches (#7601)
---
.github/workflows/velox_backend_cache.yml | 4 +-
.../execution/ColumnarPartialProjectExec.scala | 6 +-
.../gluten/execution/VeloxColumnarToRowExec.scala | 82 ++++++++++------------
.../gluten/extension/EmptySchemaWorkaround.scala | 4 +-
.../velox/VeloxFormatWriterInjects.scala | 3 +-
.../gluten/execution/MiscOperatorSuite.scala | 12 ++++
cpp/velox/compute/VeloxRuntime.h | 4 +-
.../gluten/columnarbatch/ColumnarBatches.java | 6 +-
.../gluten/extension/GlutenColumnarRule.scala | 2 -
9 files changed, 66 insertions(+), 57 deletions(-)
diff --git a/.github/workflows/velox_backend_cache.yml b/.github/workflows/velox_backend_cache.yml
index 41cfcd5477..71a7a5840b 100644
--- a/.github/workflows/velox_backend_cache.yml
+++ b/.github/workflows/velox_backend_cache.yml
@@ -46,7 +46,7 @@ jobs:
df -a
yum install ccache -y
bash dev/ci-velox-buildstatic-centos-7.sh
- - name: Save ccache
+ - name: Save Ccache
uses: actions/cache/save@v3
id: ccache
with:
@@ -76,7 +76,7 @@ jobs:
df -a
bash dev/ci-velox-buildshared-centos-8.sh
ccache -s
- - name: Save ccache
+ - name: Save Ccache
uses: actions/cache/save@v3
id: ccache
with:
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
index fc0fc041a4..1c394103db 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
@@ -270,9 +270,11 @@ case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)(
val proj = MutableProjection.create(replacedAliasUdf,
projectAttributes.toSeq)
val numRows = childData.numRows()
val start = System.currentTimeMillis()
- val arrowBatch = if (childData.numCols() == 0 ||
ColumnarBatches.isHeavyBatch(childData)) {
+ val arrowBatch = if (childData.numCols() == 0) {
childData
- } else ColumnarBatches.load(ArrowBufferAllocators.contextInstance(),
childData)
+ } else {
+ ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), childData)
+ }
c2a += System.currentTimeMillis() - start
val schema =
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index 639113efce..0df66da833 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.iterator.Iterators
@@ -26,7 +26,7 @@ import org.apache.gluten.vectorized.NativeColumnarToRowJniWrapper
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection,
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
import org.apache.spark.sql.execution.{BroadcastUtils, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
@@ -140,53 +140,49 @@ object VeloxColumnarToRowExec {
if (batch.numRows == 0) {
batch.close()
- Iterator.empty
- } else if (
- batch.numCols() > 0 &&
- !ColumnarBatches.isLightBatch(batch)
- ) {
- // Fallback to ColumnarToRow of vanilla Spark.
- val localOutput = output
- val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
- batch.rowIterator().asScala.map(toUnsafe)
- } else if (output.isEmpty) {
+ return Iterator.empty
+ }
+
+ if (output.isEmpty) {
numInputBatches += 1
numOutputRows += batch.numRows()
val rows = ColumnarBatches.emptyRowIterator(batch.numRows()).asScala
batch.close()
- rows
- } else {
- val cols = batch.numCols()
- val rows = batch.numRows()
- val beforeConvert = System.currentTimeMillis()
- val batchHandle = ColumnarBatches.getNativeHandle(batch)
- var info =
- jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, 0)
-
- convertTime += (System.currentTimeMillis() - beforeConvert)
-
- new Iterator[InternalRow] {
- var rowId = 0
- var baseLength = 0
- val row = new UnsafeRow(cols)
-
- override def hasNext: Boolean = {
- rowId < rows
- }
+ return rows
+ }
+
+ VeloxColumnarBatches.checkVeloxBatch(batch)
+
+ val cols = batch.numCols()
+ val rows = batch.numRows()
+ val beforeConvert = System.currentTimeMillis()
+ val batchHandle = ColumnarBatches.getNativeHandle(batch)
+ var info =
+ jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, 0)
+
+ convertTime += (System.currentTimeMillis() - beforeConvert)
+
+ new Iterator[InternalRow] {
+ var rowId = 0
+ var baseLength = 0
+ val row = new UnsafeRow(cols)
+
+ override def hasNext: Boolean = {
+ rowId < rows
+ }
- override def next: UnsafeRow = {
- if (rowId == baseLength + info.lengths.length) {
- baseLength += info.lengths.length
- val before = System.currentTimeMillis()
- info = jniWrapper.nativeColumnarToRowConvert(c2rId,
batchHandle, rowId)
- convertTime += (System.currentTimeMillis() - before)
- }
- val (offset, length) =
- (info.offsets(rowId - baseLength), info.lengths(rowId -
baseLength))
- row.pointTo(null, info.memoryAddress + offset, length)
- rowId += 1
- row
+ override def next: UnsafeRow = {
+ if (rowId == baseLength + info.lengths.length) {
+ baseLength += info.lengths.length
+ val before = System.currentTimeMillis()
+ info = jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle,
rowId)
+ convertTime += (System.currentTimeMillis() - before)
}
+ val (offset, length) =
+ (info.offsets(rowId - baseLength), info.lengths(rowId -
baseLength))
+ row.pointTo(null, info.memoryAddress + offset, length)
+ rowId += 1
+ row
}
}
}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
index 3f34e7fc26..f7d74e378f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala
@@ -69,8 +69,8 @@ object EmptySchemaWorkaround {
case p =>
if (fallbackOnEmptySchema(p)) {
if (p.children.exists(_.output.isEmpty)) {
- // Some backends are not eligible to offload plan with zero-column
input.
- // If any child have empty output, mark the plan and that child as
UNSUPPORTED.
+ // Some backends are not capable to offload plan with zero-column
input.
+ // If any child has empty output, mark the plan and that child as
UNSUPPORTED.
FallbackTags.add(p, "at least one of its children has empty
output")
p.children.foreach {
child =>
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index 91dde1c4c6..08b458ad18 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkArrowUtil
-import com.google.common.base.Preconditions
import org.apache.arrow.c.ArrowSchema
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -74,7 +73,7 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
new OutputWriter {
override def write(row: InternalRow): Unit = {
val batch = row.asInstanceOf[FakeRow].batch
- Preconditions.checkState(ColumnarBatches.isLightBatch(batch))
+ ColumnarBatches.checkOffloaded(batch)
ColumnarBatches.retain(batch)
val batchHandle = {
if (batch.numCols == 0) {
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index b71457555e..ed79db951d 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -922,6 +922,18 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}
+ test("count(1) on csv scan") {
+ val df = runAndCompare("select count(1) from student") {
+ val filePath = rootPath + "/datasource/csv/student.csv"
+ val df = spark.read
+ .format("csv")
+ .option("header", "true")
+ .load(filePath)
+ df.createOrReplaceTempView("student")
+ }
+ checkLengthAndPlan(df, 1)
+ }
+
test("combine small batches before shuffle") {
val minBatchSize = 15
withSQLConf(
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index bb0fb8cc0e..3511c3731f 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -66,8 +66,6 @@ class VeloxRuntime final : public Runtime {
return iter->getMetrics(exportNanos);
}
- std::shared_ptr<VeloxDataSource> createDataSource(const std::string&
filePath, std::shared_ptr<arrow::Schema> schema);
-
std::shared_ptr<ShuffleReader> createShuffleReader(
std::shared_ptr<arrow::Schema> schema,
ShuffleReaderOptions options) override;
@@ -78,6 +76,8 @@ class VeloxRuntime final : public Runtime {
void dumpConf(const std::string& path) override;
+ std::shared_ptr<VeloxDataSource> createDataSource(const std::string&
filePath, std::shared_ptr<arrow::Schema> schema);
+
std::shared_ptr<const facebook::velox::core::PlanNode> getVeloxPlan() {
return veloxPlan_;
}
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index 6b5376d9b2..1529e8f2b6 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -85,7 +85,8 @@ public final class ColumnarBatches {
}
/** Heavy batch: Data is readable from JVM and formatted as Arrow data. */
- public static boolean isHeavyBatch(ColumnarBatch batch) {
+ @VisibleForTesting
+ static boolean isHeavyBatch(ColumnarBatch batch) {
return identifyBatchType(batch) == BatchType.HEAVY;
}
@@ -93,7 +94,8 @@ public final class ColumnarBatches {
* Light batch: Data is not readable from JVM, a long int handle (which is a
pointer usually) is
* used to bind the batch to a native side implementation.
*/
- public static boolean isLightBatch(ColumnarBatch batch) {
+ @VisibleForTesting
+ static boolean isLightBatch(ColumnarBatch batch) {
return identifyBatchType(batch) == BatchType.LIGHT;
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
index ce6d50350c..a98610ec87 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
@@ -122,5 +122,3 @@ case class GlutenColumnarRule(
}
}
-
-object ColumnarOverrides {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]