This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d0bb5224 [VL] Provide options to combine small batches before sending to shuffle (#6009)
1d0bb5224 is described below

commit 1d0bb52248097cba721e850e83c0963971dbc81c
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Jun 11 13:57:46 2024 +0800

    [VL] Provide options to combine small batches before sending to shuffle (#6009)
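
    The feature is off by default. A minimal usage sketch, assuming a Spark
    session running the Velox backend (the session, table, and threshold value
    below are illustrative, not part of this patch):

        // Combine small batches before they reach the shuffle writer.
        spark.conf.set(
          "spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle", "true")
        // Optional minimum rows per output batch; when unset it falls back to
        // spark.gluten.sql.columnar.maxBatchSize.
        spark.conf.set(
          "spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle", "4096")
        // Subsequent shuffles get a VeloxAppendBatchesExec planned beneath the
        // ColumnarShuffleExchangeExec; it appends incoming batches together until
        // an output batch holds at least the configured number of rows.
        spark.sql("select l_orderkey, count(*) from lineitem group by l_orderkey").show()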
---
 .../clickhouse/CHSparkPlanExecApi.scala            |   9 +-
 .../apache/gluten/utils/VeloxBatchAppender.java    |  41 ++++++++
 .../gluten/utils/VeloxBatchAppenderJniWrapper.java |  41 ++++++++
 .../backendsapi/velox/VeloxIteratorApi.scala       |   4 +-
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |  44 +++++----
 .../gluten/execution/VeloxAppendBatchesExec.scala  | 105 +++++++++++++++++++++
 .../org/apache/gluten/execution/TestOperator.scala |  26 ++++-
 cpp/core/jni/JniCommon.cc                          |  58 ++++++++++++
 cpp/core/jni/JniCommon.h                           |  41 ++++++++
 cpp/core/jni/JniWrapper.cc                         |  88 -----------------
 cpp/velox/CMakeLists.txt                           |   1 +
 cpp/velox/jni/VeloxJniWrapper.cc                   |  18 ++++
 cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc   |  16 +---
 cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h    |   9 --
 cpp/velox/shuffle/VeloxShuffleWriter.h             |   4 -
 cpp/velox/utils/VeloxBatchAppender.cc              |  59 ++++++++++++
 cpp/velox/utils/VeloxBatchAppender.h               |  41 ++++++++
 .../gluten/backendsapi/SparkPlanExecApi.scala      |   2 +-
 .../extension/columnar/OffloadSingleNode.scala     |  28 +++---
 .../scala/org/apache/gluten/utils/Iterators.scala  |  38 ++++++--
 .../vectorized/ColumnarBatchOutIterator.java       |   3 +-
 .../scala/org/apache/gluten/GlutenConfig.scala     |  26 +++++
 22 files changed, 532 insertions(+), 170 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index bdbdfed0d..a8a05c40f 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -271,13 +271,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
     }
   }
 
-  override def genColumnarShuffleExchange(
-      shuffle: ShuffleExchangeExec,
-      child: SparkPlan): SparkPlan = {
+  override def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan = {
+    val child = shuffle.child
     if (
-      BackendsApiManager.getSettings.supportShuffleWithProject(
-        shuffle.outputPartitioning,
-        shuffle.child)
+      BackendsApiManager.getSettings.supportShuffleWithProject(shuffle.outputPartitioning, child)
     ) {
       val (projectColumnNumber, newPartitioning, newChild) =
         addProjectionForShuffleExchange(shuffle)
diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppender.java b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppender.java
new file mode 100644
index 000000000..1bf34b5ce
--- /dev/null
+++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppender.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils;
+
+import org.apache.gluten.exec.Runtime;
+import org.apache.gluten.exec.Runtimes;
+import org.apache.gluten.memory.nmm.NativeMemoryManager;
+import org.apache.gluten.memory.nmm.NativeMemoryManagers;
+import org.apache.gluten.vectorized.ColumnarBatchInIterator;
+import org.apache.gluten.vectorized.ColumnarBatchOutIterator;
+
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+import java.util.Iterator;
+
+public final class VeloxBatchAppender {
+  public static ColumnarBatchOutIterator create(
+      int minOutputBatchSize, Iterator<ColumnarBatch> in) {
+    final Runtime runtime = Runtimes.contextInstance();
+    final NativeMemoryManager nmm = NativeMemoryManagers.contextInstance("VeloxBatchAppender");
+    long outHandle =
+        VeloxBatchAppenderJniWrapper.forRuntime(runtime)
+            .create(
+                nmm.getNativeInstanceHandle(), minOutputBatchSize, new ColumnarBatchInIterator(in));
+    return new ColumnarBatchOutIterator(runtime, outHandle, nmm);
+  }
+}
diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppenderJniWrapper.java b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppenderJniWrapper.java
new file mode 100644
index 000000000..9e2531951
--- /dev/null
+++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppenderJniWrapper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils;
+
+import org.apache.gluten.exec.Runtime;
+import org.apache.gluten.exec.RuntimeAware;
+import org.apache.gluten.vectorized.ColumnarBatchInIterator;
+
+public class VeloxBatchAppenderJniWrapper implements RuntimeAware {
+  private final Runtime runtime;
+
+  private VeloxBatchAppenderJniWrapper(Runtime runtime) {
+    this.runtime = runtime;
+  }
+
+  public static VeloxBatchAppenderJniWrapper forRuntime(Runtime runtime) {
+    return new VeloxBatchAppenderJniWrapper(runtime);
+  }
+
+  @Override
+  public long handle() {
+    return runtime.getHandle();
+  }
+
+  public native long create(
+      long memoryManagerHandle, int minOutputBatchSize, ColumnarBatchInIterator itr);
+}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index b20eccafb..459a7886e 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -203,7 +203,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         resIter.close()
       }
       .recyclePayload(batch => batch.close())
-      .addToPipelineTime(pipelineTime)
+      .collectLifeMillis(millis => pipelineTime += millis)
       .asInterruptible(context)
       .create()
   }
@@ -246,7 +246,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         nativeResultIterator.close()
       }
       .recyclePayload(batch => batch.close())
-      .addToPipelineTime(pipelineTime)
+      .collectLifeMillis(millis => pipelineTime += millis)
       .create()
   }
   // scalastyle:on argcount
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index f8af80a9b..66ca8660a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -320,9 +320,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       aggregateAttributes: Seq[Attribute]): HashAggregateExecPullOutBaseHelper =
     HashAggregateExecPullOutHelper(aggregateExpressions, aggregateAttributes)
 
-  override def genColumnarShuffleExchange(
-      shuffle: ShuffleExchangeExec,
-      newChild: SparkPlan): SparkPlan = {
+  override def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan = {
     def allowHashOnMap[T](f: => T): T = {
       val originalAllowHash = SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_HASH_ON_MAPTYPE)
       try {
@@ -333,20 +331,28 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       }
     }
 
+    def maybeAddAppendBatchesExec(plan: SparkPlan): SparkPlan = {
+      if (GlutenConfig.getConf.veloxCoalesceBatchesBeforeShuffle) {
+        VeloxAppendBatchesExec(plan, GlutenConfig.getConf.veloxMinBatchSizeForShuffle)
+      } else {
+        plan
+      }
+    }
+
+    val child = shuffle.child
+
     shuffle.outputPartitioning match {
       case HashPartitioning(exprs, _) =>
         val hashExpr = new Murmur3Hash(exprs)
-        val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ newChild.output
-        val projectTransformer = ProjectExecTransformer(projectList, newChild)
+        val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ child.output
+        val projectTransformer = ProjectExecTransformer(projectList, child)
         val validationResult = projectTransformer.doValidate()
         if (validationResult.isValid) {
-          ColumnarShuffleExchangeExec(
-            shuffle,
-            projectTransformer,
-            projectTransformer.output.drop(1))
+          val newChild = maybeAddAppendBatchesExec(projectTransformer)
+          ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output.drop(1))
         } else {
           TransformHints.tagNotTransformable(shuffle, validationResult)
-          shuffle.withNewChildren(newChild :: Nil)
+          shuffle.withNewChildren(child :: Nil)
         }
       case RoundRobinPartitioning(num) if SQLConf.get.sortBeforeRepartition && num > 1 =>
         // scalastyle:off line.size.limit
@@ -357,19 +363,20 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
         allowHashOnMap {
           // Velox hash expression does not support null type and we also do not need to sort
           // null type since the value always be null.
-          val columnsForHash = newChild.output.filterNot(_.dataType == NullType)
+          val columnsForHash = child.output.filterNot(_.dataType == NullType)
           if (columnsForHash.isEmpty) {
+            val newChild = maybeAddAppendBatchesExec(child)
             ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
           } else {
             val hashExpr = new Murmur3Hash(columnsForHash)
-            val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ newChild.output
-            val projectTransformer = ProjectExecTransformer(projectList, newChild)
+            val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ child.output
+            val projectTransformer = ProjectExecTransformer(projectList, child)
             val projectBeforeSortValidationResult = projectTransformer.doValidate()
             // Make sure we support offload hash expression
             val projectBeforeSort = if (projectBeforeSortValidationResult.isValid) {
               projectTransformer
             } else {
-              val project = ProjectExec(projectList, newChild)
+              val project = ProjectExec(projectList, child)
               TransformHints.tagNotTransformable(project, projectBeforeSortValidationResult)
               project
             }
@@ -380,17 +387,16 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
               ProjectExecTransformer(projectList.drop(1), sortByHashCode)
             val validationResult = dropSortColumnTransformer.doValidate()
             if (validationResult.isValid) {
-              ColumnarShuffleExchangeExec(
-                shuffle,
-                dropSortColumnTransformer,
-                dropSortColumnTransformer.output)
+              val newChild = maybeAddAppendBatchesExec(dropSortColumnTransformer)
+              ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
             } else {
               TransformHints.tagNotTransformable(shuffle, validationResult)
-              shuffle.withNewChildren(newChild :: Nil)
+              shuffle.withNewChildren(child :: Nil)
             }
           }
         }
       case _ =>
+        val newChild = maybeAddAppendBatchesExec(child)
         ColumnarShuffleExchangeExec(shuffle, newChild, null)
     }
   }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala
new file mode 100644
index 000000000..8c2834574
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.utils.{Iterators, VeloxBatchAppender}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+
+/**
+ * An operator to coalesce input batches by appending the later batches to the one that comes
+ * earlier.
+ */
+case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchSize: Int)
+  extends GlutenPlan
+  with UnaryExecNode {
+
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
input batches"),
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
output batches"),
+    "appendTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
append batches")
+  )
+
+  override def supportsColumnar: Boolean = true
+  override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
+
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numInputRows = longMetric("numInputRows")
+    val numInputBatches = longMetric("numInputBatches")
+    val numOutputRows = longMetric("numOutputRows")
+    val numOutputBatches = longMetric("numOutputBatches")
+    val appendTime = longMetric("appendTime")
+
+    child.executeColumnar().mapPartitions {
+      in =>
+        // Append millis = Out millis - In millis.
+        val appendMillis = new AtomicLong(0L)
+
+        val appender = VeloxBatchAppender.create(
+          minOutputBatchSize,
+          Iterators
+            .wrap(in)
+            .collectReadMillis(inMillis => appendMillis.getAndAdd(-inMillis))
+            .create()
+            .map {
+              inBatch =>
+                numInputRows += inBatch.numRows()
+                numInputBatches += 1
+                inBatch
+            }
+            .asJava
+        )
+
+        val out = Iterators
+          .wrap(appender.asScala)
+          .collectReadMillis(outMillis => appendMillis.getAndAdd(outMillis))
+          .recyclePayload(_.close())
+          .recycleIterator {
+            appender.close()
+            appendTime += appendMillis.get()
+          }
+          .create()
+          .map {
+            outBatch =>
+              numOutputRows += outBatch.numRows()
+              numOutputBatches += 1
+              outBatch
+          }
+
+        out
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index cd1f21a0a..ae8d64a09 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -24,6 +24,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -33,7 +34,7 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters
 
-class TestOperator extends VeloxWholeStageTransformerSuite {
+class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
 
   protected val rootPath: String = getClass.getResource("/").getPath
   override protected val resourcePath: String = "/tpch-data-parquet-velox"
@@ -703,6 +704,29 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
     }
   }
 
+  test("combine small batches before shuffle") {
+    val minBatchSize = 15
+    withSQLConf(
+      "spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle" 
-> "true",
+      "spark.gluten.sql.columnar.maxBatchSize" -> "2",
+      "spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle" -> 
s"$minBatchSize"
+    ) {
+      val df = runQueryAndCompare(
+        "select l_orderkey, sum(l_partkey) as sum from lineitem " +
+          "where l_orderkey < 100 group by l_orderkey") { _ => }
+      checkLengthAndPlan(df, 27)
+      val ops = collect(df.queryExecution.executedPlan) { case p: VeloxAppendBatchesExec => p }
+      assert(ops.size == 1)
+      val op = ops.head
+      assert(op.minOutputBatchSize == minBatchSize)
+      val metrics = op.metrics
+      assert(metrics("numInputRows").value == 27)
+      assert(metrics("numInputBatches").value == 14)
+      assert(metrics("numOutputRows").value == 27)
+      assert(metrics("numOutputBatches").value == 2)
+    }
+  }
+
   test("test OneRowRelation") {
     val df = sql("SELECT 1")
     checkAnswer(df, Row(1))
diff --git a/cpp/core/jni/JniCommon.cc b/cpp/core/jni/JniCommon.cc
index 328a7b772..08c5cb1d4 100644
--- a/cpp/core/jni/JniCommon.cc
+++ b/cpp/core/jni/JniCommon.cc
@@ -65,3 +65,61 @@ gluten::Runtime* gluten::getRuntime(JNIEnv* env, jobject runtimeAware) {
   GLUTEN_CHECK(ctx != nullptr, "FATAL: resource instance should not be null.");
   return ctx;
 }
+
+std::unique_ptr<gluten::JniColumnarBatchIterator> gluten::makeJniColumnarBatchIterator(
+    JNIEnv* env,
+    jobject jColumnarBatchItr,
+    gluten::Runtime* runtime,
+    std::shared_ptr<ArrowWriter> writer) {
+  return std::make_unique<JniColumnarBatchIterator>(env, jColumnarBatchItr, runtime, writer);
+}
+
+gluten::JniColumnarBatchIterator::JniColumnarBatchIterator(
+    JNIEnv* env,
+    jobject jColumnarBatchItr,
+    gluten::Runtime* runtime,
+    std::shared_ptr<ArrowWriter> writer)
+    : runtime_(runtime), writer_(writer) {
+  // IMPORTANT: DO NOT USE LOCAL REF IN DIFFERENT THREAD
+  if (env->GetJavaVM(&vm_) != JNI_OK) {
+    std::string errorMessage = "Unable to get JavaVM instance";
+    throw gluten::GlutenException(errorMessage);
+  }
+  serializedColumnarBatchIteratorClass_ =
+      createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/ColumnarBatchInIterator;");
+  serializedColumnarBatchIteratorHasNext_ =
+      getMethodIdOrError(env, serializedColumnarBatchIteratorClass_, "hasNext", "()Z");
+  serializedColumnarBatchIteratorNext_ = getMethodIdOrError(env, serializedColumnarBatchIteratorClass_, "next", "()J");
+  jColumnarBatchItr_ = env->NewGlobalRef(jColumnarBatchItr);
+}
+
+gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() {
+  JNIEnv* env;
+  attachCurrentThreadAsDaemonOrThrow(vm_, &env);
+  env->DeleteGlobalRef(jColumnarBatchItr_);
+  env->DeleteGlobalRef(serializedColumnarBatchIteratorClass_);
+  vm_->DetachCurrentThread();
+}
+
+std::shared_ptr<gluten::ColumnarBatch> gluten::JniColumnarBatchIterator::next() {
+  JNIEnv* env;
+  attachCurrentThreadAsDaemonOrThrow(vm_, &env);
+  if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
+    checkException(env);
+    return nullptr; // stream ended
+  }
+
+  checkException(env);
+  jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_);
+  checkException(env);
+  auto batch = runtime_->objectStore()->retrieve<ColumnarBatch>(handle);
+  if (writer_ != nullptr) {
+    // save snapshot of the batch to file
+    std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
+    std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
+    auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get()));
+    GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
+    GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
+  }
+  return batch;
+}
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index 5858a70e9..bc5cf84f6 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -28,6 +28,7 @@
 #include "memory/AllocationListener.h"
 #include "shuffle/rss/RssClient.h"
 #include "utils/Compression.h"
+#include "utils/ResourceMap.h"
 #include "utils/exception.h"
 
 static jint jniVersion = JNI_VERSION_1_8;
@@ -119,6 +120,12 @@ static inline void attachCurrentThreadAsDaemonOrThrow(JavaVM* vm, JNIEnv** out)
   }
 }
 
+template <typename T>
+static T* jniCastOrThrow(gluten::ResourceHandle handle) {
+  auto instance = reinterpret_cast<T*>(handle);
+  GLUTEN_CHECK(instance != nullptr, "FATAL: resource instance should not be null.");
+  return instance;
+}
 namespace gluten {
 
 class JniCommonState {
@@ -251,6 +258,40 @@ DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kLong, jlongArray, Long)
 DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kFloat, jfloatArray, Float)
 DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kDouble, jdoubleArray, Double)
 
+class JniColumnarBatchIterator : public ColumnarBatchIterator {
+ public:
+  explicit JniColumnarBatchIterator(
+      JNIEnv* env,
+      jobject jColumnarBatchItr,
+      Runtime* runtime,
+      std::shared_ptr<ArrowWriter> writer);
+
+  // singleton
+  JniColumnarBatchIterator(const JniColumnarBatchIterator&) = delete;
+  JniColumnarBatchIterator(JniColumnarBatchIterator&&) = delete;
+  JniColumnarBatchIterator& operator=(const JniColumnarBatchIterator&) = delete;
+  JniColumnarBatchIterator& operator=(JniColumnarBatchIterator&&) = delete;
+
+  virtual ~JniColumnarBatchIterator();
+
+  std::shared_ptr<ColumnarBatch> next() override;
+
+ private:
+  JavaVM* vm_;
+  jobject jColumnarBatchItr_;
+  Runtime* runtime_;
+  std::shared_ptr<ArrowWriter> writer_;
+
+  jclass serializedColumnarBatchIteratorClass_;
+  jmethodID serializedColumnarBatchIteratorHasNext_;
+  jmethodID serializedColumnarBatchIteratorNext_;
+};
+
+std::unique_ptr<JniColumnarBatchIterator> makeJniColumnarBatchIterator(
+    JNIEnv* env,
+    jobject jColumnarBatchItr,
+    Runtime* runtime,
+    std::shared_ptr<ArrowWriter> writer);
 } // namespace gluten
 
 // TODO: Move the static functions to namespace gluten
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index db498f43a..4e069ec7a 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -58,13 +58,8 @@ static jmethodID splitResultConstructor;
 static jclass columnarBatchSerializeResultClass;
 static jmethodID columnarBatchSerializeResultConstructor;
 
-static jclass serializedColumnarBatchIteratorClass;
 static jclass metricsBuilderClass;
 static jmethodID metricsBuilderConstructor;
-
-static jmethodID serializedColumnarBatchIteratorHasNext;
-static jmethodID serializedColumnarBatchIteratorNext;
-
 static jclass nativeColumnarToRowInfoClass;
 static jmethodID nativeColumnarToRowInfoConstructor;
 
@@ -147,80 +142,6 @@ class JavaInputStreamAdaptor final : public arrow::io::InputStream {
   bool closed_ = false;
 };
 
-class JniColumnarBatchIterator : public ColumnarBatchIterator {
- public:
-  explicit JniColumnarBatchIterator(
-      JNIEnv* env,
-      jobject jColumnarBatchItr,
-      Runtime* runtime,
-      std::shared_ptr<ArrowWriter> writer)
-      : runtime_(runtime), writer_(writer) {
-    // IMPORTANT: DO NOT USE LOCAL REF IN DIFFERENT THREAD
-    if (env->GetJavaVM(&vm_) != JNI_OK) {
-      std::string errorMessage = "Unable to get JavaVM instance";
-      throw gluten::GlutenException(errorMessage);
-    }
-    jColumnarBatchItr_ = env->NewGlobalRef(jColumnarBatchItr);
-  }
-
-  // singleton
-  JniColumnarBatchIterator(const JniColumnarBatchIterator&) = delete;
-  JniColumnarBatchIterator(JniColumnarBatchIterator&&) = delete;
-  JniColumnarBatchIterator& operator=(const JniColumnarBatchIterator&) = delete;
-  JniColumnarBatchIterator& operator=(JniColumnarBatchIterator&&) = delete;
-
-  virtual ~JniColumnarBatchIterator() {
-    JNIEnv* env;
-    attachCurrentThreadAsDaemonOrThrow(vm_, &env);
-    env->DeleteGlobalRef(jColumnarBatchItr_);
-    vm_->DetachCurrentThread();
-  }
-
-  std::shared_ptr<ColumnarBatch> next() override {
-    JNIEnv* env;
-    attachCurrentThreadAsDaemonOrThrow(vm_, &env);
-    if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext)) {
-      checkException(env);
-      return nullptr; // stream ended
-    }
-
-    checkException(env);
-    jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext);
-    checkException(env);
-    auto batch = runtime_->objectStore()->retrieve<ColumnarBatch>(handle);
-    if (writer_ != nullptr) {
-      // save snapshot of the batch to file
-      std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
-      std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
-      auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get()));
-      GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
-      GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
-    }
-    return batch;
-  }
-
- private:
-  JavaVM* vm_;
-  jobject jColumnarBatchItr_;
-  Runtime* runtime_;
-  std::shared_ptr<ArrowWriter> writer_;
-};
-
-std::unique_ptr<JniColumnarBatchIterator> makeJniColumnarBatchIterator(
-    JNIEnv* env,
-    jobject jColumnarBatchItr,
-    Runtime* runtime,
-    std::shared_ptr<ArrowWriter> writer) {
-  return std::make_unique<JniColumnarBatchIterator>(env, jColumnarBatchItr, runtime, writer);
-}
-
-template <typename T>
-T* jniCastOrThrow(ResourceHandle handle) {
-  auto instance = reinterpret_cast<T*>(handle);
-  GLUTEN_CHECK(instance != nullptr, "FATAL: resource instance should not be null.");
-  return instance;
-}
-
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -253,14 +174,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   metricsBuilderConstructor = getMethodIdOrError(
       env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
 
-  serializedColumnarBatchIteratorClass =
-      createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/ColumnarBatchInIterator;");
-
-  serializedColumnarBatchIteratorHasNext =
-      getMethodIdOrError(env, serializedColumnarBatchIteratorClass, "hasNext", "()Z");
-
-  serializedColumnarBatchIteratorNext = getMethodIdOrError(env, serializedColumnarBatchIteratorClass, "next", "()J");
-
   nativeColumnarToRowInfoClass =
       createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
   nativeColumnarToRowInfoConstructor = getMethodIdOrError(env, nativeColumnarToRowInfoClass, "<init>", "([I[IJ)V");
@@ -293,7 +206,6 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   env->DeleteGlobalRef(jniByteInputStreamClass);
   env->DeleteGlobalRef(splitResultClass);
   env->DeleteGlobalRef(columnarBatchSerializeResultClass);
-  env->DeleteGlobalRef(serializedColumnarBatchIteratorClass);
   env->DeleteGlobalRef(nativeColumnarToRowInfoClass);
   env->DeleteGlobalRef(byteArrayClass);
   env->DeleteGlobalRef(shuffleReaderMetricsClass);
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 05ecf9635..34cc9001c 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -327,6 +327,7 @@ set(VELOX_SRCS
     utils/VeloxArrowUtils.cc
     utils/ConfigExtractor.cc
     utils/Common.cc
+    utils/VeloxBatchAppender.cc
     )
 
 if (ENABLE_HDFS)
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 9da7355d1..3b52eaa86 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -30,6 +30,7 @@
 #include "jni/JniFileSystem.h"
 #include "memory/VeloxMemoryManager.h"
 #include "substrait/SubstraitToVeloxPlanValidator.h"
+#include "utils/VeloxBatchAppender.h"
 #include "velox/common/base/BloomFilter.h"
 
 #include <iostream>
@@ -246,6 +247,23 @@ JNIEXPORT jbyteArray JNICALL Java_org_apache_gluten_utils_VeloxBloomFilterJniWra
   JNI_METHOD_END(nullptr)
 }
 
+JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_VeloxBatchAppenderJniWrapper_create( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong memoryManagerHandle,
+    jint minOutputBatchSize,
+    jobject jIter) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+  auto memoryManager = jniCastOrThrow<gluten::MemoryManager>(memoryManagerHandle);
+  auto pool = gluten::VeloxRuntime::getLeafVeloxPool(memoryManager);
+  auto iter = gluten::makeJniColumnarBatchIterator(env, jIter, ctx, nullptr);
+  auto appender = std::make_shared<gluten::ResultIterator>(
+      std::make_unique<gluten::VeloxBatchAppender>(pool.get(), minOutputBatchSize, std::move(iter)));
+  return ctx->objectStore()->save(appender);
+  JNI_METHOD_END(gluten::kInvalidResourceHandle)
+}
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
index cc648cf7f..741ca8ab9 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
@@ -303,17 +303,7 @@ arrow::Status VeloxHashBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
         numRows -= length;
       } while (numRows);
     } else {
-      if (accumulateRows_ + rv->size() < 8192) {
-        accumulateRows_ += rv->size();
-        initAccumulateDataset(rv);
-        accumulateDataset_->append(rv.get());
-      } else {
-        initAccumulateDataset(rv);
-        accumulateDataset_->append(rv.get());
-        RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), memLimit));
-        accumulateDataset_ = nullptr;
-        accumulateRows_ = 0;
-      }
+      RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit));
     }
   }
   return arrow::Status::OK();
@@ -339,10 +329,6 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
 }
 
 arrow::Status VeloxHashBasedShuffleWriter::stop() {
-  if (accumulateDataset_ != nullptr) {
-    RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), kMinMemLimit));
-    accumulateRows_ = 0;
-  }
   if (options_.partitioning != Partitioning::kSingle) {
     for (auto pid = 0; pid < numPartitions_; ++pid) {
       RETURN_NOT_OK(evictPartitionBuffers(pid, false));
diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
index 142c7978b..a11f84e95 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
@@ -303,15 +303,6 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {
 
   arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit);
 
-  void initAccumulateDataset(facebook::velox::RowVectorPtr& rv) {
-    if (accumulateDataset_) {
-      return;
-    }
-    std::vector<facebook::velox::VectorPtr> children(rv->children().size(), nullptr);
-    accumulateDataset_ =
-        std::make_shared<facebook::velox::RowVector>(veloxPool_.get(), rv->type(), nullptr, 0, std::move(children));
-  }
-
   BinaryArrayResizeState binaryArrayResizeState_{};
 
   bool hasComplexType_ = false;
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h
index 2855831c5..104b87616 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -124,10 +124,6 @@ class VeloxShuffleWriter : public ShuffleWriter {
 
   int32_t maxBatchSize_{0};
 
-  uint32_t accumulateRows_{0};
-
-  facebook::velox::RowVectorPtr accumulateDataset_;
-
   enum EvictState { kEvictable, kUnevictable };
 
   // stat
diff --git a/cpp/velox/utils/VeloxBatchAppender.cc b/cpp/velox/utils/VeloxBatchAppender.cc
new file mode 100644
index 000000000..8fa1ade21
--- /dev/null
+++ b/cpp/velox/utils/VeloxBatchAppender.cc
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "VeloxBatchAppender.h"
+
+namespace gluten {
+
+gluten::VeloxBatchAppender::VeloxBatchAppender(
+    facebook::velox::memory::MemoryPool* pool,
+    int32_t minOutputBatchSize,
+    std::unique_ptr<ColumnarBatchIterator> in)
+    : pool_(pool), minOutputBatchSize_(minOutputBatchSize), in_(std::move(in)) {}
+
+std::shared_ptr<ColumnarBatch> VeloxBatchAppender::next() {
+  auto cb = in_->next();
+  if (cb == nullptr) {
+    // Input iterator was drained.
+    return nullptr;
+  }
+  if (cb->numRows() >= minOutputBatchSize_) {
+    // Fast flush path.
+    return cb;
+  }
+
+  auto vb = VeloxColumnarBatch::from(pool_, cb);
+  auto rv = vb->getRowVector();
+  auto buffer = facebook::velox::RowVector::createEmpty(rv->type(), pool_);
+  buffer->append(rv.get());
+
+  for (auto nextCb = in_->next(); nextCb != nullptr; nextCb = in_->next()) {
+    auto nextVb = VeloxColumnarBatch::from(pool_, nextCb);
+    auto nextRv = nextVb->getRowVector();
+    buffer->append(nextRv.get());
+    if (buffer->size() >= minOutputBatchSize_) {
+      // Buffer is full.
+      break;
+    }
+  }
+  return std::make_shared<VeloxColumnarBatch>(buffer);
+}
+
+int64_t VeloxBatchAppender::spillFixedSize(int64_t size) {
+  return in_->spillFixedSize(size);
+}
+} // namespace gluten
diff --git a/cpp/velox/utils/VeloxBatchAppender.h b/cpp/velox/utils/VeloxBatchAppender.h
new file mode 100644
index 000000000..3698381d0
--- /dev/null
+++ b/cpp/velox/utils/VeloxBatchAppender.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "memory/ColumnarBatchIterator.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "utils/exception.h"
+#include "velox/common/memory/MemoryPool.h"
+#include "velox/vector/ComplexVector.h"
+
+namespace gluten {
+class VeloxBatchAppender : public ColumnarBatchIterator {
+ public:
+  VeloxBatchAppender(
+      facebook::velox::memory::MemoryPool* pool,
+      int32_t minOutputBatchSize,
+      std::unique_ptr<ColumnarBatchIterator> in);
+
+  std::shared_ptr<ColumnarBatch> next() override;
+
+  int64_t spillFixedSize(int64_t size) override;
+
+ private:
+  facebook::velox::memory::MemoryPool* pool_;
+  const int32_t minOutputBatchSize_;
+  std::unique_ptr<ColumnarBatchIterator> in_;
+};
+} // namespace gluten
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 8a086f896..8a1baae51 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -101,7 +101,7 @@ trait SparkPlanExecApi {
       aggregateExpressions: Seq[AggregateExpression],
       aggregateAttributes: Seq[Attribute]): HashAggregateExecPullOutBaseHelper
 
-  def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec, newChild: SparkPlan): SparkPlan
+  def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan
 
   /** Generate ShuffledHashJoinExecTransformer. */
   def genShuffledHashJoinExecTransformer(
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 39cc8ad2e..6e4d37f63 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -101,23 +101,17 @@ case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil {
 // Exchange transformation.
 case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case plan if TransformHints.isNotTransformable(plan) =>
-      plan
-    case plan: ShuffleExchangeExec =>
-      logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
-      val child = plan.child
-      if (
-        (child.supportsColumnar || GlutenConfig.getConf.enablePreferColumnar) &&
-        BackendsApiManager.getSettings.supportColumnarShuffleExec()
-      ) {
-        BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(plan, child)
-      } else {
-        plan.withNewChildren(Seq(child))
-      }
-    case plan: BroadcastExchangeExec =>
-      val child = plan.child
-      logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
-      ColumnarBroadcastExchangeExec(plan.mode, child)
+    case p if TransformHints.isNotTransformable(p) =>
+      p
+    case s: ShuffleExchangeExec
+        if (s.child.supportsColumnar || GlutenConfig.getConf.enablePreferColumnar) &&
+          BackendsApiManager.getSettings.supportColumnarShuffleExec() =>
+      logDebug(s"Columnar Processing for ${s.getClass} is currently 
supported.")
+      BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(s)
+    case b: BroadcastExchangeExec =>
+      val child = b.child
+      logDebug(s"Columnar Processing for ${b.getClass} is currently 
supported.")
+      ColumnarBroadcastExchangeExec(b.mode, child)
     case other => other
   }
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala b/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala
index 81ff2dc0b..1e3681355 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.utils
 
 import org.apache.spark.{InterruptibleIterator, TaskContext}
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.util.TaskResources
 
 import java.util.concurrent.TimeUnit
@@ -85,12 +84,12 @@ private class IteratorCompleter[A](in: Iterator[A])(completionCallback: => Unit)
   }
 }
 
-private class PipelineTimeAccumulator[A](in: Iterator[A], pipelineTime: SQLMetric)
+private class LifeTimeAccumulator[A](in: Iterator[A], onCollected: Long => Unit)
   extends Iterator[A] {
   private val closed = new AtomicBoolean(false)
   private val startTime = System.nanoTime()
 
-  TaskResources.addRecycler("Iterators#PipelineTimeAccumulator", 100) {
+  TaskResources.addRecycler("Iterators#LifeTimeAccumulator", 100) {
     tryFinish()
   }
 
@@ -111,9 +110,31 @@ private class PipelineTimeAccumulator[A](in: Iterator[A], pipelineTime: SQLMetri
     if (!closed.compareAndSet(false, true)) {
       return
     }
-    pipelineTime += TimeUnit.NANOSECONDS.toMillis(
+    val lifeTime = TimeUnit.NANOSECONDS.toMillis(
       System.nanoTime() - startTime
     )
+    onCollected(lifeTime)
+  }
+}
+
+private class ReadTimeAccumulator[A](in: Iterator[A], onAdded: Long => Unit) extends Iterator[A] {
+
+  override def hasNext: Boolean = {
+    val prev = System.nanoTime()
+    val out = in.hasNext
+    val after = System.nanoTime()
+    val duration = TimeUnit.NANOSECONDS.toMillis(after - prev)
+    onAdded(duration)
+    out
+  }
+
+  override def next(): A = {
+    val prev = System.nanoTime()
+    val out = in.next()
+    val after = System.nanoTime()
+    val duration = TimeUnit.NANOSECONDS.toMillis(after - prev)
+    onAdded(duration)
+    out
   }
 }
 
@@ -171,8 +192,13 @@ class WrapperBuilder[A](in: Iterator[A]) { // FIXME how to make the ctor compani
     this
   }
 
-  def addToPipelineTime(pipelineTime: SQLMetric): WrapperBuilder[A] = {
-    wrapped = new PipelineTimeAccumulator[A](wrapped, pipelineTime)
+  def collectLifeMillis(onCollected: Long => Unit): WrapperBuilder[A] = {
+    wrapped = new LifeTimeAccumulator[A](wrapped, onCollected)
+    this
+  }
+
+  def collectReadMillis(onAdded: Long => Unit): WrapperBuilder[A] = {
+    wrapped = new ReadTimeAccumulator[A](wrapped, onAdded)
     this
   }
 
diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
index 37de98943..3a2a741be 100644
--- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
+++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
@@ -31,8 +31,7 @@ public class ColumnarBatchOutIterator extends GeneralOutIterator implements Runt
   private final long iterHandle;
   private final NativeMemoryManager nmm;
 
-  public ColumnarBatchOutIterator(Runtime runtime, long iterHandle, NativeMemoryManager nmm)
-      throws IOException {
+  public ColumnarBatchOutIterator(Runtime runtime, long iterHandle, NativeMemoryManager nmm) {
     super();
     this.runtime = runtime;
     this.iterHandle = iterHandle;
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index d76e698dc..2376a1f39 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -187,6 +187,7 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def columnarShuffleCompressionThreshold: Int =
     conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD)
 
+  // FIXME: Not clear: MIN or MAX ?
   def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
 
   def shuffleWriterBufferSize: Int = conf
@@ -295,6 +296,14 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def veloxBloomFilterMaxNumBits: Long = conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS)
 
+  def veloxCoalesceBatchesBeforeShuffle: Boolean =
+    conf.getConf(COLUMNAR_VELOX_COALESCE_BATCHES_BEFORE_SHUFFLE)
+
+  def veloxMinBatchSizeForShuffle: Int =
+    conf
+      .getConf(COLUMNAR_VELOX_MIN_BATCH_SIZE_FOR_SHUFFLE)
+      .getOrElse(conf.getConf(COLUMNAR_MAX_BATCH_SIZE))
+
   def chColumnarShufflePreferSpill: Boolean = conf.getConf(COLUMNAR_CH_SHUFFLE_PREFER_SPILL_ENABLED)
 
   def chColumnarShuffleSpillThreshold: Long = {
@@ -1395,6 +1404,23 @@ object GlutenConfig {
       .checkValue(_ > 0, "must be a positive number")
       .createWithDefault(10000)
 
+  val COLUMNAR_VELOX_COALESCE_BATCHES_BEFORE_SHUFFLE =
+    buildConf("spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle")
+      .internal()
+      .doc(s"If true, combine small columnar batches together before sending 
to shuffle. " +
+        s"The default minimum output batch size is equal to 
$GLUTEN_MAX_BATCH_SIZE_KEY")
+      .booleanConf
+      .createWithDefault(false)
+
+  val COLUMNAR_VELOX_MIN_BATCH_SIZE_FOR_SHUFFLE =
+    buildConf("spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle")
+      .internal()
+      .doc(s"The minimum batch size for shuffle. If the batch size is smaller 
than this value, " +
+        s"it will be combined with other batches before sending to shuffle. 
Only functions when " +
+        s"${COLUMNAR_VELOX_COALESCE_BATCHES_BEFORE_SHUFFLE.key} is set to 
true.")
+      .intConf
+      .createOptional
+
   val COLUMNAR_CH_SHUFFLE_PREFER_SPILL_ENABLED =
     buildConf("spark.gluten.sql.columnar.backend.ch.shuffle.preferSpill")
       .internal()

