This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 84da616220 [GLUTEN-7313][VL] Explicit Arrow transitions, part 1: Add LoadArrowDataExec / OffloadArrowDataExec (#7343)
84da616220 is described below
commit 84da61622068242c47f0a72365aa61542e7f6872
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Sep 26 09:06:30 2024 +0800
[GLUTEN-7313][VL] Explicit Arrow transitions, part 1: Add LoadArrowDataExec / OffloadArrowDataExec (#7343)
---
.../apache/gluten/columnarbatch/VeloxBatch.scala | 30 ++++-
.../gluten/datasource/ArrowCSVFileFormat.scala | 2 +-
.../gluten/execution/VeloxResizeBatchesExec.scala | 2 +
.../datasource/v2/ArrowBatchScanExec.scala | 4 +-
.../api/python/ColumnarArrowEvalPythonExec.scala | 40 ++++--
.../sql/execution/ArrowFileSourceScanExec.scala | 4 +-
.../execution/VeloxColumnarWriteFilesExec.scala | 2 +-
.../velox/VeloxFormatWriterInjects.scala | 5 +-
.../spark/sql/execution/utils/ExecUtil.scala | 2 +-
.../gluten/columnarbatch/ColumnarBatchTest.java | 10 +-
.../columnar/transition/VeloxTransitionSuite.scala | 143 ++++++++++++++-------
cpp/core/tests/CMakeLists.txt | 2 +-
.../apache/gluten/columnarbatch/ArrowBatch.scala | 46 -------
.../apache/gluten/columnarbatch/ArrowBatches.scala | 82 ++++++++++++
.../gluten/columnarbatch/ColumnarBatches.java | 17 ++-
.../gluten/execution/LoadArrowDataExec.scala | 31 ++---
.../gluten/execution/OffloadArrowDataExec.scala | 31 ++---
.../gluten/vectorized/ColumnarBatchInIterator.java | 6 +-
.../org/apache/gluten/utils/ArrowAbiUtil.scala | 8 +-
.../apache/spark/sql/utils/SparkVectorUtil.scala | 11 +-
.../gluten/execution/ColumnarToColumnarExec.scala | 97 ++++++++++++++
.../extension/columnar/transition/Convention.scala | 2 +-
.../columnar/transition/Transitions.scala | 2 +-
.../extension/columnar/transition/package.scala | 13 ++
24 files changed, 411 insertions(+), 181 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala b/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
index aa6676dc9b..33bd11b39d 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.columnarbatch
-import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
+import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec, RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.extension.columnar.transition.{Convention, TransitionDef}
import org.apache.spark.sql.execution.SparkPlan
@@ -34,7 +34,29 @@ object VeloxBatch extends Convention.BatchType {
VeloxColumnarToRowExec(plan)
})
- // Velox batch is considered one-way compatible with Arrow batch.
- // This is practically achieved by utilizing C++ API VeloxColumnarBatch::from at runtime.
- fromBatch(ArrowBatch, TransitionDef.empty)
+ // TODO: Add explicit transitions between Arrow native batch and Velox batch.
+ // See https://github.com/apache/incubator-gluten/issues/7313.
+
+ fromBatch(
+ ArrowBatches.ArrowJavaBatch,
+ () =>
+ (plan: SparkPlan) => {
+ OffloadArrowDataExec(plan)
+ })
+
+ toBatch(
+ ArrowBatches.ArrowJavaBatch,
+ () =>
+ (plan: SparkPlan) => {
+ LoadArrowDataExec(plan)
+ })
+
+ fromBatch(
+ ArrowBatches.ArrowNativeBatch,
+ () =>
+ (plan: SparkPlan) => {
+ LoadArrowDataExec(plan)
+ })
+
+ toBatch(ArrowBatches.ArrowNativeBatch, TransitionDef.empty)
}
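With these registrations in place, Transitions.insertTransitions can materialize the new Arrow operators automatically. An illustrative sketch only, not part of this patch; BatchLeaf/BatchUnary are the mock plan nodes used by VeloxTransitionSuite later in this change, and the assertion mirrors what that suite verifies:

    // A Velox operator fed by an Arrow-Java child now gets an explicit offload inserted:
    val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowJavaBatch))
    val out = Transitions.insertTransitions(in, outputsColumnar = false)
    assert(
      out == VeloxColumnarToRowExec(
        BatchUnary(VeloxBatch, OffloadArrowDataExec(BatchLeaf(ArrowJavaBatch)))))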
diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
index f42b921ab6..52f7756381 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
@@ -315,7 +315,7 @@ object ArrowCSVFileFormat {
batchSize
)
veloxBatch
- .map(v => ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), v))
+ .map(v => ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), v))
}
private def toAttribute(field: StructField): AttributeReference =
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index ec62a33bdd..995582024b 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -35,6 +35,8 @@ import scala.collection.JavaConverters._
/**
* An operator to resize input batches by appending the later batches to the one that comes earlier,
* or splitting one batch to smaller ones.
+ *
+ * FIXME: Code duplication with ColumnarToColumnarExec.
*/
case class VeloxResizeBatchesExec(
override val child: SparkPlan,
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
index ee0acbf3f4..7255bca58f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.datasource.v2
-import org.apache.gluten.columnarbatch.ArrowBatch
+import org.apache.gluten.columnarbatch.ArrowBatches
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention
@@ -34,7 +34,7 @@ case class ArrowBatchScanExec(original: BatchScanExec)
@transient lazy val batch: Batch = original.batch
override protected def batchType0(): Convention.BatchType = {
- ArrowBatch
+ ArrowBatches.ArrowJavaBatch
}
override lazy val readerFactory: PartitionReaderFactory = original.readerFactory
diff --git a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index 19f286056e..5c27f94ca8 100644
--- a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -16,9 +16,12 @@
*/
package org.apache.spark.api.python
-import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxBatch}
+import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
+import org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.utils.PullOutProjectHelper
@@ -209,9 +212,20 @@ case class ColumnarArrowEvalPythonExec(
child: SparkPlan,
evalType: Int)
extends EvalPythonExec
- with GlutenPlan {
+ with GlutenPlan
+ with KnownChildrenConventions {
override def supportsColumnar: Boolean = true
+ override protected def batchType0(): Convention.BatchType = ArrowJavaBatch
+
+ // FIXME: Make this accept ArrowJavaBatch as input. Before doing that, a weight-based
+ // shortest-path algorithm should be added into the transition factory, so that the factory
+ // can find out row->velox->arrow-native->arrow-java as a possible viable transition.
+ // Otherwise, with the current solution, any input (even one already in Arrow Java format) will be
+ // converted into Velox format and then into Arrow Java format before entering the Python runner.
+ override def requiredChildrenConventions(): Seq[ConventionReq] = List(
+ ConventionReq.of(ConventionReq.RowType.Any, ConventionReq.BatchType.Is(VeloxBatch)))
+
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext,
"output_batches"),
@@ -334,17 +348,17 @@ case class ColumnarArrowEvalPythonExec(
val inputBatchIter = contextAwareIterator.map {
inputCb =>
start_time = System.nanoTime()
- ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, inputCb)
- ColumnarBatches.retain(inputCb)
+ val loaded = ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), inputCb)
+ ColumnarBatches.retain(loaded)
// 0. cache input for later merge
- inputCbCache += inputCb
- numInputRows += inputCb.numRows
+ inputCbCache += loaded
+ numInputRows += loaded.numRows
// We only need to pass the referred cols data to python worker for evaluation.
var colsForEval = new ArrayBuffer[ColumnVector]()
for (i <- originalOffsets) {
- colsForEval += inputCb.column(i)
+ colsForEval += loaded.column(i)
}
- new ColumnarBatch(colsForEval.toArray, inputCb.numRows())
+ new ColumnarBatch(colsForEval.toArray, loaded.numRows())
}
val outputColumnarBatchIterator =
@@ -366,11 +380,9 @@ case class ColumnarArrowEvalPythonExec(
numOutputBatches += 1
numOutputRows += numRows
val batch = new ColumnarBatch(joinedVectors, numRows)
- val offloaded =
- ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
- ColumnarBatches.release(outputCb)
+ ColumnarBatches.checkLoaded(batch)
procTime += (System.nanoTime() - start_time) / 1000000
- offloaded
+ batch
}
Iterators
.wrap(res)
@@ -390,13 +402,13 @@ case class ColumnarArrowEvalPythonExec(
if (from > to) {
do {
vector.close()
- } while (vector.refCnt() == to)
+ } while (vector.refCnt() != to)
return
}
// from < to
do {
vector.retain()
- } while (vector.refCnt() == to)
+ } while (vector.refCnt() != to)
}
override protected def withNewChildInternal(newChild: SparkPlan): ColumnarArrowEvalPythonExec =
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
index e3298d7042..ee19425aeb 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution
-import org.apache.gluten.columnarbatch.ArrowBatch
+import org.apache.gluten.columnarbatch.ArrowBatches
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention
@@ -42,7 +42,7 @@ case class ArrowFileSourceScanExec(original: FileSourceScanExec)
override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()
override protected def batchType0(): Convention.BatchType = {
- ArrowBatch
+ ArrowBatches.ArrowJavaBatch
}
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index c339014c5e..249f5169d8 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -95,7 +95,7 @@ class VeloxColumnarWriteFilesRDD(
// Currently, the cb contains three columns: row, fragments, and context.
// The first row in the row column contains the number of written numRows.
// The fragments column contains detailed information about the file writes.
- val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
+ val loadedCb = ColumnarBatches.load(ArrowBufferAllocators.contextInstance, cb)
assert(loadedCb.numCols() == 3)
val numWrittenRows = loadedCb.column(0).getLong(0)
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index e25d3c663d..65eca8a18c 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -82,9 +82,8 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
// the operation will find a zero column batch from a task-local pool
ColumnarBatchJniWrapper.create(runtime).getForEmptySchema(batch.numRows)
} else {
- val offloaded =
- ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
- ColumnarBatches.getNativeHandle(offloaded)
+ ColumnarBatches.checkOffloaded(batch)
+ ColumnarBatches.getNativeHandle(batch)
}
}
datasourceJniWrapper.writeBatch(dsHandle, batchHandle)
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
index 4452f42c96..2e8a6a479a 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
@@ -145,7 +145,7 @@ object ExecUtil {
val pid = rangePartitioner.get.getPartition(partitionKeyExtractor(row))
pidVec.putInt(i, pid)
}
- val pidBatch = ColumnarBatches.ensureOffloaded(
+ val pidBatch = ColumnarBatches.offload(
ArrowBufferAllocators.contextInstance(),
new ColumnarBatch(Array[ColumnVector](pidVec), cb.numRows))
val newHandle = ColumnarBatches.compose(pidBatch, cb)
diff --git a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
index 819d35a100..a207a4b326 100644
--- a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
+++ b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
@@ -44,10 +44,10 @@ public class ColumnarBatchTest extends VeloxBackendTestBase {
final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
final ColumnarBatch offloaded =
- ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
+ ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch);
Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
final ColumnarBatch loaded =
- ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), offloaded);
+ ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), offloaded);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));
long cnt =
StreamSupport.stream(
@@ -69,7 +69,7 @@ public class ColumnarBatchTest extends VeloxBackendTestBase {
final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
Assert.assertEquals(1, ColumnarBatches.getRefCnt(batch));
final ColumnarBatch offloaded =
- ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
+ ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch);
Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
final long handle = ColumnarBatches.getNativeHandle(offloaded);
final ColumnarBatch created = ColumnarBatches.create(handle);
@@ -110,10 +110,10 @@ public class ColumnarBatchTest extends VeloxBackendTestBase {
col1.putNull(numRows - 1);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
final ColumnarBatch offloaded =
- ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
+ ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch);
Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
final ColumnarBatch loaded =
- ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), offloaded);
+ ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), offloaded);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));
long cnt =
StreamSupport.stream(
diff --git a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
index 8decedc411..df8ee8cc53 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
@@ -17,9 +17,10 @@
package org.apache.gluten.extension.columnar.transition
import org.apache.gluten.backendsapi.velox.VeloxListenerApi
-import org.apache.gluten.columnarbatch.{ArrowBatch, VeloxBatch}
+import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, ArrowNativeBatch}
+import org.apache.gluten.columnarbatch.VeloxBatch
import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
+import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec, RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
import org.apache.gluten.test.MockVeloxBackend
@@ -49,20 +50,20 @@ class VeloxTransitionSuite extends SharedSparkSession {
assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch, RowToColumnarExec(RowLeaf()))))
}
- test("Arrow C2R - outputs row") {
- val in = BatchLeaf(ArrowBatch)
+ test("ArrowNative C2R - outputs row") {
+ val in = BatchLeaf(ArrowNativeBatch)
val out = Transitions.insertTransitions(in, outputsColumnar = false)
- assert(out == ColumnarToRowExec(BatchLeaf(ArrowBatch)))
+ assert(out == ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch))))
}
- test("Arrow C2R - requires row input") {
- val in = RowUnary(BatchLeaf(ArrowBatch))
+ test("ArrowNative C2R - requires row input") {
+ val in = RowUnary(BatchLeaf(ArrowNativeBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
- assert(out == RowUnary(ColumnarToRowExec(BatchLeaf(ArrowBatch))))
+ assert(out == RowUnary(ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
}
- test("Arrow R2C - requires Arrow input") {
- val in = BatchUnary(ArrowBatch, RowLeaf())
+ test("ArrowNative R2C - requires Arrow input") {
+ val in = BatchUnary(ArrowNativeBatch, RowLeaf())
assertThrows[GlutenException] {
// No viable transitions.
// FIXME: Support this case.
@@ -70,41 +71,56 @@ class VeloxTransitionSuite extends SharedSparkSession {
}
}
- test("Velox C2R - outputs row") {
- val in = BatchLeaf(VeloxBatch)
+ test("ArrowNative-to-Velox C2C") {
+ val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowNativeBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
- assert(out == VeloxColumnarToRowExec(BatchLeaf(VeloxBatch)))
+ // No explicit transition needed for ArrowNative-to-Velox.
+ // FIXME: Add explicit transitions.
+ // See https://github.com/apache/incubator-gluten/issues/7313.
+ assert(
+ out == VeloxColumnarToRowExec(
+ BatchUnary(VeloxBatch, LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
}
- test("Velox C2R - requires row input") {
- val in = RowUnary(BatchLeaf(VeloxBatch))
+ test("Velox-to-ArrowNative C2C") {
+ val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
- assert(out == RowUnary(VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))
+ assert(
+ out == ColumnarToRowExec(
+ LoadArrowDataExec(BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch)))))
}
- test("Velox R2C - outputs Velox") {
- val in = RowLeaf()
- val out = Transitions.insertTransitions(in, outputsColumnar = true)
- assert(out == RowToVeloxColumnarExec(RowLeaf()))
+ test("Vanilla-to-ArrowNative C2C") {
+ val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VanillaBatch))
+ assertThrows[GlutenException] {
+ // No viable transitions.
+ // FIXME: Support this case.
+ Transitions.insertTransitions(in, outputsColumnar = false)
+ }
}
- test("Velox R2C - requires Velox input") {
- val in = BatchUnary(VeloxBatch, RowLeaf())
+ test("ArrowNative-to-Vanilla C2C") {
+ val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowNativeBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
- assert(out == VeloxColumnarToRowExec(BatchUnary(VeloxBatch, RowToVeloxColumnarExec(RowLeaf()))))
+ assert(
+ out == ColumnarToRowExec(
+ BatchUnary(VanillaBatch, LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
}
- test("Arrow-to-Velox C2C") {
- val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowBatch))
+ test("ArrowJava C2R - outputs row") {
+ val in = BatchLeaf(ArrowJavaBatch)
val out = Transitions.insertTransitions(in, outputsColumnar = false)
- // No explicit transition needed for Arrow-to-Velox.
- // FIXME: Add explicit transitions.
- // See https://github.com/apache/incubator-gluten/issues/7313.
- assert(out == VeloxColumnarToRowExec(BatchUnary(VeloxBatch, BatchLeaf(ArrowBatch))))
+ assert(out == ColumnarToRowExec(BatchLeaf(ArrowJavaBatch)))
+ }
+
+ test("ArrowJava C2R - requires row input") {
+ val in = RowUnary(BatchLeaf(ArrowJavaBatch))
+ val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ assert(out == RowUnary(ColumnarToRowExec(BatchLeaf(ArrowJavaBatch))))
}
- test("Velox-to-Arrow C2C") {
- val in = BatchUnary(ArrowBatch, BatchLeaf(VeloxBatch))
+ test("ArrowJava R2C - requires Arrow input") {
+ val in = BatchUnary(ArrowJavaBatch, RowLeaf())
assertThrows[GlutenException] {
// No viable transitions.
// FIXME: Support this case.
@@ -112,24 +128,24 @@ class VeloxTransitionSuite extends SharedSparkSession {
}
}
- test("Vanilla-to-Velox C2C") {
- val in = BatchUnary(VeloxBatch, BatchLeaf(VanillaBatch))
+ test("ArrowJava-to-Velox C2C") {
+ val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowJavaBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
- BatchUnary(VeloxBatch, RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch))))))
+ BatchUnary(VeloxBatch, OffloadArrowDataExec(BatchLeaf(ArrowJavaBatch)))))
}
- test("Velox-to-Vanilla C2C") {
- val in = BatchUnary(VanillaBatch, BatchLeaf(VeloxBatch))
+ test("Velox-to-ArrowJava C2C") {
+ val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VeloxBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
- BatchUnary(VanillaBatch, RowToColumnarExec(VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))))
+ BatchUnary(ArrowJavaBatch, LoadArrowDataExec(BatchLeaf(VeloxBatch)))))
}
- test("Vanilla-to-Arrow C2C") {
- val in = BatchUnary(ArrowBatch, BatchLeaf(VanillaBatch))
+ test("Vanilla-to-ArrowJava C2C") {
+ val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VanillaBatch))
assertThrows[GlutenException] {
// No viable transitions.
// FIXME: Support this case.
@@ -137,13 +153,50 @@ class VeloxTransitionSuite extends SharedSparkSession {
}
}
- test("Arrow-to-Vanilla C2C") {
- val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowBatch))
+ test("ArrowJava-to-Vanilla C2C") {
+ val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowJavaBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
- // No explicit transition needed for Arrow-to-Vanilla.
- // FIXME: Add explicit transitions.
- // See https://github.com/apache/incubator-gluten/issues/7313.
- assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch, BatchLeaf(ArrowBatch))))
+ assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch, BatchLeaf(ArrowJavaBatch))))
+ }
+
+ test("Velox C2R - outputs row") {
+ val in = BatchLeaf(VeloxBatch)
+ val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ assert(out == VeloxColumnarToRowExec(BatchLeaf(VeloxBatch)))
+ }
+
+ test("Velox C2R - requires row input") {
+ val in = RowUnary(BatchLeaf(VeloxBatch))
+ val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ assert(out == RowUnary(VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))
+ }
+
+ test("Velox R2C - outputs Velox") {
+ val in = RowLeaf()
+ val out = Transitions.insertTransitions(in, outputsColumnar = true)
+ assert(out == RowToVeloxColumnarExec(RowLeaf()))
+ }
+
+ test("Velox R2C - requires Velox input") {
+ val in = BatchUnary(VeloxBatch, RowLeaf())
+ val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ assert(out == VeloxColumnarToRowExec(BatchUnary(VeloxBatch, RowToVeloxColumnarExec(RowLeaf()))))
+ }
+
+ test("Vanilla-to-Velox C2C") {
+ val in = BatchUnary(VeloxBatch, BatchLeaf(VanillaBatch))
+ val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ assert(
+ out == VeloxColumnarToRowExec(
+ BatchUnary(VeloxBatch, RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch))))))
+ }
+
+ test("Velox-to-Vanilla C2C") {
+ val in = BatchUnary(VanillaBatch, BatchLeaf(VeloxBatch))
+ val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ assert(
+ out == ColumnarToRowExec(
+ BatchUnary(VanillaBatch, RowToColumnarExec(VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))))
}
override protected def beforeAll(): Unit = {
diff --git a/cpp/core/tests/CMakeLists.txt b/cpp/core/tests/CMakeLists.txt
index a0f3406f5c..407045d9f9 100644
--- a/cpp/core/tests/CMakeLists.txt
+++ b/cpp/core/tests/CMakeLists.txt
@@ -18,4 +18,4 @@ if(ENABLE_HBM)
endif()
add_test_case(round_robin_partitioner_test SOURCES RoundRobinPartitionerTest.cc)
-add_test_case(objectstore__test SOURCES ObjectStoreTest.cc)
+add_test_case(object_store_test SOURCES ObjectStoreTest.cc)
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala
deleted file mode 100644
index 58a88e1f49..0000000000
--- a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gluten.columnarbatch
-
-import org.apache.gluten.extension.columnar.transition.{Convention, TransitionDef}
-import org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
-
-import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
-
-/**
- * ArrowBatch stands for Gluten's Arrow-based columnar batch implementation. Vanilla Spark's
- * ColumnarBatch consisting of [[org.apache.spark.sql.vectorized.ArrowColumnVector]]s is still
- * treated as [[Convention.BatchType.VanillaBatch]].
- *
- * As of now, ArrowBatch should have [[org.apache.gluten.vectorized.ArrowWritableColumnVector]]s
- * populated in it. ArrowBatch can be loaded from / offloaded to native to C++ ArrowColumnarBatch
- * through API in [[ColumnarBatches]]. After being offloaded, ArrowBatch is no longer considered a
- * legal ArrowBatch and cannot be accepted by trivial ColumnarToRowExec. To follow that rule, Any
- * plan with this batch type should promise it emits loaded batch only.
- * plan with this batch type should promise it emits loaded batch only.
- */
-object ArrowBatch extends Convention.BatchType {
- toRow(
- () =>
- (plan: SparkPlan) => {
- ColumnarToRowExec(plan)
- })
-
- // Arrow batch is one-way compatible with vanilla batch since it provides valid
- // #get<type>(...) implementations.
- toBatch(VanillaBatch, TransitionDef.empty)
-}
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatches.scala b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatches.scala
new file mode 100644
index 0000000000..fac84efd35
--- /dev/null
+++ b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatches.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.columnarbatch
+
+import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec}
+import org.apache.gluten.extension.columnar.transition.{Convention, TransitionDef}
+import org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
+
+import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
+
+object ArrowBatches {
+
+ /**
+ * JavaArrowBatch stands for Gluten's Java Arrow-based columnar batch implementation.
+ *
+ * JavaArrowBatch should have [[org.apache.gluten.vectorized.ArrowWritableColumnVector]]s
+ * populated in it. JavaArrowBatch can be offloaded to NativeArrowBatch through API in
+ * [[ColumnarBatches]].
+ *
+ * JavaArrowBatch is compatible with vanilla batch since it provides valid #get<type>(...)
+ * implementations.
+ */
+ object ArrowJavaBatch extends Convention.BatchType {
+ toRow(
+ () =>
+ (plan: SparkPlan) => {
+ ColumnarToRowExec(plan)
+ })
+
+ toBatch(VanillaBatch, TransitionDef.empty)
+ }
+
+ /**
+ * NativeArrowBatch stands for Gluten's native Arrow-based columnar batch implementation.
+ *
+ * NativeArrowBatch should have [[org.apache.gluten.columnarbatch.IndicatorVector]] set as the
+ * first vector. NativeArrowBatch can be loaded to JavaArrowBatch through API in
+ * [[ColumnarBatches]].
+ */
+ object ArrowNativeBatch extends Convention.BatchType {
+ toRow(
+ () =>
+ (plan: SparkPlan) => {
+ ColumnarToRowExec(LoadArrowDataExec(plan))
+ })
+
+ toBatch(
+ VanillaBatch,
+ () =>
+ (plan: SparkPlan) => {
+ LoadArrowDataExec(plan)
+ })
+
+ fromBatch(
+ ArrowJavaBatch,
+ () =>
+ (plan: SparkPlan) => {
+ OffloadArrowDataExec(plan)
+ })
+
+ toBatch(
+ ArrowJavaBatch,
+ () =>
+ (plan: SparkPlan) => {
+ LoadArrowDataExec(plan)
+ })
+ }
+}
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index e6d97db84b..3f48d8f293 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -25,6 +25,7 @@ import org.apache.gluten.utils.InternalRowUtl;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.CDataDictionaryProvider;
@@ -121,7 +122,7 @@ public class ColumnarBatches {
* Ensure the input batch is offloaded as native-based columnar batch (See {@link IndicatorVector}
* and {@link PlaceholderVector}). This method will close the input column batch after offloaded.
*/
- public static ColumnarBatch ensureOffloaded(BufferAllocator allocator, ColumnarBatch batch) {
+ private static ColumnarBatch ensureOffloaded(BufferAllocator allocator, ColumnarBatch batch) {
if (ColumnarBatches.isLightBatch(batch)) {
return batch;
}
@@ -133,14 +134,22 @@ public class ColumnarBatches {
* take place if loading is required, which means when the input batch is not loaded yet. This
* method will close the input column batch after loaded.
*/
- public static ColumnarBatch ensureLoaded(BufferAllocator allocator, ColumnarBatch batch) {
+ private static ColumnarBatch ensureLoaded(BufferAllocator allocator, ColumnarBatch batch) {
if (isHeavyBatch(batch)) {
return batch;
}
return load(allocator, batch);
}
- private static ColumnarBatch load(BufferAllocator allocator, ColumnarBatch input) {
+ public static void checkLoaded(ColumnarBatch batch) {
+ Preconditions.checkArgument(isHeavyBatch(batch), "Input batch is not loaded");
+ }
+
+ public static void checkOffloaded(ColumnarBatch batch) {
+ Preconditions.checkArgument(isLightBatch(batch), "Input batch is not offloaded");
+ }
+
+ public static ColumnarBatch load(BufferAllocator allocator, ColumnarBatch input) {
if (!ColumnarBatches.isLightBatch(input)) {
throw new IllegalArgumentException(
"Input is not light columnar batch. "
@@ -186,7 +195,7 @@ public class ColumnarBatches {
}
}
- private static ColumnarBatch offload(BufferAllocator allocator, ColumnarBatch input) {
+ public static ColumnarBatch offload(BufferAllocator allocator, ColumnarBatch input) {
if (!isHeavyBatch(input)) {
throw new IllegalArgumentException("batch is not Arrow columnar batch");
}
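For reference, the public surface now replaces the ensure* helpers with explicit load/offload calls plus checkLoaded/checkOffloaded preconditions. A minimal caller-side sketch, illustrative only and not part of this patch (`javaBatch` stands for an assumed heavy Arrow-Java input batch), mirroring the usage in ColumnarBatchTest.java above:

    import org.apache.gluten.columnarbatch.ColumnarBatches
    import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators

    // Offload an Arrow-Java (heavy) batch to a native (light) batch and grab its handle.
    val offloaded = ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), javaBatch)
    ColumnarBatches.checkOffloaded(offloaded) // fails fast instead of converting implicitly
    val handle = ColumnarBatches.getNativeHandle(offloaded)

    // Load it back into an Arrow-Java batch for JVM-side access.
    val loaded = ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), offloaded)
    ColumnarBatches.checkLoaded(loaded)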
diff --git a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala b/gluten-arrow/src/main/java/org/apache/gluten/execution/LoadArrowDataExec.scala
similarity index 51%
copy from backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
copy to gluten-arrow/src/main/java/org/apache/gluten/execution/LoadArrowDataExec.scala
index aa6676dc9b..36f3c48d4e 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
+++ b/gluten-arrow/src/main/java/org/apache/gluten/execution/LoadArrowDataExec.scala
@@ -14,27 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.columnarbatch
+package org.apache.gluten.execution
-import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
-import org.apache.gluten.extension.columnar.transition.{Convention, TransitionDef}
+import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, ArrowNativeBatch}
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.ColumnarBatch
-object VeloxBatch extends Convention.BatchType {
- fromRow(
- () =>
- (plan: SparkPlan) => {
- RowToVeloxColumnarExec(plan)
- })
+/** Converts input data with batch type [[ArrowNativeBatch]] to type [[ArrowJavaBatch]]. */
+case class LoadArrowDataExec(override val child: SparkPlan)
+ extends ColumnarToColumnarExec(ArrowNativeBatch, ArrowJavaBatch) {
+ override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
+ in.map(b => ColumnarBatches.load(ArrowBufferAllocators.contextInstance, b))
+ }
- toRow(
- () =>
- (plan: SparkPlan) => {
- VeloxColumnarToRowExec(plan)
- })
-
- // Velox batch is considered one-way compatible with Arrow batch.
- // This is practically achieved by utilizing C++ API VeloxColumnarBatch::from at runtime.
- fromBatch(ArrowBatch, TransitionDef.empty)
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+ copy(child = newChild)
}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala b/gluten-arrow/src/main/java/org/apache/gluten/execution/OffloadArrowDataExec.scala
similarity index 51%
copy from backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
copy to gluten-arrow/src/main/java/org/apache/gluten/execution/OffloadArrowDataExec.scala
index aa6676dc9b..7b2184a240 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
+++ b/gluten-arrow/src/main/java/org/apache/gluten/execution/OffloadArrowDataExec.scala
@@ -14,27 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.columnarbatch
+package org.apache.gluten.execution
-import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
-import org.apache.gluten.extension.columnar.transition.{Convention, TransitionDef}
+import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, ArrowNativeBatch}
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.ColumnarBatch
-object VeloxBatch extends Convention.BatchType {
- fromRow(
- () =>
- (plan: SparkPlan) => {
- RowToVeloxColumnarExec(plan)
- })
+/** Converts input data with batch type [[ArrowJavaBatch]] to type [[ArrowNativeBatch]]. */
+case class OffloadArrowDataExec(override val child: SparkPlan)
+ extends ColumnarToColumnarExec(ArrowJavaBatch, ArrowNativeBatch) {
+ override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
+ in.map(b => ColumnarBatches.offload(ArrowBufferAllocators.contextInstance, b))
+ }
- toRow(
- () =>
- (plan: SparkPlan) => {
- VeloxColumnarToRowExec(plan)
- })
-
- // Velox batch is considered one-way compatible with Arrow batch.
- // This is practically achieved by utilizing C++ API VeloxColumnarBatch::from at runtime.
- fromBatch(ArrowBatch, TransitionDef.empty)
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+ copy(child = newChild)
}
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
index a7003a6bd3..c69caf5f59 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
@@ -18,7 +18,6 @@ package org.apache.gluten.vectorized;
import org.apache.gluten.columnarbatch.ColumnarBatchJniWrapper;
import org.apache.gluten.columnarbatch.ColumnarBatches;
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.gluten.runtime.Runtimes;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -45,8 +44,7 @@ public class ColumnarBatchInIterator {
return ColumnarBatchJniWrapper.create(Runtimes.contextInstance("ColumnarBatchInIterator"))
.getForEmptySchema(next.numRows());
}
- final ColumnarBatch offloaded =
- ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), next);
- return ColumnarBatches.getNativeHandle(offloaded);
+ ColumnarBatches.checkOffloaded(next);
+ return ColumnarBatches.getNativeHandle(next);
}
}
diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala b/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala
index 8c6161e0c4..af5fcb0f60 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala
@@ -139,12 +139,12 @@ object ArrowAbiUtil {
def exportFromSparkColumnarBatch(
allocator: BufferAllocator,
- columnarBatch: ColumnarBatch,
+ batch: ColumnarBatch,
cSchema: ArrowSchema,
cArray: ArrowArray): Unit = {
- val loaded = ColumnarBatches.ensureLoaded(allocator, columnarBatch)
- val schema = ArrowUtil.toSchema(loaded)
- val rb = SparkVectorUtil.toArrowRecordBatch(loaded)
+ ColumnarBatches.checkLoaded(batch)
+ val schema = ArrowUtil.toSchema(batch)
+ val rb = SparkVectorUtil.toArrowRecordBatch(batch)
try {
exportFromArrowRecordBatch(allocator, rb, schema, cSchema, cArray)
} finally {
diff --git a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala
index 3e86be79ac..6a0f29c4ba 100644
--- a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala
+++ b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.utils
import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.vectorized.ArrowWritableColumnVector
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -30,12 +29,12 @@ import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListCon
object SparkVectorUtil {
- def toArrowRecordBatch(columnarBatch: ColumnarBatch): ArrowRecordBatch = {
- val numRowsInBatch = columnarBatch.numRows()
- val cols = (0 until columnarBatch.numCols).toList.map(
+ def toArrowRecordBatch(batch: ColumnarBatch): ArrowRecordBatch = {
+ ColumnarBatches.checkLoaded(batch)
+ val numRowsInBatch = batch.numRows()
+ val cols = (0 until batch.numCols).toList.map(
i =>
- ColumnarBatches
- .ensureLoaded(ArrowBufferAllocators.contextInstance(), columnarBatch)
+ batch
.column(i)
.asInstanceOf[ArrowWritableColumnVector]
.getValueVector)
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
new file mode 100644
index 0000000000..86f9ddab46
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
+import org.apache.gluten.extension.columnar.transition.Convention.KnownBatchType
+import org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
+import org.apache.gluten.iterator.Iterators
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import java.util.concurrent.atomic.AtomicLong
+
+abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: Convention.BatchType)
+ extends UnaryExecNode
+ with KnownBatchType
+ with KnownChildrenConventions {
+
+ def child: SparkPlan
+ protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch]
+
+ override lazy val metrics: Map[String, SQLMetric] =
+ Map(
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input
rows"),
+ "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of
input batches"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
+ "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of
output batches"),
+ "selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to
convert batches")
+ )
+
+ override def supportsColumnar: Boolean = true
+ override def batchType(): Convention.BatchType = to
+ override def requiredChildrenConventions(): Seq[ConventionReq] = List(
+ ConventionReq.of(ConventionReq.RowType.Any, ConventionReq.BatchType.Is(from)))
+
+ override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
+ override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val numInputRows = longMetric("numInputRows")
+ val numInputBatches = longMetric("numInputBatches")
+ val numOutputRows = longMetric("numOutputRows")
+ val numOutputBatches = longMetric("numOutputBatches")
+ val selfTime = longMetric("selfTime")
+
+ child.executeColumnar().mapPartitions {
+ in =>
+ // Self millis = Out millis - In millis.
+ val selfMillis = new AtomicLong(0L)
+ val wrappedIn = Iterators
+ .wrap(in)
+ .collectReadMillis(inMillis => selfMillis.getAndAdd(-inMillis))
+ .create()
+ .map {
+ inBatch =>
+ numInputRows += inBatch.numRows()
+ numInputBatches += 1
+ inBatch
+ }
+ val out = mapIterator(wrappedIn)
+ val wrappedOut = Iterators
+ .wrap(out)
+ .collectReadMillis(outMillis => selfMillis.getAndAdd(outMillis))
+ .recycleIterator {
+ selfTime += selfMillis.get()
+ }
+ .create()
+ .map {
+ outBatch =>
+ numOutputRows += outBatch.numRows()
+ numOutputBatches += 1
+ outBatch
+ }
+ wrappedOut
+ }
+
+ }
+
+ override def output: Seq[Attribute] = child.output
+}
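ColumnarToColumnarExec centralizes the metrics and self-time bookkeeping for batch-to-batch conversions; concrete operators only supply the per-batch mapping. A minimal subclass sketch, illustrative only and not part of this patch (MyBatchA, MyBatchB, and convertBatch are placeholder names), modeled on LoadArrowDataExec above:

    case class MyConvertExec(override val child: SparkPlan)
      extends ColumnarToColumnarExec(MyBatchA, MyBatchB) {
      // The base class derives batchType()/requiredChildrenConventions() from the
      // (from, to) pair and measures conversion time around this iterator.
      override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] =
        in.map(convertBatch)

      override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
        copy(child = newChild)
    }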
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index 034b458514..fcd34cb1f2 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -69,7 +69,7 @@ object Convention {
final case object VanillaRow extends RowType
}
- trait BatchType {
+ trait BatchType extends Serializable {
final def fromRow(transitionDef: TransitionDef): Unit = {
Transition.factory.update().defineFromRowTransition(this, transitionDef)
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index 3ba09efefe..1441814519 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -68,9 +68,9 @@ object RemoveTransitions extends Rule[SparkPlan] {
@tailrec
private[transition] def removeForNode(plan: SparkPlan): SparkPlan = plan match {
- // TODO: Consider C2C transitions as well when we have some.
case ColumnarToRowLike(child) => removeForNode(child)
case RowToColumnarLike(child) => removeForNode(child)
+ case ColumnarToColumnarLike(child) => removeForNode(child)
case other => other
}
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
index 669ba3ac3a..2dd8d632e3 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
@@ -16,6 +16,8 @@
*/
package org.apache.gluten.extension.columnar
+import org.apache.gluten.execution.ColumnarToColumnarExec
+
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.debug.DebugExec
@@ -55,4 +57,15 @@ package object transition {
}
}
}
+
+ // Extractor for Gluten's C2C
+ object ColumnarToColumnarLike {
+ def unapply(plan: SparkPlan): Option[SparkPlan] = {
+ plan match {
+ case c2c: ColumnarToColumnarExec =>
+ Some(c2c.child)
+ case _ => None
+ }
+ }
+ }
}