This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 84da616220 [GLUTEN-7313][VL] Explicit Arrow transitions, part 1: Add 
LoadArrowDataExec / OffloadArrowDataExec (#7343)
84da616220 is described below

commit 84da61622068242c47f0a72365aa61542e7f6872
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Sep 26 09:06:30 2024 +0800

    [GLUTEN-7313][VL] Explicit Arrow transitions, part 1: Add LoadArrowDataExec 
/ OffloadArrowDataExec (#7343)
---
 .../apache/gluten/columnarbatch/VeloxBatch.scala   |  30 ++++-
 .../gluten/datasource/ArrowCSVFileFormat.scala     |   2 +-
 .../gluten/execution/VeloxResizeBatchesExec.scala  |   2 +
 .../datasource/v2/ArrowBatchScanExec.scala         |   4 +-
 .../api/python/ColumnarArrowEvalPythonExec.scala   |  40 ++++--
 .../sql/execution/ArrowFileSourceScanExec.scala    |   4 +-
 .../execution/VeloxColumnarWriteFilesExec.scala    |   2 +-
 .../velox/VeloxFormatWriterInjects.scala           |   5 +-
 .../spark/sql/execution/utils/ExecUtil.scala       |   2 +-
 .../gluten/columnarbatch/ColumnarBatchTest.java    |  10 +-
 .../columnar/transition/VeloxTransitionSuite.scala | 143 ++++++++++++++-------
 cpp/core/tests/CMakeLists.txt                      |   2 +-
 .../apache/gluten/columnarbatch/ArrowBatch.scala   |  46 -------
 .../apache/gluten/columnarbatch/ArrowBatches.scala |  82 ++++++++++++
 .../gluten/columnarbatch/ColumnarBatches.java      |  17 ++-
 .../gluten/execution/LoadArrowDataExec.scala       |  31 ++---
 .../gluten/execution/OffloadArrowDataExec.scala    |  31 ++---
 .../gluten/vectorized/ColumnarBatchInIterator.java |   6 +-
 .../org/apache/gluten/utils/ArrowAbiUtil.scala     |   8 +-
 .../apache/spark/sql/utils/SparkVectorUtil.scala   |  11 +-
 .../gluten/execution/ColumnarToColumnarExec.scala  |  97 ++++++++++++++
 .../extension/columnar/transition/Convention.scala |   2 +-
 .../columnar/transition/Transitions.scala          |   2 +-
 .../extension/columnar/transition/package.scala    |  13 ++
 24 files changed, 411 insertions(+), 181 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
index aa6676dc9b..33bd11b39d 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.columnarbatch
 
-import org.apache.gluten.execution.{RowToVeloxColumnarExec, 
VeloxColumnarToRowExec}
+import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec, 
RowToVeloxColumnarExec, VeloxColumnarToRowExec}
 import org.apache.gluten.extension.columnar.transition.{Convention, 
TransitionDef}
 
 import org.apache.spark.sql.execution.SparkPlan
@@ -34,7 +34,29 @@ object VeloxBatch extends Convention.BatchType {
         VeloxColumnarToRowExec(plan)
       })
 
-  // Velox batch is considered one-way compatible with Arrow batch.
-  // This is practically achieved by utilizing C++ API 
VeloxColumnarBatch::from at runtime.
-  fromBatch(ArrowBatch, TransitionDef.empty)
+  // TODO: Add explicit transitions between Arrow native batch and Velox batch.
+  //  See https://github.com/apache/incubator-gluten/issues/7313.
+
+  fromBatch(
+    ArrowBatches.ArrowJavaBatch,
+    () =>
+      (plan: SparkPlan) => {
+        OffloadArrowDataExec(plan)
+      })
+
+  toBatch(
+    ArrowBatches.ArrowJavaBatch,
+    () =>
+      (plan: SparkPlan) => {
+        LoadArrowDataExec(plan)
+      })
+
+  fromBatch(
+    ArrowBatches.ArrowNativeBatch,
+    () =>
+      (plan: SparkPlan) => {
+        LoadArrowDataExec(plan)
+      })
+
+  toBatch(ArrowBatches.ArrowNativeBatch, TransitionDef.empty)
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
index f42b921ab6..52f7756381 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
@@ -315,7 +315,7 @@ object ArrowCSVFileFormat {
       batchSize
     )
     veloxBatch
-      .map(v => 
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), v))
+      .map(v => ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), 
v))
   }
 
   private def toAttribute(field: StructField): AttributeReference =
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index ec62a33bdd..995582024b 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -35,6 +35,8 @@ import scala.collection.JavaConverters._
 /**
  * An operator to resize input batches by appending the later batches to the 
one that comes earlier,
  * or splitting one batch to smaller ones.
+ *
+ * FIXME: Code duplication with ColumnarToColumnarExec.
  */
 case class VeloxResizeBatchesExec(
     override val child: SparkPlan,
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
index ee0acbf3f4..7255bca58f 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution.datasource.v2
 
-import org.apache.gluten.columnarbatch.ArrowBatch
+import org.apache.gluten.columnarbatch.ArrowBatches
 import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.transition.Convention
 
@@ -34,7 +34,7 @@ case class ArrowBatchScanExec(original: BatchScanExec)
   @transient lazy val batch: Batch = original.batch
 
   override protected def batchType0(): Convention.BatchType = {
-    ArrowBatch
+    ArrowBatches.ArrowJavaBatch
   }
 
   override lazy val readerFactory: PartitionReaderFactory = 
original.readerFactory
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index 19f286056e..5c27f94ca8 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -16,9 +16,12 @@
  */
 package org.apache.spark.api.python
 
-import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxBatch}
+import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
+import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
 import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.utils.PullOutProjectHelper
@@ -209,9 +212,20 @@ case class ColumnarArrowEvalPythonExec(
     child: SparkPlan,
     evalType: Int)
   extends EvalPythonExec
-  with GlutenPlan {
+  with GlutenPlan
+  with KnownChildrenConventions {
   override def supportsColumnar: Boolean = true
 
+  override protected def batchType0(): Convention.BatchType = ArrowJavaBatch
+
+  // FIXME: Make this accepts ArrowJavaBatch as input. Before doing that, a 
weight-based
+  //  shortest patch algorithm should be added into transition factory. So 
that the factory
+  //  can find out row->velox->arrow-native->arrow-java as the possible viable 
transition.
+  //  Otherwise with current solution, any input (even already in Arrow Java 
format) will be
+  //  converted into Velox format then into Arrow Java format before entering 
python runner.
+  override def requiredChildrenConventions(): Seq[ConventionReq] = List(
+    ConventionReq.of(ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(VeloxBatch)))
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
     "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, 
"output_batches"),
@@ -334,17 +348,17 @@ case class ColumnarArrowEvalPythonExec(
         val inputBatchIter = contextAwareIterator.map {
           inputCb =>
             start_time = System.nanoTime()
-            
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, inputCb)
-            ColumnarBatches.retain(inputCb)
+            val loaded = 
ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), inputCb)
+            ColumnarBatches.retain(loaded)
             // 0. cache input for later merge
-            inputCbCache += inputCb
-            numInputRows += inputCb.numRows
+            inputCbCache += loaded
+            numInputRows += loaded.numRows
             // We only need to pass the referred cols data to python worker 
for evaluation.
             var colsForEval = new ArrayBuffer[ColumnVector]()
             for (i <- originalOffsets) {
-              colsForEval += inputCb.column(i)
+              colsForEval += loaded.column(i)
             }
-            new ColumnarBatch(colsForEval.toArray, inputCb.numRows())
+            new ColumnarBatch(colsForEval.toArray, loaded.numRows())
         }
 
         val outputColumnarBatchIterator =
@@ -366,11 +380,9 @@ case class ColumnarArrowEvalPythonExec(
               numOutputBatches += 1
               numOutputRows += numRows
               val batch = new ColumnarBatch(joinedVectors, numRows)
-              val offloaded =
-                
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
-              ColumnarBatches.release(outputCb)
+              ColumnarBatches.checkLoaded(batch)
               procTime += (System.nanoTime() - start_time) / 1000000
-              offloaded
+              batch
           }
         Iterators
           .wrap(res)
@@ -390,13 +402,13 @@ case class ColumnarArrowEvalPythonExec(
     if (from > to) {
       do {
         vector.close()
-      } while (vector.refCnt() == to)
+      } while (vector.refCnt() != to)
       return
     }
     // from < to
     do {
       vector.retain()
-    } while (vector.refCnt() == to)
+    } while (vector.refCnt() != to)
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
ColumnarArrowEvalPythonExec =
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
index e3298d7042..ee19425aeb 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.gluten.columnarbatch.ArrowBatch
+import org.apache.gluten.columnarbatch.ArrowBatches
 import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.transition.Convention
 
@@ -42,7 +42,7 @@ case class ArrowFileSourceScanExec(original: 
FileSourceScanExec)
   override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()
 
   override protected def batchType0(): Convention.BatchType = {
-    ArrowBatch
+    ArrowBatches.ArrowJavaBatch
   }
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index c339014c5e..249f5169d8 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -95,7 +95,7 @@ class VeloxColumnarWriteFilesRDD(
     // Currently, the cb contains three columns: row, fragments, and context.
     // The first row in the row column contains the number of written numRows.
     // The fragments column contains detailed information about the file 
writes.
-    val loadedCb = 
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
+    val loadedCb = ColumnarBatches.load(ArrowBufferAllocators.contextInstance, 
cb)
     assert(loadedCb.numCols() == 3)
     val numWrittenRows = loadedCb.column(0).getLong(0)
 
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index e25d3c663d..65eca8a18c 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -82,9 +82,8 @@ trait VeloxFormatWriterInjects extends 
GlutenFormatWriterInjectsBase {
             // the operation will find a zero column batch from a task-local 
pool
             
ColumnarBatchJniWrapper.create(runtime).getForEmptySchema(batch.numRows)
           } else {
-            val offloaded =
-              
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
-            ColumnarBatches.getNativeHandle(offloaded)
+            ColumnarBatches.checkOffloaded(batch)
+            ColumnarBatches.getNativeHandle(batch)
           }
         }
         datasourceJniWrapper.writeBatch(dsHandle, batchHandle)
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
index 4452f42c96..2e8a6a479a 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
@@ -145,7 +145,7 @@ object ExecUtil {
                     val pid = 
rangePartitioner.get.getPartition(partitionKeyExtractor(row))
                     pidVec.putInt(i, pid)
                 }
-                val pidBatch = ColumnarBatches.ensureOffloaded(
+                val pidBatch = ColumnarBatches.offload(
                   ArrowBufferAllocators.contextInstance(),
                   new ColumnarBatch(Array[ColumnVector](pidVec), cb.numRows))
                 val newHandle = ColumnarBatches.compose(pidBatch, cb)
diff --git 
a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
 
b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
index 819d35a100..a207a4b326 100644
--- 
a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
+++ 
b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
@@ -44,10 +44,10 @@ public class ColumnarBatchTest extends VeloxBackendTestBase 
{
           final ColumnarBatch batch = newArrowBatch("a boolean, b int", 
numRows);
           Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
           final ColumnarBatch offloaded =
-              
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
+              ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), 
batch);
           Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
           final ColumnarBatch loaded =
-              
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), 
offloaded);
+              ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), 
offloaded);
           Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));
           long cnt =
               StreamSupport.stream(
@@ -69,7 +69,7 @@ public class ColumnarBatchTest extends VeloxBackendTestBase {
           final ColumnarBatch batch = newArrowBatch("a boolean, b int", 
numRows);
           Assert.assertEquals(1, ColumnarBatches.getRefCnt(batch));
           final ColumnarBatch offloaded =
-              
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
+              ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), 
batch);
           Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
           final long handle = ColumnarBatches.getNativeHandle(offloaded);
           final ColumnarBatch created = ColumnarBatches.create(handle);
@@ -110,10 +110,10 @@ public class ColumnarBatchTest extends 
VeloxBackendTestBase {
           col1.putNull(numRows - 1);
           Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
           final ColumnarBatch offloaded =
-              
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
+              ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), 
batch);
           Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
           final ColumnarBatch loaded =
-              
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), 
offloaded);
+              ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), 
offloaded);
           Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));
           long cnt =
               StreamSupport.stream(
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
index 8decedc411..df8ee8cc53 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
@@ -17,9 +17,10 @@
 package org.apache.gluten.extension.columnar.transition
 
 import org.apache.gluten.backendsapi.velox.VeloxListenerApi
-import org.apache.gluten.columnarbatch.{ArrowBatch, VeloxBatch}
+import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, 
ArrowNativeBatch}
+import org.apache.gluten.columnarbatch.VeloxBatch
 import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.execution.{RowToVeloxColumnarExec, 
VeloxColumnarToRowExec}
+import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec, 
RowToVeloxColumnarExec, VeloxColumnarToRowExec}
 import 
org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
 import org.apache.gluten.test.MockVeloxBackend
 
@@ -49,20 +50,20 @@ class VeloxTransitionSuite extends SharedSparkSession {
     assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch, 
RowToColumnarExec(RowLeaf()))))
   }
 
-  test("Arrow C2R - outputs row") {
-    val in = BatchLeaf(ArrowBatch)
+  test("ArrowNative C2R - outputs row") {
+    val in = BatchLeaf(ArrowNativeBatch)
     val out = Transitions.insertTransitions(in, outputsColumnar = false)
-    assert(out == ColumnarToRowExec(BatchLeaf(ArrowBatch)))
+    assert(out == 
ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch))))
   }
 
-  test("Arrow C2R - requires row input") {
-    val in = RowUnary(BatchLeaf(ArrowBatch))
+  test("ArrowNative C2R - requires row input") {
+    val in = RowUnary(BatchLeaf(ArrowNativeBatch))
     val out = Transitions.insertTransitions(in, outputsColumnar = false)
-    assert(out == RowUnary(ColumnarToRowExec(BatchLeaf(ArrowBatch))))
+    assert(out == 
RowUnary(ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
   }
 
-  test("Arrow R2C - requires Arrow input") {
-    val in = BatchUnary(ArrowBatch, RowLeaf())
+  test("ArrowNative R2C - requires Arrow input") {
+    val in = BatchUnary(ArrowNativeBatch, RowLeaf())
     assertThrows[GlutenException] {
       // No viable transitions.
       // FIXME: Support this case.
@@ -70,41 +71,56 @@ class VeloxTransitionSuite extends SharedSparkSession {
     }
   }
 
-  test("Velox C2R - outputs row") {
-    val in = BatchLeaf(VeloxBatch)
+  test("ArrowNative-to-Velox C2C") {
+    val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowNativeBatch))
     val out = Transitions.insertTransitions(in, outputsColumnar = false)
-    assert(out == VeloxColumnarToRowExec(BatchLeaf(VeloxBatch)))
+    // No explicit transition needed for ArrowNative-to-Velox.
+    // FIXME: Add explicit transitions.
+    //  See https://github.com/apache/incubator-gluten/issues/7313.
+    assert(
+      out == VeloxColumnarToRowExec(
+        BatchUnary(VeloxBatch, 
LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
   }
 
-  test("Velox C2R - requires row input") {
-    val in = RowUnary(BatchLeaf(VeloxBatch))
+  test("Velox-to-ArrowNative C2C") {
+    val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch))
     val out = Transitions.insertTransitions(in, outputsColumnar = false)
-    assert(out == RowUnary(VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))
+    assert(
+      out == ColumnarToRowExec(
+        LoadArrowDataExec(BatchUnary(ArrowNativeBatch, 
BatchLeaf(VeloxBatch)))))
   }
 
-  test("Velox R2C - outputs Velox") {
-    val in = RowLeaf()
-    val out = Transitions.insertTransitions(in, outputsColumnar = true)
-    assert(out == RowToVeloxColumnarExec(RowLeaf()))
+  test("Vanilla-to-ArrowNative C2C") {
+    val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VanillaBatch))
+    assertThrows[GlutenException] {
+      // No viable transitions.
+      // FIXME: Support this case.
+      Transitions.insertTransitions(in, outputsColumnar = false)
+    }
   }
 
-  test("Velox R2C - requires Velox input") {
-    val in = BatchUnary(VeloxBatch, RowLeaf())
+  test("ArrowNative-to-Vanilla C2C") {
+    val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowNativeBatch))
     val out = Transitions.insertTransitions(in, outputsColumnar = false)
-    assert(out == VeloxColumnarToRowExec(BatchUnary(VeloxBatch, 
RowToVeloxColumnarExec(RowLeaf()))))
+    assert(
+      out == ColumnarToRowExec(
+        BatchUnary(VanillaBatch, 
LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
   }
 
-  test("Arrow-to-Velox C2C") {
-    val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowBatch))
+  test("ArrowJava C2R - outputs row") {
+    val in = BatchLeaf(ArrowJavaBatch)
     val out = Transitions.insertTransitions(in, outputsColumnar = false)
-    // No explicit transition needed for Arrow-to-Velox.
-    // FIXME: Add explicit transitions.
-    //  See https://github.com/apache/incubator-gluten/issues/7313.
-    assert(out == VeloxColumnarToRowExec(BatchUnary(VeloxBatch, 
BatchLeaf(ArrowBatch))))
+    assert(out == ColumnarToRowExec(BatchLeaf(ArrowJavaBatch)))
+  }
+
+  test("ArrowJava C2R - requires row input") {
+    val in = RowUnary(BatchLeaf(ArrowJavaBatch))
+    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    assert(out == RowUnary(ColumnarToRowExec(BatchLeaf(ArrowJavaBatch))))
   }
 
-  test("Velox-to-Arrow C2C") {
-    val in = BatchUnary(ArrowBatch, BatchLeaf(VeloxBatch))
+  test("ArrowJava R2C - requires Arrow input") {
+    val in = BatchUnary(ArrowJavaBatch, RowLeaf())
     assertThrows[GlutenException] {
       // No viable transitions.
       // FIXME: Support this case.
@@ -112,24 +128,24 @@ class VeloxTransitionSuite extends SharedSparkSession {
     }
   }
 
-  test("Vanilla-to-Velox C2C") {
-    val in = BatchUnary(VeloxBatch, BatchLeaf(VanillaBatch))
+  test("ArrowJava-to-Velox C2C") {
+    val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowJavaBatch))
     val out = Transitions.insertTransitions(in, outputsColumnar = false)
     assert(
       out == VeloxColumnarToRowExec(
-        BatchUnary(VeloxBatch, 
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch))))))
+        BatchUnary(VeloxBatch, 
OffloadArrowDataExec(BatchLeaf(ArrowJavaBatch)))))
   }
 
-  test("Velox-to-Vanilla C2C") {
-    val in = BatchUnary(VanillaBatch, BatchLeaf(VeloxBatch))
+  test("Velox-to-ArrowJava C2C") {
+    val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VeloxBatch))
     val out = Transitions.insertTransitions(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
-        BatchUnary(VanillaBatch, 
RowToColumnarExec(VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))))
+        BatchUnary(ArrowJavaBatch, LoadArrowDataExec(BatchLeaf(VeloxBatch)))))
   }
 
-  test("Vanilla-to-Arrow C2C") {
-    val in = BatchUnary(ArrowBatch, BatchLeaf(VanillaBatch))
+  test("Vanilla-to-ArrowJava C2C") {
+    val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VanillaBatch))
     assertThrows[GlutenException] {
       // No viable transitions.
       // FIXME: Support this case.
@@ -137,13 +153,50 @@ class VeloxTransitionSuite extends SharedSparkSession {
     }
   }
 
-  test("Arrow-to-Vanilla C2C") {
-    val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowBatch))
+  test("ArrowJava-to-Vanilla C2C") {
+    val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowJavaBatch))
     val out = Transitions.insertTransitions(in, outputsColumnar = false)
-    // No explicit transition needed for Arrow-to-Vanilla.
-    // FIXME: Add explicit transitions.
-    //  See https://github.com/apache/incubator-gluten/issues/7313.
-    assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch, 
BatchLeaf(ArrowBatch))))
+    assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch, 
BatchLeaf(ArrowJavaBatch))))
+  }
+
+  test("Velox C2R - outputs row") {
+    val in = BatchLeaf(VeloxBatch)
+    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    assert(out == VeloxColumnarToRowExec(BatchLeaf(VeloxBatch)))
+  }
+
+  test("Velox C2R - requires row input") {
+    val in = RowUnary(BatchLeaf(VeloxBatch))
+    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    assert(out == RowUnary(VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))
+  }
+
+  test("Velox R2C - outputs Velox") {
+    val in = RowLeaf()
+    val out = Transitions.insertTransitions(in, outputsColumnar = true)
+    assert(out == RowToVeloxColumnarExec(RowLeaf()))
+  }
+
+  test("Velox R2C - requires Velox input") {
+    val in = BatchUnary(VeloxBatch, RowLeaf())
+    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    assert(out == VeloxColumnarToRowExec(BatchUnary(VeloxBatch, 
RowToVeloxColumnarExec(RowLeaf()))))
+  }
+
+  test("Vanilla-to-Velox C2C") {
+    val in = BatchUnary(VeloxBatch, BatchLeaf(VanillaBatch))
+    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    assert(
+      out == VeloxColumnarToRowExec(
+        BatchUnary(VeloxBatch, 
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch))))))
+  }
+
+  test("Velox-to-Vanilla C2C") {
+    val in = BatchUnary(VanillaBatch, BatchLeaf(VeloxBatch))
+    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    assert(
+      out == ColumnarToRowExec(
+        BatchUnary(VanillaBatch, 
RowToColumnarExec(VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))))
   }
 
   override protected def beforeAll(): Unit = {
diff --git a/cpp/core/tests/CMakeLists.txt b/cpp/core/tests/CMakeLists.txt
index a0f3406f5c..407045d9f9 100644
--- a/cpp/core/tests/CMakeLists.txt
+++ b/cpp/core/tests/CMakeLists.txt
@@ -18,4 +18,4 @@ if(ENABLE_HBM)
 endif()
 
 add_test_case(round_robin_partitioner_test SOURCES 
RoundRobinPartitionerTest.cc)
-add_test_case(objectstore__test SOURCES ObjectStoreTest.cc)
+add_test_case(object_store_test SOURCES ObjectStoreTest.cc)
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala 
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala
deleted file mode 100644
index 58a88e1f49..0000000000
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gluten.columnarbatch
-
-import org.apache.gluten.extension.columnar.transition.{Convention, 
TransitionDef}
-import 
org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
-
-import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
-
-/**
- * ArrowBatch stands for Gluten's Arrow-based columnar batch implementation. 
Vanilla Spark's
- * ColumnarBatch consisting of 
[[org.apache.spark.sql.vectorized.ArrowColumnVector]]s is still
- * treated as [[Convention.BatchType.VanillaBatch]].
- *
- * As of now, ArrowBatch should have 
[[org.apache.gluten.vectorized.ArrowWritableColumnVector]]s
- * populated in it. ArrowBatch can be loaded from / offloaded to native to C++ 
ArrowColumnarBatch
- * through API in [[ColumnarBatches]]. After being offloaded, ArrowBatch is no 
longer considered a
- * legal ArrowBatch and cannot be accepted by trivial ColumnarToRowExec. To 
follow that rule, Any
- * plan with this batch type should promise it emits loaded batch only.
- */
-object ArrowBatch extends Convention.BatchType {
-  toRow(
-    () =>
-      (plan: SparkPlan) => {
-        ColumnarToRowExec(plan)
-      })
-
-  // Arrow batch is one-way compatible with vanilla batch since it provides 
valid
-  // #get<type>(...) implementations.
-  toBatch(VanillaBatch, TransitionDef.empty)
-}
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatches.scala 
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatches.scala
new file mode 100644
index 0000000000..fac84efd35
--- /dev/null
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ArrowBatches.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.columnarbatch
+
+import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec}
+import org.apache.gluten.extension.columnar.transition.{Convention, 
TransitionDef}
+import 
org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
+
+import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
+
+object ArrowBatches {
+
+  /**
+   * JavaArrowBatch stands for Gluten's Java Arrow-based columnar batch 
implementation.
+   *
+   * JavaArrowBatch should have 
[[org.apache.gluten.vectorized.ArrowWritableColumnVector]]s
+   * populated in it. JavaArrowBatch can be offloaded to NativeArrowBatch 
through API in
+   * [[ColumnarBatches]].
+   *
+   * JavaArrowBatch is compatible with vanilla batch since it provides valid 
#get<type>(...)
+   * implementations.
+   */
+  object ArrowJavaBatch extends Convention.BatchType {
+    toRow(
+      () =>
+        (plan: SparkPlan) => {
+          ColumnarToRowExec(plan)
+        })
+
+    toBatch(VanillaBatch, TransitionDef.empty)
+  }
+
+  /**
+   * NativeArrowBatch stands for Gluten's native Arrow-based columnar batch 
implementation.
+   *
+   * NativeArrowBatch should have 
[[org.apache.gluten.columnarbatch.IndicatorVector]] set as the
+   * first vector. NativeArrowBatch can be loaded to JavaArrowBatch through 
API in
+   * [[ColumnarBatches]].
+   */
+  object ArrowNativeBatch extends Convention.BatchType {
+    toRow(
+      () =>
+        (plan: SparkPlan) => {
+          ColumnarToRowExec(LoadArrowDataExec(plan))
+        })
+
+    toBatch(
+      VanillaBatch,
+      () =>
+        (plan: SparkPlan) => {
+          LoadArrowDataExec(plan)
+        })
+
+    fromBatch(
+      ArrowJavaBatch,
+      () =>
+        (plan: SparkPlan) => {
+          OffloadArrowDataExec(plan)
+        })
+
+    toBatch(
+      ArrowJavaBatch,
+      () =>
+        (plan: SparkPlan) => {
+          LoadArrowDataExec(plan)
+        })
+  }
+}
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index e6d97db84b..3f48d8f293 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -25,6 +25,7 @@ import org.apache.gluten.utils.InternalRowUtl;
 import org.apache.gluten.vectorized.ArrowWritableColumnVector;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.arrow.c.ArrowArray;
 import org.apache.arrow.c.ArrowSchema;
 import org.apache.arrow.c.CDataDictionaryProvider;
@@ -121,7 +122,7 @@ public class ColumnarBatches {
    * Ensure the input batch is offloaded as native-based columnar batch (See 
{@link IndicatorVector}
    * and {@link PlaceholderVector}). This method will close the input column 
batch after offloaded.
    */
-  public static ColumnarBatch ensureOffloaded(BufferAllocator allocator, 
ColumnarBatch batch) {
+  private static ColumnarBatch ensureOffloaded(BufferAllocator allocator, 
ColumnarBatch batch) {
     if (ColumnarBatches.isLightBatch(batch)) {
       return batch;
     }
@@ -133,14 +134,22 @@ public class ColumnarBatches {
    * take place if loading is required, which means when the input batch is 
not loaded yet. This
    * method will close the input column batch after loaded.
    */
-  public static ColumnarBatch ensureLoaded(BufferAllocator allocator, 
ColumnarBatch batch) {
+  private static ColumnarBatch ensureLoaded(BufferAllocator allocator, 
ColumnarBatch batch) {
     if (isHeavyBatch(batch)) {
       return batch;
     }
     return load(allocator, batch);
   }
 
-  private static ColumnarBatch load(BufferAllocator allocator, ColumnarBatch 
input) {
+  public static void checkLoaded(ColumnarBatch batch) {
+    Preconditions.checkArgument(isHeavyBatch(batch), "Input batch is not 
loaded");
+  }
+
+  public static void checkOffloaded(ColumnarBatch batch) {
+    Preconditions.checkArgument(isLightBatch(batch), "Input batch is not 
offloaded");
+  }
+
+  public static ColumnarBatch load(BufferAllocator allocator, ColumnarBatch 
input) {
     if (!ColumnarBatches.isLightBatch(input)) {
       throw new IllegalArgumentException(
           "Input is not light columnar batch. "
@@ -186,7 +195,7 @@ public class ColumnarBatches {
     }
   }
 
-  private static ColumnarBatch offload(BufferAllocator allocator, 
ColumnarBatch input) {
+  public static ColumnarBatch offload(BufferAllocator allocator, ColumnarBatch 
input) {
     if (!isHeavyBatch(input)) {
       throw new IllegalArgumentException("batch is not Arrow columnar batch");
     }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
 
b/gluten-arrow/src/main/java/org/apache/gluten/execution/LoadArrowDataExec.scala
similarity index 51%
copy from 
backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
copy to 
gluten-arrow/src/main/java/org/apache/gluten/execution/LoadArrowDataExec.scala
index aa6676dc9b..36f3c48d4e 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/execution/LoadArrowDataExec.scala
@@ -14,27 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.columnarbatch
+package org.apache.gluten.execution
 
-import org.apache.gluten.execution.{RowToVeloxColumnarExec, 
VeloxColumnarToRowExec}
-import org.apache.gluten.extension.columnar.transition.{Convention, 
TransitionDef}
+import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, 
ArrowNativeBatch}
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
-object VeloxBatch extends Convention.BatchType {
-  fromRow(
-    () =>
-      (plan: SparkPlan) => {
-        RowToVeloxColumnarExec(plan)
-      })
+/** Converts input data with batch type [[ArrowNativeBatch]] to type 
[[ArrowJavaBatch]]. */
+case class LoadArrowDataExec(override val child: SparkPlan)
+  extends ColumnarToColumnarExec(ArrowNativeBatch, ArrowJavaBatch) {
+  override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] = {
+    in.map(b => ColumnarBatches.load(ArrowBufferAllocators.contextInstance, b))
+  }
 
-  toRow(
-    () =>
-      (plan: SparkPlan) => {
-        VeloxColumnarToRowExec(plan)
-      })
-
-  // Velox batch is considered one-way compatible with Arrow batch.
-  // This is practically achieved by utilizing C++ API 
VeloxColumnarBatch::from at runtime.
-  fromBatch(ArrowBatch, TransitionDef.empty)
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
 
b/gluten-arrow/src/main/java/org/apache/gluten/execution/OffloadArrowDataExec.scala
similarity index 51%
copy from 
backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
copy to 
gluten-arrow/src/main/java/org/apache/gluten/execution/OffloadArrowDataExec.scala
index aa6676dc9b..7b2184a240 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/execution/OffloadArrowDataExec.scala
@@ -14,27 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.columnarbatch
+package org.apache.gluten.execution
 
-import org.apache.gluten.execution.{RowToVeloxColumnarExec, 
VeloxColumnarToRowExec}
-import org.apache.gluten.extension.columnar.transition.{Convention, 
TransitionDef}
+import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, 
ArrowNativeBatch}
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
-object VeloxBatch extends Convention.BatchType {
-  fromRow(
-    () =>
-      (plan: SparkPlan) => {
-        RowToVeloxColumnarExec(plan)
-      })
+/** Converts input data with batch type [[ArrowJavaBatch]] to type 
[[ArrowNativeBatch]]. */
+case class OffloadArrowDataExec(override val child: SparkPlan)
+  extends ColumnarToColumnarExec(ArrowJavaBatch, ArrowNativeBatch) {
+  override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] = {
+    in.map(b => ColumnarBatches.offload(ArrowBufferAllocators.contextInstance, 
b))
+  }
 
-  toRow(
-    () =>
-      (plan: SparkPlan) => {
-        VeloxColumnarToRowExec(plan)
-      })
-
-  // Velox batch is considered one-way compatible with Arrow batch.
-  // This is practically achieved by utilizing C++ API 
VeloxColumnarBatch::from at runtime.
-  fromBatch(ArrowBatch, TransitionDef.empty)
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
 }
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
index a7003a6bd3..c69caf5f59 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
@@ -18,7 +18,6 @@ package org.apache.gluten.vectorized;
 
 import org.apache.gluten.columnarbatch.ColumnarBatchJniWrapper;
 import org.apache.gluten.columnarbatch.ColumnarBatches;
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
 import org.apache.gluten.runtime.Runtimes;
 
 import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -45,8 +44,7 @@ public class ColumnarBatchInIterator {
       return 
ColumnarBatchJniWrapper.create(Runtimes.contextInstance("ColumnarBatchInIterator"))
           .getForEmptySchema(next.numRows());
     }
-    final ColumnarBatch offloaded =
-        
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), next);
-    return ColumnarBatches.getNativeHandle(offloaded);
+    ColumnarBatches.checkOffloaded(next);
+    return ColumnarBatches.getNativeHandle(next);
   }
 }
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala 
b/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala
index 8c6161e0c4..af5fcb0f60 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowAbiUtil.scala
@@ -139,12 +139,12 @@ object ArrowAbiUtil {
 
   def exportFromSparkColumnarBatch(
       allocator: BufferAllocator,
-      columnarBatch: ColumnarBatch,
+      batch: ColumnarBatch,
       cSchema: ArrowSchema,
       cArray: ArrowArray): Unit = {
-    val loaded = ColumnarBatches.ensureLoaded(allocator, columnarBatch)
-    val schema = ArrowUtil.toSchema(loaded)
-    val rb = SparkVectorUtil.toArrowRecordBatch(loaded)
+    ColumnarBatches.checkLoaded(batch)
+    val schema = ArrowUtil.toSchema(batch)
+    val rb = SparkVectorUtil.toArrowRecordBatch(batch)
     try {
       exportFromArrowRecordBatch(allocator, rb, schema, cSchema, cArray)
     } finally {
diff --git 
a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala 
b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala
index 3e86be79ac..6a0f29c4ba 100644
--- 
a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala
+++ 
b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkVectorUtil.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.utils
 
 import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.vectorized.ArrowWritableColumnVector
 
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -30,12 +29,12 @@ import 
scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListCon
 
 object SparkVectorUtil {
 
-  def toArrowRecordBatch(columnarBatch: ColumnarBatch): ArrowRecordBatch = {
-    val numRowsInBatch = columnarBatch.numRows()
-    val cols = (0 until columnarBatch.numCols).toList.map(
+  def toArrowRecordBatch(batch: ColumnarBatch): ArrowRecordBatch = {
+    ColumnarBatches.checkLoaded(batch)
+    val numRowsInBatch = batch.numRows()
+    val cols = (0 until batch.numCols).toList.map(
       i =>
-        ColumnarBatches
-          .ensureLoaded(ArrowBufferAllocators.contextInstance(), columnarBatch)
+        batch
           .column(i)
           .asInstanceOf[ArrowWritableColumnVector]
           .getValueVector)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
new file mode 100644
index 0000000000..86f9ddab46
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
+import 
org.apache.gluten.extension.columnar.transition.Convention.KnownBatchType
+import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
+import org.apache.gluten.iterator.Iterators
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import java.util.concurrent.atomic.AtomicLong
+
+abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: 
Convention.BatchType)
+  extends UnaryExecNode
+  with KnownBatchType
+  with KnownChildrenConventions {
+
+  def child: SparkPlan
+  protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch]
+
+  override lazy val metrics: Map[String, SQLMetric] =
+    Map(
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
input batches"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+      "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
output batches"),
+      "selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
convert batches")
+    )
+
+  override def supportsColumnar: Boolean = true
+  override def batchType(): Convention.BatchType = to
+  override def requiredChildrenConventions(): Seq[ConventionReq] = List(
+    ConventionReq.of(ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(from)))
+
+  override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numInputRows = longMetric("numInputRows")
+    val numInputBatches = longMetric("numInputBatches")
+    val numOutputRows = longMetric("numOutputRows")
+    val numOutputBatches = longMetric("numOutputBatches")
+    val selfTime = longMetric("selfTime")
+
+    child.executeColumnar().mapPartitions {
+      in =>
+        // Self millis = Out millis - In millis.
+        val selfMillis = new AtomicLong(0L)
+        val wrappedIn = Iterators
+          .wrap(in)
+          .collectReadMillis(inMillis => selfMillis.getAndAdd(-inMillis))
+          .create()
+          .map {
+            inBatch =>
+              numInputRows += inBatch.numRows()
+              numInputBatches += 1
+              inBatch
+          }
+        val out = mapIterator(wrappedIn)
+        val wrappedOut = Iterators
+          .wrap(out)
+          .collectReadMillis(outMillis => selfMillis.getAndAdd(outMillis))
+          .recycleIterator {
+            selfTime += selfMillis.get()
+          }
+          .create()
+          .map {
+            outBatch =>
+              numOutputRows += outBatch.numRows()
+              numOutputBatches += 1
+              outBatch
+          }
+        wrappedOut
+    }
+
+  }
+
+  override def output: Seq[Attribute] = child.output
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index 034b458514..fcd34cb1f2 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -69,7 +69,7 @@ object Convention {
     final case object VanillaRow extends RowType
   }
 
-  trait BatchType {
+  trait BatchType extends Serializable {
     final def fromRow(transitionDef: TransitionDef): Unit = {
       Transition.factory.update().defineFromRowTransition(this, transitionDef)
     }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index 3ba09efefe..1441814519 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -68,9 +68,9 @@ object RemoveTransitions extends Rule[SparkPlan] {
 
   @tailrec
   private[transition] def removeForNode(plan: SparkPlan): SparkPlan = plan 
match {
-    // TODO: Consider C2C transitions as well when we have some.
     case ColumnarToRowLike(child) => removeForNode(child)
     case RowToColumnarLike(child) => removeForNode(child)
+    case ColumnarToColumnarLike(child) => removeForNode(child)
     case other => other
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
index 669ba3ac3a..2dd8d632e3 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.extension.columnar
 
+import org.apache.gluten.execution.ColumnarToColumnarExec
+
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
 import org.apache.spark.sql.execution.debug.DebugExec
@@ -55,4 +57,15 @@ package object transition {
       }
     }
   }
+
+  // Extractor for Gluten's C2C
+  object ColumnarToColumnarLike {
+    def unapply(plan: SparkPlan): Option[SparkPlan] = {
+      plan match {
+        case c2c: ColumnarToColumnarExec =>
+          Some(c2c.child)
+        case _ => None
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to