This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1d0bb5224 [VL] Provide options to combine small batches before sending to shuffle (#6009)
1d0bb5224 is described below
commit 1d0bb52248097cba721e850e83c0963971dbc81c
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Jun 11 13:57:46 2024 +0800
[VL] Provide options to combine small batches before sending to shuffle (#6009)
---
.../clickhouse/CHSparkPlanExecApi.scala | 9 +-
.../apache/gluten/utils/VeloxBatchAppender.java | 41 ++++++++
.../gluten/utils/VeloxBatchAppenderJniWrapper.java | 41 ++++++++
.../backendsapi/velox/VeloxIteratorApi.scala | 4 +-
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 44 +++++----
.../gluten/execution/VeloxAppendBatchesExec.scala | 105 +++++++++++++++++++++
.../org/apache/gluten/execution/TestOperator.scala | 26 ++++-
cpp/core/jni/JniCommon.cc | 58 ++++++++++++
cpp/core/jni/JniCommon.h | 41 ++++++++
cpp/core/jni/JniWrapper.cc | 88 -----------------
cpp/velox/CMakeLists.txt | 1 +
cpp/velox/jni/VeloxJniWrapper.cc | 18 ++++
cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc | 16 +---
cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h | 9 --
cpp/velox/shuffle/VeloxShuffleWriter.h | 4 -
cpp/velox/utils/VeloxBatchAppender.cc | 59 ++++++++++++
cpp/velox/utils/VeloxBatchAppender.h | 41 ++++++++
.../gluten/backendsapi/SparkPlanExecApi.scala | 2 +-
.../extension/columnar/OffloadSingleNode.scala | 28 +++---
.../scala/org/apache/gluten/utils/Iterators.scala | 38 ++++++--
.../vectorized/ColumnarBatchOutIterator.java | 3 +-
.../scala/org/apache/gluten/GlutenConfig.scala | 26 +++++
22 files changed, 532 insertions(+), 170 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index bdbdfed0d..a8a05c40f 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -271,13 +271,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
}
}
- override def genColumnarShuffleExchange(
- shuffle: ShuffleExchangeExec,
- child: SparkPlan): SparkPlan = {
+ override def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec):
SparkPlan = {
+ val child = shuffle.child
if (
- BackendsApiManager.getSettings.supportShuffleWithProject(
- shuffle.outputPartitioning,
- shuffle.child)
+
BackendsApiManager.getSettings.supportShuffleWithProject(shuffle.outputPartitioning,
child)
) {
val (projectColumnNumber, newPartitioning, newChild) =
addProjectionForShuffleExchange(shuffle)
diff --git
a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppender.java
b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppender.java
new file mode 100644
index 000000000..1bf34b5ce
--- /dev/null
+++
b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppender.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils;
+
+import org.apache.gluten.exec.Runtime;
+import org.apache.gluten.exec.Runtimes;
+import org.apache.gluten.memory.nmm.NativeMemoryManager;
+import org.apache.gluten.memory.nmm.NativeMemoryManagers;
+import org.apache.gluten.vectorized.ColumnarBatchInIterator;
+import org.apache.gluten.vectorized.ColumnarBatchOutIterator;
+
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+import java.util.Iterator;
+
+public final class VeloxBatchAppender {
+ public static ColumnarBatchOutIterator create(
+ int minOutputBatchSize, Iterator<ColumnarBatch> in) {
+ final Runtime runtime = Runtimes.contextInstance();
+ final NativeMemoryManager nmm =
NativeMemoryManagers.contextInstance("VeloxBatchAppender");
+ long outHandle =
+ VeloxBatchAppenderJniWrapper.forRuntime(runtime)
+ .create(
+ nmm.getNativeInstanceHandle(), minOutputBatchSize, new
ColumnarBatchInIterator(in));
+ return new ColumnarBatchOutIterator(runtime, outHandle, nmm);
+ }
+}
diff --git
a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppenderJniWrapper.java
b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppenderJniWrapper.java
new file mode 100644
index 000000000..9e2531951
--- /dev/null
+++
b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchAppenderJniWrapper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils;
+
+import org.apache.gluten.exec.Runtime;
+import org.apache.gluten.exec.RuntimeAware;
+import org.apache.gluten.vectorized.ColumnarBatchInIterator;
+
+public class VeloxBatchAppenderJniWrapper implements RuntimeAware {
+ private final Runtime runtime;
+
+ private VeloxBatchAppenderJniWrapper(Runtime runtime) {
+ this.runtime = runtime;
+ }
+
+ public static VeloxBatchAppenderJniWrapper forRuntime(Runtime runtime) {
+ return new VeloxBatchAppenderJniWrapper(runtime);
+ }
+
+ @Override
+ public long handle() {
+ return runtime.getHandle();
+ }
+
+ public native long create(
+ long memoryManagerHandle, int minOutputBatchSize,
ColumnarBatchInIterator itr);
+}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index b20eccafb..459a7886e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -203,7 +203,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
resIter.close()
}
.recyclePayload(batch => batch.close())
- .addToPipelineTime(pipelineTime)
+ .collectLifeMillis(millis => pipelineTime += millis)
.asInterruptible(context)
.create()
}
@@ -246,7 +246,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
nativeResultIterator.close()
}
.recyclePayload(batch => batch.close())
- .addToPipelineTime(pipelineTime)
+ .collectLifeMillis(millis => pipelineTime += millis)
.create()
}
// scalastyle:on argcount
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index f8af80a9b..66ca8660a 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -320,9 +320,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
aggregateAttributes: Seq[Attribute]): HashAggregateExecPullOutBaseHelper
=
HashAggregateExecPullOutHelper(aggregateExpressions, aggregateAttributes)
- override def genColumnarShuffleExchange(
- shuffle: ShuffleExchangeExec,
- newChild: SparkPlan): SparkPlan = {
+ override def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec):
SparkPlan = {
def allowHashOnMap[T](f: => T): T = {
val originalAllowHash =
SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_HASH_ON_MAPTYPE)
try {
@@ -333,20 +331,28 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
}
}
+ def maybeAddAppendBatchesExec(plan: SparkPlan): SparkPlan = {
+ if (GlutenConfig.getConf.veloxCoalesceBatchesBeforeShuffle) {
+ VeloxAppendBatchesExec(plan,
GlutenConfig.getConf.veloxMinBatchSizeForShuffle)
+ } else {
+ plan
+ }
+ }
+
+ val child = shuffle.child
+
shuffle.outputPartitioning match {
case HashPartitioning(exprs, _) =>
val hashExpr = new Murmur3Hash(exprs)
- val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++
newChild.output
- val projectTransformer = ProjectExecTransformer(projectList, newChild)
+ val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++
child.output
+ val projectTransformer = ProjectExecTransformer(projectList, child)
val validationResult = projectTransformer.doValidate()
if (validationResult.isValid) {
- ColumnarShuffleExchangeExec(
- shuffle,
- projectTransformer,
- projectTransformer.output.drop(1))
+ val newChild = maybeAddAppendBatchesExec(projectTransformer)
+ ColumnarShuffleExchangeExec(shuffle, newChild,
newChild.output.drop(1))
} else {
TransformHints.tagNotTransformable(shuffle, validationResult)
- shuffle.withNewChildren(newChild :: Nil)
+ shuffle.withNewChildren(child :: Nil)
}
case RoundRobinPartitioning(num) if SQLConf.get.sortBeforeRepartition &&
num > 1 =>
// scalastyle:off line.size.limit
@@ -357,19 +363,20 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
allowHashOnMap {
// Velox hash expression does not support null type and we also do not need to sort
// null type since the value always be null.
- val columnsForHash = newChild.output.filterNot(_.dataType ==
NullType)
+ val columnsForHash = child.output.filterNot(_.dataType == NullType)
if (columnsForHash.isEmpty) {
+ val newChild = maybeAddAppendBatchesExec(child)
ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
} else {
val hashExpr = new Murmur3Hash(columnsForHash)
- val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++
newChild.output
- val projectTransformer = ProjectExecTransformer(projectList,
newChild)
+ val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++
child.output
+ val projectTransformer = ProjectExecTransformer(projectList, child)
val projectBeforeSortValidationResult =
projectTransformer.doValidate()
// Make sure we support offload hash expression
val projectBeforeSort = if
(projectBeforeSortValidationResult.isValid) {
projectTransformer
} else {
- val project = ProjectExec(projectList, newChild)
+ val project = ProjectExec(projectList, child)
TransformHints.tagNotTransformable(project,
projectBeforeSortValidationResult)
project
}
@@ -380,17 +387,16 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
ProjectExecTransformer(projectList.drop(1), sortByHashCode)
val validationResult = dropSortColumnTransformer.doValidate()
if (validationResult.isValid) {
- ColumnarShuffleExchangeExec(
- shuffle,
- dropSortColumnTransformer,
- dropSortColumnTransformer.output)
+ val newChild =
maybeAddAppendBatchesExec(dropSortColumnTransformer)
+ ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
} else {
TransformHints.tagNotTransformable(shuffle, validationResult)
- shuffle.withNewChildren(newChild :: Nil)
+ shuffle.withNewChildren(child :: Nil)
}
}
}
case _ =>
+ val newChild = maybeAddAppendBatchesExec(child)
ColumnarShuffleExchangeExec(shuffle, newChild, null)
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala
new file mode 100644
index 000000000..8c2834574
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxAppendBatchesExec.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.utils.{Iterators, VeloxBatchAppender}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+
+/**
+ * An operator to coalesce input batches by appending the later batches to the one that comes
+ * earlier.
+ */
+case class VeloxAppendBatchesExec(override val child: SparkPlan,
minOutputBatchSize: Int)
+ extends GlutenPlan
+ with UnaryExecNode {
+
+ override lazy val metrics: Map[String, SQLMetric] = Map(
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input
rows"),
+ "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of
input batches"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
+ "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of
output batches"),
+ "appendTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to
append batches")
+ )
+
+ override def supportsColumnar: Boolean = true
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+
+ override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val numInputRows = longMetric("numInputRows")
+ val numInputBatches = longMetric("numInputBatches")
+ val numOutputRows = longMetric("numOutputRows")
+ val numOutputBatches = longMetric("numOutputBatches")
+ val appendTime = longMetric("appendTime")
+
+ child.executeColumnar().mapPartitions {
+ in =>
+ // Append millis = Out millis - In millis.
+ val appendMillis = new AtomicLong(0L)
+
+ val appender = VeloxBatchAppender.create(
+ minOutputBatchSize,
+ Iterators
+ .wrap(in)
+ .collectReadMillis(inMillis => appendMillis.getAndAdd(-inMillis))
+ .create()
+ .map {
+ inBatch =>
+ numInputRows += inBatch.numRows()
+ numInputBatches += 1
+ inBatch
+ }
+ .asJava
+ )
+
+ val out = Iterators
+ .wrap(appender.asScala)
+ .collectReadMillis(outMillis => appendMillis.getAndAdd(outMillis))
+ .recyclePayload(_.close())
+ .recycleIterator {
+ appender.close()
+ appendTime += appendMillis.get()
+ }
+ .create()
+ .map {
+ outBatch =>
+ numOutputRows += outBatch.numRows()
+ numOutputBatches += 1
+ outBatch
+ }
+
+ out
+ }
+ }
+
+ override def output: Seq[Attribute] = child.output
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+ override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+ copy(child = newChild)
+}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index cd1f21a0a..ae8d64a09 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -24,6 +24,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -33,7 +34,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters
-class TestOperator extends VeloxWholeStageTransformerSuite {
+class TestOperator extends VeloxWholeStageTransformerSuite with
AdaptiveSparkPlanHelper {
protected val rootPath: String = getClass.getResource("/").getPath
override protected val resourcePath: String = "/tpch-data-parquet-velox"
@@ -703,6 +704,29 @@ class TestOperator extends VeloxWholeStageTransformerSuite
{
}
}
+ test("combine small batches before shuffle") {
+ val minBatchSize = 15
+ withSQLConf(
+ "spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle"
-> "true",
+ "spark.gluten.sql.columnar.maxBatchSize" -> "2",
+ "spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle" ->
s"$minBatchSize"
+ ) {
+ val df = runQueryAndCompare(
+ "select l_orderkey, sum(l_partkey) as sum from lineitem " +
+ "where l_orderkey < 100 group by l_orderkey") { _ => }
+ checkLengthAndPlan(df, 27)
+ val ops = collect(df.queryExecution.executedPlan) { case p:
VeloxAppendBatchesExec => p }
+ assert(ops.size == 1)
+ val op = ops.head
+ assert(op.minOutputBatchSize == minBatchSize)
+ val metrics = op.metrics
+ assert(metrics("numInputRows").value == 27)
+ assert(metrics("numInputBatches").value == 14)
+ assert(metrics("numOutputRows").value == 27)
+ assert(metrics("numOutputBatches").value == 2)
+ }
+ }
+
test("test OneRowRelation") {
val df = sql("SELECT 1")
checkAnswer(df, Row(1))
diff --git a/cpp/core/jni/JniCommon.cc b/cpp/core/jni/JniCommon.cc
index 328a7b772..08c5cb1d4 100644
--- a/cpp/core/jni/JniCommon.cc
+++ b/cpp/core/jni/JniCommon.cc
@@ -65,3 +65,61 @@ gluten::Runtime* gluten::getRuntime(JNIEnv* env, jobject
runtimeAware) {
GLUTEN_CHECK(ctx != nullptr, "FATAL: resource instance should not be null.");
return ctx;
}
+
+std::unique_ptr<gluten::JniColumnarBatchIterator>
gluten::makeJniColumnarBatchIterator(
+ JNIEnv* env,
+ jobject jColumnarBatchItr,
+ gluten::Runtime* runtime,
+ std::shared_ptr<ArrowWriter> writer) {
+ return std::make_unique<JniColumnarBatchIterator>(env, jColumnarBatchItr,
runtime, writer);
+}
+
+gluten::JniColumnarBatchIterator::JniColumnarBatchIterator(
+ JNIEnv* env,
+ jobject jColumnarBatchItr,
+ gluten::Runtime* runtime,
+ std::shared_ptr<ArrowWriter> writer)
+ : runtime_(runtime), writer_(writer) {
+ // IMPORTANT: DO NOT USE LOCAL REF IN DIFFERENT THREAD
+ if (env->GetJavaVM(&vm_) != JNI_OK) {
+ std::string errorMessage = "Unable to get JavaVM instance";
+ throw gluten::GlutenException(errorMessage);
+ }
+ serializedColumnarBatchIteratorClass_ =
+ createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/ColumnarBatchInIterator;");
+ serializedColumnarBatchIteratorHasNext_ =
+ getMethodIdOrError(env, serializedColumnarBatchIteratorClass_,
"hasNext", "()Z");
+ serializedColumnarBatchIteratorNext_ = getMethodIdOrError(env,
serializedColumnarBatchIteratorClass_, "next", "()J");
+ jColumnarBatchItr_ = env->NewGlobalRef(jColumnarBatchItr);
+}
+
+gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() {
+ JNIEnv* env;
+ attachCurrentThreadAsDaemonOrThrow(vm_, &env);
+ env->DeleteGlobalRef(jColumnarBatchItr_);
+ env->DeleteGlobalRef(serializedColumnarBatchIteratorClass_);
+ vm_->DetachCurrentThread();
+}
+
+std::shared_ptr<gluten::ColumnarBatch>
gluten::JniColumnarBatchIterator::next() {
+ JNIEnv* env;
+ attachCurrentThreadAsDaemonOrThrow(vm_, &env);
+ if (!env->CallBooleanMethod(jColumnarBatchItr_,
serializedColumnarBatchIteratorHasNext_)) {
+ checkException(env);
+ return nullptr; // stream ended
+ }
+
+ checkException(env);
+ jlong handle = env->CallLongMethod(jColumnarBatchItr_,
serializedColumnarBatchIteratorNext_);
+ checkException(env);
+ auto batch = runtime_->objectStore()->retrieve<ColumnarBatch>(handle);
+ if (writer_ != nullptr) {
+ // save snapshot of the batch to file
+ std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
+ std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
+ auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(),
schema.get()));
+ GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
+ GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
+ }
+ return batch;
+}
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index 5858a70e9..bc5cf84f6 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -28,6 +28,7 @@
#include "memory/AllocationListener.h"
#include "shuffle/rss/RssClient.h"
#include "utils/Compression.h"
+#include "utils/ResourceMap.h"
#include "utils/exception.h"
static jint jniVersion = JNI_VERSION_1_8;
@@ -119,6 +120,12 @@ static inline void
attachCurrentThreadAsDaemonOrThrow(JavaVM* vm, JNIEnv** out)
}
}
+template <typename T>
+static T* jniCastOrThrow(gluten::ResourceHandle handle) {
+ auto instance = reinterpret_cast<T*>(handle);
+ GLUTEN_CHECK(instance != nullptr, "FATAL: resource instance should not be
null.");
+ return instance;
+}
namespace gluten {
class JniCommonState {
@@ -251,6 +258,40 @@ DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kLong,
jlongArray, Long)
DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kFloat, jfloatArray, Float)
DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kDouble, jdoubleArray, Double)
+class JniColumnarBatchIterator : public ColumnarBatchIterator {
+ public:
+ explicit JniColumnarBatchIterator(
+ JNIEnv* env,
+ jobject jColumnarBatchItr,
+ Runtime* runtime,
+ std::shared_ptr<ArrowWriter> writer);
+
+ // singleton
+ JniColumnarBatchIterator(const JniColumnarBatchIterator&) = delete;
+ JniColumnarBatchIterator(JniColumnarBatchIterator&&) = delete;
+ JniColumnarBatchIterator& operator=(const JniColumnarBatchIterator&) =
delete;
+ JniColumnarBatchIterator& operator=(JniColumnarBatchIterator&&) = delete;
+
+ virtual ~JniColumnarBatchIterator();
+
+ std::shared_ptr<ColumnarBatch> next() override;
+
+ private:
+ JavaVM* vm_;
+ jobject jColumnarBatchItr_;
+ Runtime* runtime_;
+ std::shared_ptr<ArrowWriter> writer_;
+
+ jclass serializedColumnarBatchIteratorClass_;
+ jmethodID serializedColumnarBatchIteratorHasNext_;
+ jmethodID serializedColumnarBatchIteratorNext_;
+};
+
+std::unique_ptr<JniColumnarBatchIterator> makeJniColumnarBatchIterator(
+ JNIEnv* env,
+ jobject jColumnarBatchItr,
+ Runtime* runtime,
+ std::shared_ptr<ArrowWriter> writer);
} // namespace gluten
// TODO: Move the static functions to namespace gluten
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index db498f43a..4e069ec7a 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -58,13 +58,8 @@ static jmethodID splitResultConstructor;
static jclass columnarBatchSerializeResultClass;
static jmethodID columnarBatchSerializeResultConstructor;
-static jclass serializedColumnarBatchIteratorClass;
static jclass metricsBuilderClass;
static jmethodID metricsBuilderConstructor;
-
-static jmethodID serializedColumnarBatchIteratorHasNext;
-static jmethodID serializedColumnarBatchIteratorNext;
-
static jclass nativeColumnarToRowInfoClass;
static jmethodID nativeColumnarToRowInfoConstructor;
@@ -147,80 +142,6 @@ class JavaInputStreamAdaptor final : public
arrow::io::InputStream {
bool closed_ = false;
};
-class JniColumnarBatchIterator : public ColumnarBatchIterator {
- public:
- explicit JniColumnarBatchIterator(
- JNIEnv* env,
- jobject jColumnarBatchItr,
- Runtime* runtime,
- std::shared_ptr<ArrowWriter> writer)
- : runtime_(runtime), writer_(writer) {
- // IMPORTANT: DO NOT USE LOCAL REF IN DIFFERENT THREAD
- if (env->GetJavaVM(&vm_) != JNI_OK) {
- std::string errorMessage = "Unable to get JavaVM instance";
- throw gluten::GlutenException(errorMessage);
- }
- jColumnarBatchItr_ = env->NewGlobalRef(jColumnarBatchItr);
- }
-
- // singleton
- JniColumnarBatchIterator(const JniColumnarBatchIterator&) = delete;
- JniColumnarBatchIterator(JniColumnarBatchIterator&&) = delete;
- JniColumnarBatchIterator& operator=(const JniColumnarBatchIterator&) =
delete;
- JniColumnarBatchIterator& operator=(JniColumnarBatchIterator&&) = delete;
-
- virtual ~JniColumnarBatchIterator() {
- JNIEnv* env;
- attachCurrentThreadAsDaemonOrThrow(vm_, &env);
- env->DeleteGlobalRef(jColumnarBatchItr_);
- vm_->DetachCurrentThread();
- }
-
- std::shared_ptr<ColumnarBatch> next() override {
- JNIEnv* env;
- attachCurrentThreadAsDaemonOrThrow(vm_, &env);
- if (!env->CallBooleanMethod(jColumnarBatchItr_,
serializedColumnarBatchIteratorHasNext)) {
- checkException(env);
- return nullptr; // stream ended
- }
-
- checkException(env);
- jlong handle = env->CallLongMethod(jColumnarBatchItr_,
serializedColumnarBatchIteratorNext);
- checkException(env);
- auto batch = runtime_->objectStore()->retrieve<ColumnarBatch>(handle);
- if (writer_ != nullptr) {
- // save snapshot of the batch to file
- std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
- std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
- auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(),
schema.get()));
- GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
- GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
- }
- return batch;
- }
-
- private:
- JavaVM* vm_;
- jobject jColumnarBatchItr_;
- Runtime* runtime_;
- std::shared_ptr<ArrowWriter> writer_;
-};
-
-std::unique_ptr<JniColumnarBatchIterator> makeJniColumnarBatchIterator(
- JNIEnv* env,
- jobject jColumnarBatchItr,
- Runtime* runtime,
- std::shared_ptr<ArrowWriter> writer) {
- return std::make_unique<JniColumnarBatchIterator>(env, jColumnarBatchItr,
runtime, writer);
-}
-
-template <typename T>
-T* jniCastOrThrow(ResourceHandle handle) {
- auto instance = reinterpret_cast<T*>(handle);
- GLUTEN_CHECK(instance != nullptr, "FATAL: resource instance should not be
null.");
- return instance;
-}
-
#ifdef __cplusplus
extern "C" {
#endif
@@ -253,14 +174,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
metricsBuilderConstructor = getMethodIdOrError(
env, metricsBuilderClass, "<init>",
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
- serializedColumnarBatchIteratorClass =
- createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/ColumnarBatchInIterator;");
-
- serializedColumnarBatchIteratorHasNext =
- getMethodIdOrError(env, serializedColumnarBatchIteratorClass, "hasNext",
"()Z");
-
- serializedColumnarBatchIteratorNext = getMethodIdOrError(env,
serializedColumnarBatchIteratorClass, "next", "()J");
-
nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
nativeColumnarToRowInfoConstructor = getMethodIdOrError(env,
nativeColumnarToRowInfoClass, "<init>", "([I[IJ)V");
@@ -293,7 +206,6 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
env->DeleteGlobalRef(jniByteInputStreamClass);
env->DeleteGlobalRef(splitResultClass);
env->DeleteGlobalRef(columnarBatchSerializeResultClass);
- env->DeleteGlobalRef(serializedColumnarBatchIteratorClass);
env->DeleteGlobalRef(nativeColumnarToRowInfoClass);
env->DeleteGlobalRef(byteArrayClass);
env->DeleteGlobalRef(shuffleReaderMetricsClass);
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 05ecf9635..34cc9001c 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -327,6 +327,7 @@ set(VELOX_SRCS
utils/VeloxArrowUtils.cc
utils/ConfigExtractor.cc
utils/Common.cc
+ utils/VeloxBatchAppender.cc
)
if (ENABLE_HDFS)
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 9da7355d1..3b52eaa86 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -30,6 +30,7 @@
#include "jni/JniFileSystem.h"
#include "memory/VeloxMemoryManager.h"
#include "substrait/SubstraitToVeloxPlanValidator.h"
+#include "utils/VeloxBatchAppender.h"
#include "velox/common/base/BloomFilter.h"
#include <iostream>
@@ -246,6 +247,23 @@ JNIEXPORT jbyteArray JNICALL
Java_org_apache_gluten_utils_VeloxBloomFilterJniWra
JNI_METHOD_END(nullptr)
}
+JNIEXPORT jlong JNICALL
Java_org_apache_gluten_utils_VeloxBatchAppenderJniWrapper_create( // NOLINT
+ JNIEnv* env,
+ jobject wrapper,
+ jlong memoryManagerHandle,
+ jint minOutputBatchSize,
+ jobject jIter) {
+ JNI_METHOD_START
+ auto ctx = gluten::getRuntime(env, wrapper);
+ auto memoryManager =
jniCastOrThrow<gluten::MemoryManager>(memoryManagerHandle);
+ auto pool = gluten::VeloxRuntime::getLeafVeloxPool(memoryManager);
+ auto iter = gluten::makeJniColumnarBatchIterator(env, jIter, ctx, nullptr);
+ auto appender = std::make_shared<gluten::ResultIterator>(
+ std::make_unique<gluten::VeloxBatchAppender>(pool.get(),
minOutputBatchSize, std::move(iter)));
+ return ctx->objectStore()->save(appender);
+ JNI_METHOD_END(gluten::kInvalidResourceHandle)
+}
+
#ifdef __cplusplus
}
#endif
diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
index cc648cf7f..741ca8ab9 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
@@ -303,17 +303,7 @@ arrow::Status
VeloxHashBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
numRows -= length;
} while (numRows);
} else {
- if (accumulateRows_ + rv->size() < 8192) {
- accumulateRows_ += rv->size();
- initAccumulateDataset(rv);
- accumulateDataset_->append(rv.get());
- } else {
- initAccumulateDataset(rv);
- accumulateDataset_->append(rv.get());
- RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_),
memLimit));
- accumulateDataset_ = nullptr;
- accumulateRows_ = 0;
- }
+ RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit));
}
}
return arrow::Status::OK();
@@ -339,10 +329,6 @@ arrow::Status
VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
}
arrow::Status VeloxHashBasedShuffleWriter::stop() {
- if (accumulateDataset_ != nullptr) {
- RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_),
kMinMemLimit));
- accumulateRows_ = 0;
- }
if (options_.partitioning != Partitioning::kSingle) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
index 142c7978b..a11f84e95 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
@@ -303,15 +303,6 @@ class VeloxHashBasedShuffleWriter : public
VeloxShuffleWriter {
arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv,
int64_t memLimit);
- void initAccumulateDataset(facebook::velox::RowVectorPtr& rv) {
- if (accumulateDataset_) {
- return;
- }
- std::vector<facebook::velox::VectorPtr> children(rv->children().size(),
nullptr);
- accumulateDataset_ =
- std::make_shared<facebook::velox::RowVector>(veloxPool_.get(),
rv->type(), nullptr, 0, std::move(children));
- }
-
BinaryArrayResizeState binaryArrayResizeState_{};
bool hasComplexType_ = false;
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h
b/cpp/velox/shuffle/VeloxShuffleWriter.h
index 2855831c5..104b87616 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -124,10 +124,6 @@ class VeloxShuffleWriter : public ShuffleWriter {
int32_t maxBatchSize_{0};
- uint32_t accumulateRows_{0};
-
- facebook::velox::RowVectorPtr accumulateDataset_;
-
enum EvictState { kEvictable, kUnevictable };
// stat
diff --git a/cpp/velox/utils/VeloxBatchAppender.cc
b/cpp/velox/utils/VeloxBatchAppender.cc
new file mode 100644
index 000000000..8fa1ade21
--- /dev/null
+++ b/cpp/velox/utils/VeloxBatchAppender.cc
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "VeloxBatchAppender.h"
+
+namespace gluten {
+
+gluten::VeloxBatchAppender::VeloxBatchAppender(
+ facebook::velox::memory::MemoryPool* pool,
+ int32_t minOutputBatchSize,
+ std::unique_ptr<ColumnarBatchIterator> in)
+ : pool_(pool), minOutputBatchSize_(minOutputBatchSize), in_(std::move(in)) {}
+
+std::shared_ptr<ColumnarBatch> VeloxBatchAppender::next() {
+ auto cb = in_->next();
+ if (cb == nullptr) {
+ // Input iterator was drained.
+ return nullptr;
+ }
+ if (cb->numRows() >= minOutputBatchSize_) {
+ // Fast flush path.
+ return cb;
+ }
+
+ auto vb = VeloxColumnarBatch::from(pool_, cb);
+ auto rv = vb->getRowVector();
+ auto buffer = facebook::velox::RowVector::createEmpty(rv->type(), pool_);
+ buffer->append(rv.get());
+
+ for (auto nextCb = in_->next(); nextCb != nullptr; nextCb = in_->next()) {
+ auto nextVb = VeloxColumnarBatch::from(pool_, nextCb);
+ auto nextRv = nextVb->getRowVector();
+ buffer->append(nextRv.get());
+ if (buffer->size() >= minOutputBatchSize_) {
+ // Buffer is full.
+ break;
+ }
+ }
+ return std::make_shared<VeloxColumnarBatch>(buffer);
+}
+
+int64_t VeloxBatchAppender::spillFixedSize(int64_t size) {
+ return in_->spillFixedSize(size);
+}
+} // namespace gluten
diff --git a/cpp/velox/utils/VeloxBatchAppender.h
b/cpp/velox/utils/VeloxBatchAppender.h
new file mode 100644
index 000000000..3698381d0
--- /dev/null
+++ b/cpp/velox/utils/VeloxBatchAppender.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "memory/ColumnarBatchIterator.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "utils/exception.h"
+#include "velox/common/memory/MemoryPool.h"
+#include "velox/vector/ComplexVector.h"
+
+namespace gluten {
+class VeloxBatchAppender : public ColumnarBatchIterator {
+ public:
+ VeloxBatchAppender(
+ facebook::velox::memory::MemoryPool* pool,
+ int32_t minOutputBatchSize,
+ std::unique_ptr<ColumnarBatchIterator> in);
+
+ std::shared_ptr<ColumnarBatch> next() override;
+
+ int64_t spillFixedSize(int64_t size) override;
+
+ private:
+ facebook::velox::memory::MemoryPool* pool_;
+ const int32_t minOutputBatchSize_;
+ std::unique_ptr<ColumnarBatchIterator> in_;
+};
+} // namespace gluten
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 8a086f896..8a1baae51 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -101,7 +101,7 @@ trait SparkPlanExecApi {
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute]): HashAggregateExecPullOutBaseHelper
- def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec, newChild:
SparkPlan): SparkPlan
+ def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan
/** Generate ShuffledHashJoinExecTransformer. */
def genShuffledHashJoinExecTransformer(
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 39cc8ad2e..6e4d37f63 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -101,23 +101,17 @@ case class OffloadAggregate() extends OffloadSingleNode
with LogLevelUtil {
// Exchange transformation.
case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil {
override def offload(plan: SparkPlan): SparkPlan = plan match {
- case plan if TransformHints.isNotTransformable(plan) =>
- plan
- case plan: ShuffleExchangeExec =>
- logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
- val child = plan.child
- if (
- (child.supportsColumnar || GlutenConfig.getConf.enablePreferColumnar)
&&
- BackendsApiManager.getSettings.supportColumnarShuffleExec()
- ) {
-
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(plan,
child)
- } else {
- plan.withNewChildren(Seq(child))
- }
- case plan: BroadcastExchangeExec =>
- val child = plan.child
- logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
- ColumnarBroadcastExchangeExec(plan.mode, child)
+ case p if TransformHints.isNotTransformable(p) =>
+ p
+ case s: ShuffleExchangeExec
+ if (s.child.supportsColumnar ||
GlutenConfig.getConf.enablePreferColumnar) &&
+ BackendsApiManager.getSettings.supportColumnarShuffleExec() =>
+ logDebug(s"Columnar Processing for ${s.getClass} is currently supported.")
+
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(s)
+ case b: BroadcastExchangeExec =>
+ val child = b.child
+ logDebug(s"Columnar Processing for ${b.getClass} is currently supported.")
+ ColumnarBroadcastExchangeExec(b.mode, child)
case other => other
}
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala
b/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala
index 81ff2dc0b..1e3681355 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/Iterators.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.utils
import org.apache.spark.{InterruptibleIterator, TaskContext}
-import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.util.TaskResources
import java.util.concurrent.TimeUnit
@@ -85,12 +84,12 @@ private class IteratorCompleter[A](in:
Iterator[A])(completionCallback: => Unit)
}
}
-private class PipelineTimeAccumulator[A](in: Iterator[A], pipelineTime:
SQLMetric)
+private class LifeTimeAccumulator[A](in: Iterator[A], onCollected: Long =>
Unit)
extends Iterator[A] {
private val closed = new AtomicBoolean(false)
private val startTime = System.nanoTime()
- TaskResources.addRecycler("Iterators#PipelineTimeAccumulator", 100) {
+ TaskResources.addRecycler("Iterators#LifeTimeAccumulator", 100) {
tryFinish()
}
@@ -111,9 +110,31 @@ private class PipelineTimeAccumulator[A](in: Iterator[A],
pipelineTime: SQLMetri
if (!closed.compareAndSet(false, true)) {
return
}
- pipelineTime += TimeUnit.NANOSECONDS.toMillis(
+ val lifeTime = TimeUnit.NANOSECONDS.toMillis(
System.nanoTime() - startTime
)
+ onCollected(lifeTime)
+ }
+}
+
+private class ReadTimeAccumulator[A](in: Iterator[A], onAdded: Long => Unit)
extends Iterator[A] {
+
+ override def hasNext: Boolean = {
+ val prev = System.nanoTime()
+ val out = in.hasNext
+ val after = System.nanoTime()
+ val duration = TimeUnit.NANOSECONDS.toMillis(after - prev)
+ onAdded(duration)
+ out
+ }
+
+ override def next(): A = {
+ val prev = System.nanoTime()
+ val out = in.next()
+ val after = System.nanoTime()
+ val duration = TimeUnit.NANOSECONDS.toMillis(after - prev)
+ onAdded(duration)
+ out
}
}
@@ -171,8 +192,13 @@ class WrapperBuilder[A](in: Iterator[A]) { // FIXME how to
make the ctor compani
this
}
- def addToPipelineTime(pipelineTime: SQLMetric): WrapperBuilder[A] = {
- wrapped = new PipelineTimeAccumulator[A](wrapped, pipelineTime)
+ def collectLifeMillis(onCollected: Long => Unit): WrapperBuilder[A] = {
+ wrapped = new LifeTimeAccumulator[A](wrapped, onCollected)
+ this
+ }
+
+ def collectReadMillis(onAdded: Long => Unit): WrapperBuilder[A] = {
+ wrapped = new ReadTimeAccumulator[A](wrapped, onAdded)
this
}
diff --git
a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
index 37de98943..3a2a741be 100644
---
a/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
+++
b/gluten-data/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
@@ -31,8 +31,7 @@ public class ColumnarBatchOutIterator extends
GeneralOutIterator implements Runt
private final long iterHandle;
private final NativeMemoryManager nmm;
- public ColumnarBatchOutIterator(Runtime runtime, long iterHandle,
NativeMemoryManager nmm)
- throws IOException {
+ public ColumnarBatchOutIterator(Runtime runtime, long iterHandle,
NativeMemoryManager nmm) {
super();
this.runtime = runtime;
this.iterHandle = iterHandle;
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index d76e698dc..2376a1f39 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -187,6 +187,7 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def columnarShuffleCompressionThreshold: Int =
conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD)
+ // FIXME: Not clear: MIN or MAX ?
def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
def shuffleWriterBufferSize: Int = conf
@@ -295,6 +296,14 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def veloxBloomFilterMaxNumBits: Long =
conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS)
+ def veloxCoalesceBatchesBeforeShuffle: Boolean =
+ conf.getConf(COLUMNAR_VELOX_COALESCE_BATCHES_BEFORE_SHUFFLE)
+
+ def veloxMinBatchSizeForShuffle: Int =
+ conf
+ .getConf(COLUMNAR_VELOX_MIN_BATCH_SIZE_FOR_SHUFFLE)
+ .getOrElse(conf.getConf(COLUMNAR_MAX_BATCH_SIZE))
+
def chColumnarShufflePreferSpill: Boolean =
conf.getConf(COLUMNAR_CH_SHUFFLE_PREFER_SPILL_ENABLED)
def chColumnarShuffleSpillThreshold: Long = {
@@ -1395,6 +1404,23 @@ object GlutenConfig {
.checkValue(_ > 0, "must be a positive number")
.createWithDefault(10000)
+ val COLUMNAR_VELOX_COALESCE_BATCHES_BEFORE_SHUFFLE =
+
buildConf("spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle")
+ .internal()
+ .doc(s"If true, combine small columnar batches together before sending to shuffle. " +
+ s"The default minimum output batch size is equal to $GLUTEN_MAX_BATCH_SIZE_KEY")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COLUMNAR_VELOX_MIN_BATCH_SIZE_FOR_SHUFFLE =
+ buildConf("spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle")
+ .internal()
+ .doc(s"The minimum batch size for shuffle. If the batch size is smaller than this value, " +
+ s"it will be combined with other batches before sending to shuffle. Only functions when " +
+ s"${COLUMNAR_VELOX_COALESCE_BATCHES_BEFORE_SHUFFLE.key} is set to true.")
+ .intConf
+ .createOptional
+
val COLUMNAR_CH_SHUFFLE_PREFER_SPILL_ENABLED =
buildConf("spark.gluten.sql.columnar.backend.ch.shuffle.preferSpill")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]