This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f7b402751 [VL] Add conf option to limit C2R memory usage (#5952)
f7b402751 is described below
commit f7b4027510d32fb889c926d1a9edea700c98d309
Author: YunDa <[email protected]>
AuthorDate: Tue Aug 6 12:14:14 2024 +0800
[VL] Add conf option to limit C2R memory usage (#5952)
Co-authored-by: BInwei Yang <[email protected]>
Co-authored-by: Hongze Zhang <[email protected]>
---
.../gluten/execution/VeloxColumnarToRowExec.scala | 14 ++++--
.../apache/gluten/execution/VeloxTPCHSuite.scala | 3 ++
cpp/core/compute/Runtime.h | 2 +-
cpp/core/config/GlutenConfig.h | 3 ++
cpp/core/jni/JniWrapper.cc | 29 ++++++++++--
cpp/core/operators/c2r/ColumnarToRow.h | 10 ++++-
cpp/velox/benchmarks/ColumnarToRowBenchmark.cc | 4 +-
cpp/velox/compute/VeloxRuntime.cc | 4 +-
cpp/velox/compute/VeloxRuntime.h | 2 +-
.../serializer/VeloxColumnarToRowConverter.cc | 51 ++++++++++++++--------
.../serializer/VeloxColumnarToRowConverter.h | 11 +++--
cpp/velox/tests/RuntimeTest.cc | 2 +-
cpp/velox/tests/VeloxColumnarToRowTest.cc | 2 +-
cpp/velox/tests/VeloxRowToColumnarTest.cc | 2 +-
.../vectorized/NativeColumnarToRowJniWrapper.java | 4 +-
.../sql/execution/ColumnarBuildSideRelation.scala | 16 +++++--
.../spark/sql/execution/utils/ExecUtil.scala | 10 ++++-
.../scala/org/apache/gluten/GlutenConfig.scala | 15 ++++++-
18 files changed, 136 insertions(+), 48 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index 2c46893e4..993a888b9 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -147,13 +147,14 @@ object VeloxColumnarToRowExec {
val rows = batch.numRows()
val beforeConvert = System.currentTimeMillis()
val batchHandle = ColumnarBatches.getNativeHandle(batch)
- val info =
- jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle)
+ var info =
+ jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, 0)
convertTime += (System.currentTimeMillis() - beforeConvert)
new Iterator[InternalRow] {
var rowId = 0
+ var baseLength = 0
val row = new UnsafeRow(cols)
override def hasNext: Boolean = {
@@ -161,7 +162,14 @@ object VeloxColumnarToRowExec {
}
override def next: UnsafeRow = {
- val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
+ if (rowId == baseLength + info.lengths.length) {
+ baseLength += info.lengths.length
+ val before = System.currentTimeMillis()
+ info = jniWrapper.nativeColumnarToRowConvert(c2rId,
batchHandle, rowId)
+ convertTime += (System.currentTimeMillis() - before)
+ }
+ val (offset, length) =
+ (info.offsets(rowId - baseLength), info.lengths(rowId -
baseLength))
row.pointTo(null, info.memoryAddress + offset, length)
rowId += 1
row
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
index 798cea8cb..22f96bbbc 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
@@ -16,6 +16,8 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.GlutenConfig
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, TestUtils}
import org.apache.spark.sql.execution.FormattedMode
@@ -253,6 +255,7 @@ class VeloxTPCHDistinctSpillSuite extends
VeloxTPCHTableSupport {
super.sparkConf
.set("spark.memory.offHeap.size", "50m")
.set("spark.gluten.memory.overAcquiredMemoryRatio", "0.9") // to trigger
distinct spill early
+ .set(GlutenConfig.GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY, "8k")
}
test("distinct spill") {
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index fb501dc9a..8bdf95cd7 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -97,7 +97,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
/// This function is used to create certain converter from the format used by
/// the backend to Spark unsafe row.
- virtual std::shared_ptr<ColumnarToRowConverter>
createColumnar2RowConverter() = 0;
+ virtual std::shared_ptr<ColumnarToRowConverter>
createColumnar2RowConverter(int64_t column2RowMemThreshold) = 0;
virtual std::shared_ptr<RowToColumnarConverter>
createRow2ColumnarConverter(struct ArrowSchema* cSchema) = 0;
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 060bbe111..e4f5a884b 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -56,6 +56,9 @@ const std::string kGzipWindowSize4k = "4096";
const std::string kParquetCompressionCodec =
"spark.sql.parquet.compression.codec";
+const std::string kColumnarToRowMemoryThreshold =
"spark.gluten.sql.columnarToRowMemoryThreshold";
+const std::string kColumnarToRowMemoryDefaultThreshold = "67108864"; // 64MB
+
const std::string kUGIUserName = "spark.gluten.ugi.username";
const std::string kUGITokens = "spark.gluten.ugi.tokens";
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index add4aa54d..60f367fd7 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -16,6 +16,8 @@
*/
#include <jni.h>
+#include <algorithm>
+#include <cstdint>
#include <filesystem>
#include "compute/Runtime.h"
@@ -27,6 +29,7 @@
#include <arrow/c/bridge.h>
#include <optional>
+#include <string>
#include "memory/AllocationListener.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
#include "shuffle/LocalPartitionWriter.h"
@@ -528,8 +531,24 @@
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarTo
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
+ auto& conf = ctx->getConfMap();
+ int64_t column2RowMemThreshold;
+ auto it = conf.find(kColumnarToRowMemoryThreshold);
+  bool confIsLegal = (it != conf.end()) && !it->second.empty() &&
+      std::all_of(it->second.begin(), it->second.end(), [](unsigned char c) {
+        return std::isdigit(c);
+      });
+ if (confIsLegal) {
+ column2RowMemThreshold = std::stoll(it->second);
+ } else {
+ LOG(INFO)
+ << "Because the spark.gluten.sql.columnarToRowMemoryThreshold
configuration item is invalid, the kColumnarToRowMemoryDefaultThreshold default
value is used, which is "
+ << kColumnarToRowMemoryDefaultThreshold << " byte";
+ column2RowMemThreshold = std::stoll(kColumnarToRowMemoryDefaultThreshold);
+ }
+
// Convert the native batch to Spark unsafe row.
- return ctx->saveObject(ctx->createColumnar2RowConverter());
+ return
ctx->saveObject(ctx->createColumnar2RowConverter(column2RowMemThreshold));
JNI_METHOD_END(kInvalidObjectHandle)
}
@@ -538,16 +557,18 @@
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarTo
JNIEnv* env,
jobject wrapper,
jlong c2rHandle,
- jlong batchHandle) {
+ jlong batchHandle,
+ jlong startRow) {
JNI_METHOD_START
auto columnarToRowConverter =
ObjectStore::retrieve<ColumnarToRowConverter>(c2rHandle);
auto cb = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
- columnarToRowConverter->convert(cb);
+
+ columnarToRowConverter->convert(cb, startRow);
const auto& offsets = columnarToRowConverter->getOffsets();
const auto& lengths = columnarToRowConverter->getLengths();
- auto numRows = cb->numRows();
+ auto numRows = columnarToRowConverter->numRows();
auto offsetsArr = env->NewIntArray(numRows);
auto offsetsSrc = reinterpret_cast<const jint*>(offsets.data());
diff --git a/cpp/core/operators/c2r/ColumnarToRow.h
b/cpp/core/operators/c2r/ColumnarToRow.h
index edee31249..062a863f7 100644
--- a/cpp/core/operators/c2r/ColumnarToRow.h
+++ b/cpp/core/operators/c2r/ColumnarToRow.h
@@ -17,6 +17,7 @@
#pragma once
+#include <cstdint>
#include "memory/ColumnarBatch.h"
namespace gluten {
@@ -27,7 +28,14 @@ class ColumnarToRowConverter {
virtual ~ColumnarToRowConverter() = default;
- virtual void convert(std::shared_ptr<ColumnarBatch> cb = nullptr) = 0;
+  // Conversion starts from the 'startRow' row of 'cb'. The memory consumed by the
+  // converted rows is capped by the configured memory threshold; the number of rows
+  // actually converted is stored in 'numRows_'.
+ virtual void convert(std::shared_ptr<ColumnarBatch> cb = nullptr, int64_t
startRow = 0) = 0;
+
+ virtual int32_t numRows() {
+ return numRows_;
+ }
uint8_t* getBufferAddress() const {
return bufferAddress_;
diff --git a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
index 5037264e1..8a5505001 100644
--- a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
+++ b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
@@ -157,7 +157,7 @@ class GoogleBenchmarkColumnarToRowCacheScanBenchmark :
public GoogleBenchmarkCol
for (auto _ : state) {
for (const auto& vector : vectors) {
auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
- auto columnarToRowConverter =
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool);
+ auto columnarToRowConverter =
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
auto cb = std::make_shared<VeloxColumnarBatch>(row);
TIME_NANO_START(writeTime);
columnarToRowConverter->convert(cb);
@@ -212,7 +212,7 @@ class GoogleBenchmarkColumnarToRowIterateScanBenchmark :
public GoogleBenchmarkC
numBatches += 1;
numRows += recordBatch->num_rows();
auto vector = recordBatch2RowVector(*recordBatch);
- auto columnarToRowConverter =
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool);
+ auto columnarToRowConverter =
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
auto cb = std::make_shared<VeloxColumnarBatch>(row);
TIME_NANO_START(writeTime);
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index c1e8fc860..cdce781bd 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -157,9 +157,9 @@ std::shared_ptr<ResultIterator>
VeloxRuntime::createResultIterator(
return std::make_shared<ResultIterator>(std::move(wholestageIter), this);
}
-std::shared_ptr<ColumnarToRowConverter>
VeloxRuntime::createColumnar2RowConverter() {
+std::shared_ptr<ColumnarToRowConverter>
VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) {
auto veloxPool = vmm_->getLeafMemoryPool();
- return std::make_shared<VeloxColumnarToRowConverter>(veloxPool);
+ return std::make_shared<VeloxColumnarToRowConverter>(veloxPool,
column2RowMemThreshold);
}
std::shared_ptr<ColumnarBatch>
VeloxRuntime::createOrGetEmptySchemaBatch(int32_t numRows) {
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 096ecb6fb..952a103ed 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -49,7 +49,7 @@ class VeloxRuntime final : public Runtime {
const std::vector<std::shared_ptr<ResultIterator>>& inputs = {},
const std::unordered_map<std::string, std::string>& sessionConf = {})
override;
- std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter()
override;
+ std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t
column2RowMemThreshold) override;
std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows)
override;
diff --git a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
index 52a046ac7..a609e26d0 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
+++ b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
@@ -16,8 +16,11 @@
*/
#include "VeloxColumnarToRowConverter.h"
+#include <velox/common/base/SuccinctPrinter.h>
+#include <cstdint>
#include "memory/VeloxColumnarBatch.h"
+#include "utils/exception.h"
#include "velox/row/UnsafeRowDeserializers.h"
#include "velox/row/UnsafeRowFast.h"
@@ -25,27 +28,39 @@ using namespace facebook;
namespace gluten {
-void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr
rowVector) {
- numRows_ = rowVector->size();
+void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr
rowVector, int64_t startRow) {
+ auto vectorLength = rowVector->size();
numCols_ = rowVector->childrenSize();
fast_ = std::make_unique<velox::row::UnsafeRowFast>(rowVector);
- size_t totalMemorySize = 0;
+ int64_t totalMemorySize;
+
if (auto fixedRowSize =
velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) {
- totalMemorySize += fixedRowSize.value() * numRows_;
+ auto rowSize = fixedRowSize.value();
+ // make sure it has at least one row
+ numRows_ = std::max<int32_t>(1, std::min<int64_t>(memThreshold_ / rowSize,
vectorLength - startRow));
+ totalMemorySize = numRows_ * rowSize;
} else {
- for (auto i = 0; i < numRows_; ++i) {
- totalMemorySize += fast_->rowSize(i);
+ // Calculate the first row size
+ totalMemorySize = fast_->rowSize(startRow);
+
+ auto endRow = startRow + 1;
+ for (; endRow < vectorLength; ++endRow) {
+ auto rowSize = fast_->rowSize(endRow);
+ if (UNLIKELY(totalMemorySize + rowSize > memThreshold_)) {
+ break;
+ } else {
+ totalMemorySize += rowSize;
+ }
}
+    // numRows_ is always >= 1: the first row is counted even if it alone exceeds the threshold.
+    numRows_ = endRow - startRow;
}
- if (veloxBuffers_ == nullptr) {
- // First allocate memory
+ if (nullptr == veloxBuffers_) {
veloxBuffers_ = velox::AlignedBuffer::allocate<uint8_t>(totalMemorySize,
veloxPool_.get());
- }
-
- if (veloxBuffers_->capacity() < totalMemorySize) {
+ } else if (veloxBuffers_->capacity() < totalMemorySize) {
velox::AlignedBuffer::reallocate<uint8_t>(&veloxBuffers_, totalMemorySize);
}
@@ -53,9 +68,9 @@ void
VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr ro
memset(bufferAddress_, 0, sizeof(int8_t) * totalMemorySize);
}
-void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb) {
+void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb,
int64_t startRow) {
auto veloxBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
- refreshStates(veloxBatch->getRowVector());
+ refreshStates(veloxBatch->getRowVector(), startRow);
// Initialize the offsets_ , lengths_
lengths_.clear();
@@ -64,11 +79,11 @@ void
VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb) {
offsets_.resize(numRows_, 0);
size_t offset = 0;
- for (auto rowIdx = 0; rowIdx < numRows_; ++rowIdx) {
- auto rowSize = fast_->serialize(rowIdx, (char*)(bufferAddress_ + offset));
- lengths_[rowIdx] = rowSize;
- if (rowIdx > 0) {
- offsets_[rowIdx] = offsets_[rowIdx - 1] + lengths_[rowIdx - 1];
+ for (auto i = 0; i < numRows_; ++i) {
+ auto rowSize = fast_->serialize(startRow + i, (char*)(bufferAddress_ +
offset));
+ lengths_[i] = rowSize;
+ if (i > 0) {
+ offsets_[i] = offsets_[i - 1] + lengths_[i - 1];
}
offset += rowSize;
}
diff --git a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h
b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h
index 833ffa8aa..540d991a6 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h
+++ b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h
@@ -29,17 +29,20 @@ namespace gluten {
class VeloxColumnarToRowConverter final : public ColumnarToRowConverter {
public:
- explicit
VeloxColumnarToRowConverter(std::shared_ptr<facebook::velox::memory::MemoryPool>
veloxPool)
- : ColumnarToRowConverter(), veloxPool_(veloxPool) {}
+ explicit VeloxColumnarToRowConverter(
+ std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+ int64_t memThreshold)
+ : ColumnarToRowConverter(), veloxPool_(veloxPool),
memThreshold_(memThreshold) {}
- void convert(std::shared_ptr<ColumnarBatch> cb) override;
+ void convert(std::shared_ptr<ColumnarBatch> cb, int64_t startRow = 0)
override;
private:
- void refreshStates(facebook::velox::RowVectorPtr rowVector);
+ void refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t
startRow);
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
std::shared_ptr<facebook::velox::row::UnsafeRowFast> fast_;
facebook::velox::BufferPtr veloxBuffers_;
+ int64_t memThreshold_;
};
} // namespace gluten
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 563539d7d..1a353816f 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -61,7 +61,7 @@ class DummyRuntime final : public Runtime {
std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows)
override {
throw GlutenException("Not yet implemented");
}
- std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter()
override {
+ std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t
column2RowMemThreshold) override {
throw GlutenException("Not yet implemented");
}
std::shared_ptr<RowToColumnarConverter> createRow2ColumnarConverter(struct
ArrowSchema* cSchema) override {
diff --git a/cpp/velox/tests/VeloxColumnarToRowTest.cc
b/cpp/velox/tests/VeloxColumnarToRowTest.cc
index 2309e6e1c..3adacdda9 100644
--- a/cpp/velox/tests/VeloxColumnarToRowTest.cc
+++ b/cpp/velox/tests/VeloxColumnarToRowTest.cc
@@ -34,7 +34,7 @@ class VeloxColumnarToRowTest : public ::testing::Test, public
test::VectorTestBa
}
void testRowBufferAddr(velox::RowVectorPtr vector, uint8_t* expectArr,
int32_t expectArrSize) {
- auto columnarToRowConverter =
std::make_shared<VeloxColumnarToRowConverter>(pool_);
+ auto columnarToRowConverter =
std::make_shared<VeloxColumnarToRowConverter>(pool_, 64 << 10);
auto cb = std::make_shared<VeloxColumnarBatch>(vector);
columnarToRowConverter->convert(cb);
diff --git a/cpp/velox/tests/VeloxRowToColumnarTest.cc
b/cpp/velox/tests/VeloxRowToColumnarTest.cc
index 93f780ca3..c784dbd59 100644
--- a/cpp/velox/tests/VeloxRowToColumnarTest.cc
+++ b/cpp/velox/tests/VeloxRowToColumnarTest.cc
@@ -33,7 +33,7 @@ class VeloxRowToColumnarTest : public ::testing::Test, public
test::VectorTestBa
}
void testRowVectorEqual(velox::RowVectorPtr vector) {
- auto columnarToRowConverter =
std::make_shared<VeloxColumnarToRowConverter>(pool_);
+ auto columnarToRowConverter =
std::make_shared<VeloxColumnarToRowConverter>(pool_, 64 << 10);
auto columnarBatch = std::make_shared<VeloxColumnarBatch>(vector);
columnarToRowConverter->convert(columnarBatch);
diff --git
a/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java
b/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java
index 7f8de78f9..ffcb77ad3 100644
---
a/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java
+++
b/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java
@@ -37,8 +37,8 @@ public class NativeColumnarToRowJniWrapper implements
RuntimeAware {
public native long nativeColumnarToRowInit() throws RuntimeException;
- public native NativeColumnarToRowInfo nativeColumnarToRowConvert(long
c2rHandle, long batchHandle)
- throws RuntimeException;
+ public native NativeColumnarToRowInfo nativeColumnarToRowConvert(
+ long c2rHandle, long batchHandle, long rowId) throws RuntimeException;
public native void nativeClose(long c2rHandle);
}
diff --git
a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
index f7bcfd694..9f13ea967 100644
---
a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
+++
b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
@@ -135,8 +135,11 @@ case class ColumnarBuildSideRelation(output:
Seq[Attribute], batches: Array[Arra
} else {
val cols = batch.numCols()
val rows = batch.numRows()
- val info =
- jniWrapper.nativeColumnarToRowConvert(c2rId,
ColumnarBatches.getNativeHandle(batch))
+ var info =
+ jniWrapper.nativeColumnarToRowConvert(
+ c2rId,
+ ColumnarBatches.getNativeHandle(batch),
+ 0)
batch.close()
val columnNames = key.flatMap {
case expression: AttributeReference =>
@@ -183,6 +186,7 @@ case class ColumnarBuildSideRelation(output:
Seq[Attribute], batches: Array[Arra
new Iterator[InternalRow] {
var rowId = 0
+ var baseLength = 0
val row = new UnsafeRow(cols)
override def hasNext: Boolean = {
@@ -191,8 +195,12 @@ case class ColumnarBuildSideRelation(output:
Seq[Attribute], batches: Array[Arra
override def next: UnsafeRow = {
if (rowId >= rows) throw new NoSuchElementException
-
- val (offset, length) = (info.offsets(rowId),
info.lengths(rowId))
+ if (rowId == baseLength + info.lengths.length) {
+ baseLength += info.lengths.length
+              info = jniWrapper.nativeColumnarToRowConvert(c2rId,
batchHandle, rowId)
+ }
+ val (offset, length) =
+ (info.offsets(rowId - baseLength), info.lengths(rowId -
baseLength))
row.pointTo(null, info.memoryAddress + offset, length.toInt)
rowId += 1
row
diff --git
a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
index 77f35ff48..94bdc73a5 100644
---
a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
+++
b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
@@ -46,11 +46,12 @@ object ExecUtil {
var info: NativeColumnarToRowInfo = null
val batchHandle = ColumnarBatches.getNativeHandle(batch)
val c2rHandle = jniWrapper.nativeColumnarToRowInit()
- info = jniWrapper.nativeColumnarToRowConvert(c2rHandle, batchHandle)
+ info = jniWrapper.nativeColumnarToRowConvert(c2rHandle, batchHandle, 0)
Iterators
.wrap(new Iterator[InternalRow] {
var rowId = 0
+ var baseLength = 0
val row = new UnsafeRow(batch.numCols())
override def hasNext: Boolean = {
@@ -59,7 +60,12 @@ object ExecUtil {
override def next: UnsafeRow = {
if (rowId >= batch.numRows()) throw new NoSuchElementException
- val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
+ if (rowId == baseLength + info.lengths.length) {
+ baseLength += info.lengths.length
+ info = jniWrapper.nativeColumnarToRowConvert(c2rHandle,
batchHandle, rowId)
+ }
+ val (offset, length) =
+ (info.offsets(rowId - baseLength), info.lengths(rowId -
baseLength))
row.pointTo(null, info.memoryAddress + offset, length.toInt)
rowId += 1
row
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index e3f6f1d98..ed7a81192 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -190,6 +190,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
// FIXME: Not clear: MIN or MAX ?
def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
+ def columnarToRowMemThreshold: Long =
+ conf.getConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD)
+
def shuffleWriterBufferSize: Int = conf
.getConf(SHUFFLE_WRITER_BUFFER_SIZE)
.getOrElse(maxBatchSize)
@@ -578,11 +581,14 @@ object GlutenConfig {
val GLUTEN_SORT_SHUFFLE_WRITER = "sort"
val GLUTEN_RSS_SORT_SHUFFLE_WRITER = "rss_sort"
- // Shuffle writer buffer size.
+ // Shuffle Writer buffer size.
val GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE =
"spark.gluten.shuffleWriter.bufferSize"
val GLUTEN_SHUFFLE_WRITER_MERGE_THRESHOLD =
"spark.gluten.sql.columnar.shuffle.merge.threshold"
+ // Columnar to row memory threshold.
+ val GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY =
"spark.gluten.sql.columnarToRowMemoryThreshold"
+
// Controls whether to load DLL from jars. User can get dependent native
libs packed into a jar
// by executing dev/package.sh. Then, with that jar configured, Gluten can
load the native libs
// at runtime. This config is just for velox backend. And it is NOT
applicable to the situation
@@ -647,6 +653,7 @@ object GlutenConfig {
GLUTEN_SAVE_DIR,
GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
GLUTEN_MAX_BATCH_SIZE_KEY,
+ GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY,
GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE,
SQLConf.SESSION_LOCAL_TIMEZONE.key,
GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY,
@@ -1114,6 +1121,12 @@ object GlutenConfig {
.checkValue(_ > 0, s"$GLUTEN_MAX_BATCH_SIZE_KEY must be positive.")
.createWithDefault(4096)
+ val GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD =
+ buildConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY)
+ .internal()
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("64MB")
+
// if not set, use COLUMNAR_MAX_BATCH_SIZE instead
val SHUFFLE_WRITER_BUFFER_SIZE =
buildConf(GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]