This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f7b402751 [VL] Add conf option to limit C2R memory usage (#5952)
f7b402751 is described below

commit f7b4027510d32fb889c926d1a9edea700c98d309
Author: YunDa <[email protected]>
AuthorDate: Tue Aug 6 12:14:14 2024 +0800

    [VL] Add conf option to limit C2R memory usage (#5952)
    
    Co-authored-by: BInwei Yang <[email protected]>
    Co-authored-by: Hongze Zhang <[email protected]>
---
 .../gluten/execution/VeloxColumnarToRowExec.scala  | 14 ++++--
 .../apache/gluten/execution/VeloxTPCHSuite.scala   |  3 ++
 cpp/core/compute/Runtime.h                         |  2 +-
 cpp/core/config/GlutenConfig.h                     |  3 ++
 cpp/core/jni/JniWrapper.cc                         | 29 ++++++++++--
 cpp/core/operators/c2r/ColumnarToRow.h             | 10 ++++-
 cpp/velox/benchmarks/ColumnarToRowBenchmark.cc     |  4 +-
 cpp/velox/compute/VeloxRuntime.cc                  |  4 +-
 cpp/velox/compute/VeloxRuntime.h                   |  2 +-
 .../serializer/VeloxColumnarToRowConverter.cc      | 51 ++++++++++++++--------
 .../serializer/VeloxColumnarToRowConverter.h       | 11 +++--
 cpp/velox/tests/RuntimeTest.cc                     |  2 +-
 cpp/velox/tests/VeloxColumnarToRowTest.cc          |  2 +-
 cpp/velox/tests/VeloxRowToColumnarTest.cc          |  2 +-
 .../vectorized/NativeColumnarToRowJniWrapper.java  |  4 +-
 .../sql/execution/ColumnarBuildSideRelation.scala  | 16 +++++--
 .../spark/sql/execution/utils/ExecUtil.scala       | 10 ++++-
 .../scala/org/apache/gluten/GlutenConfig.scala     | 15 ++++++-
 18 files changed, 136 insertions(+), 48 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index 2c46893e4..993a888b9 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -147,13 +147,14 @@ object VeloxColumnarToRowExec {
           val rows = batch.numRows()
           val beforeConvert = System.currentTimeMillis()
           val batchHandle = ColumnarBatches.getNativeHandle(batch)
-          val info =
-            jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle)
+          var info =
+            jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, 0)
 
           convertTime += (System.currentTimeMillis() - beforeConvert)
 
           new Iterator[InternalRow] {
             var rowId = 0
+            var baseLength = 0
             val row = new UnsafeRow(cols)
 
             override def hasNext: Boolean = {
@@ -161,7 +162,14 @@ object VeloxColumnarToRowExec {
             }
 
             override def next: UnsafeRow = {
-              val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
+              if (rowId == baseLength + info.lengths.length) {
+                baseLength += info.lengths.length
+                val before = System.currentTimeMillis()
+                info = jniWrapper.nativeColumnarToRowConvert(c2rId, 
batchHandle, rowId)
+                convertTime += (System.currentTimeMillis() - before)
+              }
+              val (offset, length) =
+                (info.offsets(rowId - baseLength), info.lengths(rowId - 
baseLength))
               row.pointTo(null, info.memoryAddress + offset, length)
               rowId += 1
               row
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
index 798cea8cb..22f96bbbc 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.GlutenConfig
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, Row, TestUtils}
 import org.apache.spark.sql.execution.FormattedMode
@@ -253,6 +255,7 @@ class VeloxTPCHDistinctSpillSuite extends 
VeloxTPCHTableSupport {
     super.sparkConf
       .set("spark.memory.offHeap.size", "50m")
       .set("spark.gluten.memory.overAcquiredMemoryRatio", "0.9") // to trigger 
distinct spill early
+      .set(GlutenConfig.GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY, "8k")
   }
 
   test("distinct spill") {
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index fb501dc9a..8bdf95cd7 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -97,7 +97,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
 
   /// This function is used to create certain converter from the format used by
   /// the backend to Spark unsafe row.
-  virtual std::shared_ptr<ColumnarToRowConverter> 
createColumnar2RowConverter() = 0;
+  virtual std::shared_ptr<ColumnarToRowConverter> 
createColumnar2RowConverter(int64_t column2RowMemThreshold) = 0;
 
   virtual std::shared_ptr<RowToColumnarConverter> 
createRow2ColumnarConverter(struct ArrowSchema* cSchema) = 0;
 
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 060bbe111..e4f5a884b 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -56,6 +56,9 @@ const std::string kGzipWindowSize4k = "4096";
 
 const std::string kParquetCompressionCodec = 
"spark.sql.parquet.compression.codec";
 
+const std::string kColumnarToRowMemoryThreshold = 
"spark.gluten.sql.columnarToRowMemoryThreshold";
+const std::string kColumnarToRowMemoryDefaultThreshold = "67108864"; // 64MB
+
 const std::string kUGIUserName = "spark.gluten.ugi.username";
 const std::string kUGITokens = "spark.gluten.ugi.tokens";
 
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index add4aa54d..60f367fd7 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -16,6 +16,8 @@
  */
 
 #include <jni.h>
+#include <algorithm>
+#include <cstdint>
 #include <filesystem>
 
 #include "compute/Runtime.h"
@@ -27,6 +29,7 @@
 
 #include <arrow/c/bridge.h>
 #include <optional>
+#include <string>
 #include "memory/AllocationListener.h"
 #include "operators/serializer/ColumnarBatchSerializer.h"
 #include "shuffle/LocalPartitionWriter.h"
@@ -528,8 +531,24 @@ 
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarTo
   JNI_METHOD_START
   auto ctx = gluten::getRuntime(env, wrapper);
 
+  auto& conf = ctx->getConfMap();
+  int64_t column2RowMemThreshold;
+  auto it = conf.find(kColumnarToRowMemoryThreshold);
+  bool confIsLegal =
+      ((it == conf.end()) ? false : std::all_of(it->second.begin(), 
it->second.end(), [](unsigned char c) {
+        return std::isdigit(c);
+      }));
+  if (confIsLegal) {
+    column2RowMemThreshold = std::stoll(it->second);
+  } else {
+    LOG(INFO)
+        << "Because the spark.gluten.sql.columnarToRowMemoryThreshold 
configuration item is invalid, the kColumnarToRowMemoryDefaultThreshold default 
value is used, which is "
+        << kColumnarToRowMemoryDefaultThreshold << " byte";
+    column2RowMemThreshold = std::stoll(kColumnarToRowMemoryDefaultThreshold);
+  }
+
   // Convert the native batch to Spark unsafe row.
-  return ctx->saveObject(ctx->createColumnar2RowConverter());
+  return 
ctx->saveObject(ctx->createColumnar2RowConverter(column2RowMemThreshold));
   JNI_METHOD_END(kInvalidObjectHandle)
 }
 
@@ -538,16 +557,18 @@ 
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarTo
     JNIEnv* env,
     jobject wrapper,
     jlong c2rHandle,
-    jlong batchHandle) {
+    jlong batchHandle,
+    jlong startRow) {
   JNI_METHOD_START
   auto columnarToRowConverter = 
ObjectStore::retrieve<ColumnarToRowConverter>(c2rHandle);
   auto cb = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
-  columnarToRowConverter->convert(cb);
+
+  columnarToRowConverter->convert(cb, startRow);
 
   const auto& offsets = columnarToRowConverter->getOffsets();
   const auto& lengths = columnarToRowConverter->getLengths();
 
-  auto numRows = cb->numRows();
+  auto numRows = columnarToRowConverter->numRows();
 
   auto offsetsArr = env->NewIntArray(numRows);
   auto offsetsSrc = reinterpret_cast<const jint*>(offsets.data());
diff --git a/cpp/core/operators/c2r/ColumnarToRow.h 
b/cpp/core/operators/c2r/ColumnarToRow.h
index edee31249..062a863f7 100644
--- a/cpp/core/operators/c2r/ColumnarToRow.h
+++ b/cpp/core/operators/c2r/ColumnarToRow.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <cstdint>
 #include "memory/ColumnarBatch.h"
 
 namespace gluten {
@@ -27,7 +28,14 @@ class ColumnarToRowConverter {
 
   virtual ~ColumnarToRowConverter() = default;
 
-  virtual void convert(std::shared_ptr<ColumnarBatch> cb = nullptr) = 0;
+  // Conversion starts at row 'startRow' of 'cb'. A single call may convert only
+  // part of the batch so that the memory used for the converted rows stays
+  // within the converter's memory threshold (in bytes). The number of rows
+  // actually converted is stored in 'numRows_'.
+  virtual void convert(std::shared_ptr<ColumnarBatch> cb = nullptr, int64_t 
startRow = 0) = 0;
+
+  virtual int32_t numRows() {
+    return numRows_;
+  }
 
   uint8_t* getBufferAddress() const {
     return bufferAddress_;
diff --git a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc 
b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
index 5037264e1..8a5505001 100644
--- a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
+++ b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
@@ -157,7 +157,7 @@ class GoogleBenchmarkColumnarToRowCacheScanBenchmark : 
public GoogleBenchmarkCol
     for (auto _ : state) {
       for (const auto& vector : vectors) {
         auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
-        auto columnarToRowConverter = 
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool);
+        auto columnarToRowConverter = 
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
         auto cb = std::make_shared<VeloxColumnarBatch>(row);
         TIME_NANO_START(writeTime);
         columnarToRowConverter->convert(cb);
@@ -212,7 +212,7 @@ class GoogleBenchmarkColumnarToRowIterateScanBenchmark : 
public GoogleBenchmarkC
         numBatches += 1;
         numRows += recordBatch->num_rows();
         auto vector = recordBatch2RowVector(*recordBatch);
-        auto columnarToRowConverter = 
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool);
+        auto columnarToRowConverter = 
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
         auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
         auto cb = std::make_shared<VeloxColumnarBatch>(row);
         TIME_NANO_START(writeTime);
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index c1e8fc860..cdce781bd 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -157,9 +157,9 @@ std::shared_ptr<ResultIterator> 
VeloxRuntime::createResultIterator(
   return std::make_shared<ResultIterator>(std::move(wholestageIter), this);
 }
 
-std::shared_ptr<ColumnarToRowConverter> 
VeloxRuntime::createColumnar2RowConverter() {
+std::shared_ptr<ColumnarToRowConverter> 
VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) {
   auto veloxPool = vmm_->getLeafMemoryPool();
-  return std::make_shared<VeloxColumnarToRowConverter>(veloxPool);
+  return std::make_shared<VeloxColumnarToRowConverter>(veloxPool, 
column2RowMemThreshold);
 }
 
 std::shared_ptr<ColumnarBatch> 
VeloxRuntime::createOrGetEmptySchemaBatch(int32_t numRows) {
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 096ecb6fb..952a103ed 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -49,7 +49,7 @@ class VeloxRuntime final : public Runtime {
       const std::vector<std::shared_ptr<ResultIterator>>& inputs = {},
       const std::unordered_map<std::string, std::string>& sessionConf = {}) 
override;
 
-  std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter() 
override;
+  std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t 
column2RowMemThreshold) override;
 
   std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) 
override;
 
diff --git a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc 
b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
index 52a046ac7..a609e26d0 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
+++ b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
@@ -16,8 +16,11 @@
  */
 
 #include "VeloxColumnarToRowConverter.h"
+#include <velox/common/base/SuccinctPrinter.h>
+#include <cstdint>
 
 #include "memory/VeloxColumnarBatch.h"
+#include "utils/exception.h"
 #include "velox/row/UnsafeRowDeserializers.h"
 #include "velox/row/UnsafeRowFast.h"
 
@@ -25,27 +28,39 @@ using namespace facebook;
 
 namespace gluten {
 
-void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr 
rowVector) {
-  numRows_ = rowVector->size();
+void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr 
rowVector, int64_t startRow) {
+  auto vectorLength = rowVector->size();
   numCols_ = rowVector->childrenSize();
 
   fast_ = std::make_unique<velox::row::UnsafeRowFast>(rowVector);
 
-  size_t totalMemorySize = 0;
+  int64_t totalMemorySize;
+
   if (auto fixedRowSize = 
velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) {
-    totalMemorySize += fixedRowSize.value() * numRows_;
+    auto rowSize = fixedRowSize.value();
+    // make sure it has at least one row
+    numRows_ = std::max<int32_t>(1, std::min<int64_t>(memThreshold_ / rowSize, 
vectorLength - startRow));
+    totalMemorySize = numRows_ * rowSize;
   } else {
-    for (auto i = 0; i < numRows_; ++i) {
-      totalMemorySize += fast_->rowSize(i);
+    // Calculate the first row size
+    totalMemorySize = fast_->rowSize(startRow);
+
+    auto endRow = startRow + 1;
+    for (; endRow < vectorLength; ++endRow) {
+      auto rowSize = fast_->rowSize(endRow);
+      if (UNLIKELY(totalMemorySize + rowSize > memThreshold_)) {
+        break;
+      } else {
+        totalMemorySize += rowSize;
+      }
     }
+    // The first row is always converted, even if it alone exceeds the threshold
+    numRows_ = endRow - startRow;
   }
 
-  if (veloxBuffers_ == nullptr) {
-    // First allocate memory
+  if (nullptr == veloxBuffers_) {
     veloxBuffers_ = velox::AlignedBuffer::allocate<uint8_t>(totalMemorySize, 
veloxPool_.get());
-  }
-
-  if (veloxBuffers_->capacity() < totalMemorySize) {
+  } else if (veloxBuffers_->capacity() < totalMemorySize) {
     velox::AlignedBuffer::reallocate<uint8_t>(&veloxBuffers_, totalMemorySize);
   }
 
@@ -53,9 +68,9 @@ void 
VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr ro
   memset(bufferAddress_, 0, sizeof(int8_t) * totalMemorySize);
 }
 
-void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb) {
+void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb, 
int64_t startRow) {
   auto veloxBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
-  refreshStates(veloxBatch->getRowVector());
+  refreshStates(veloxBatch->getRowVector(), startRow);
 
   // Initialize the offsets_ , lengths_
   lengths_.clear();
@@ -64,11 +79,11 @@ void 
VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb) {
   offsets_.resize(numRows_, 0);
 
   size_t offset = 0;
-  for (auto rowIdx = 0; rowIdx < numRows_; ++rowIdx) {
-    auto rowSize = fast_->serialize(rowIdx, (char*)(bufferAddress_ + offset));
-    lengths_[rowIdx] = rowSize;
-    if (rowIdx > 0) {
-      offsets_[rowIdx] = offsets_[rowIdx - 1] + lengths_[rowIdx - 1];
+  for (auto i = 0; i < numRows_; ++i) {
+    auto rowSize = fast_->serialize(startRow + i, (char*)(bufferAddress_ + 
offset));
+    lengths_[i] = rowSize;
+    if (i > 0) {
+      offsets_[i] = offsets_[i - 1] + lengths_[i - 1];
     }
     offset += rowSize;
   }
diff --git a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h 
b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h
index 833ffa8aa..540d991a6 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h
+++ b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h
@@ -29,17 +29,20 @@ namespace gluten {
 
 class VeloxColumnarToRowConverter final : public ColumnarToRowConverter {
  public:
-  explicit 
VeloxColumnarToRowConverter(std::shared_ptr<facebook::velox::memory::MemoryPool>
 veloxPool)
-      : ColumnarToRowConverter(), veloxPool_(veloxPool) {}
+  explicit VeloxColumnarToRowConverter(
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      int64_t memThreshold)
+      : ColumnarToRowConverter(), veloxPool_(veloxPool), 
memThreshold_(memThreshold) {}
 
-  void convert(std::shared_ptr<ColumnarBatch> cb) override;
+  void convert(std::shared_ptr<ColumnarBatch> cb, int64_t startRow = 0) 
override;
 
  private:
-  void refreshStates(facebook::velox::RowVectorPtr rowVector);
+  void refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t 
startRow);
 
   std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
   std::shared_ptr<facebook::velox::row::UnsafeRowFast> fast_;
   facebook::velox::BufferPtr veloxBuffers_;
+  int64_t memThreshold_;
 };
 
 } // namespace gluten
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 563539d7d..1a353816f 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -61,7 +61,7 @@ class DummyRuntime final : public Runtime {
   std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) 
override {
     throw GlutenException("Not yet implemented");
   }
-  std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter() 
override {
+  std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t 
column2RowMemThreshold) override {
     throw GlutenException("Not yet implemented");
   }
   std::shared_ptr<RowToColumnarConverter> createRow2ColumnarConverter(struct 
ArrowSchema* cSchema) override {
diff --git a/cpp/velox/tests/VeloxColumnarToRowTest.cc 
b/cpp/velox/tests/VeloxColumnarToRowTest.cc
index 2309e6e1c..3adacdda9 100644
--- a/cpp/velox/tests/VeloxColumnarToRowTest.cc
+++ b/cpp/velox/tests/VeloxColumnarToRowTest.cc
@@ -34,7 +34,7 @@ class VeloxColumnarToRowTest : public ::testing::Test, public 
test::VectorTestBa
   }
 
   void testRowBufferAddr(velox::RowVectorPtr vector, uint8_t* expectArr, 
int32_t expectArrSize) {
-    auto columnarToRowConverter = 
std::make_shared<VeloxColumnarToRowConverter>(pool_);
+    auto columnarToRowConverter = 
std::make_shared<VeloxColumnarToRowConverter>(pool_, 64 << 10);
 
     auto cb = std::make_shared<VeloxColumnarBatch>(vector);
     columnarToRowConverter->convert(cb);
diff --git a/cpp/velox/tests/VeloxRowToColumnarTest.cc 
b/cpp/velox/tests/VeloxRowToColumnarTest.cc
index 93f780ca3..c784dbd59 100644
--- a/cpp/velox/tests/VeloxRowToColumnarTest.cc
+++ b/cpp/velox/tests/VeloxRowToColumnarTest.cc
@@ -33,7 +33,7 @@ class VeloxRowToColumnarTest : public ::testing::Test, public 
test::VectorTestBa
   }
 
   void testRowVectorEqual(velox::RowVectorPtr vector) {
-    auto columnarToRowConverter = 
std::make_shared<VeloxColumnarToRowConverter>(pool_);
+    auto columnarToRowConverter = 
std::make_shared<VeloxColumnarToRowConverter>(pool_, 64 << 10);
 
     auto columnarBatch = std::make_shared<VeloxColumnarBatch>(vector);
     columnarToRowConverter->convert(columnarBatch);
diff --git 
a/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java
 
b/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java
index 7f8de78f9..ffcb77ad3 100644
--- 
a/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java
+++ 
b/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java
@@ -37,8 +37,8 @@ public class NativeColumnarToRowJniWrapper implements 
RuntimeAware {
 
   public native long nativeColumnarToRowInit() throws RuntimeException;
 
-  public native NativeColumnarToRowInfo nativeColumnarToRowConvert(long 
c2rHandle, long batchHandle)
-      throws RuntimeException;
+  public native NativeColumnarToRowInfo nativeColumnarToRowConvert(
+      long c2rHandle, long batchHandle, long rowId) throws RuntimeException;
 
   public native void nativeClose(long c2rHandle);
 }
diff --git 
a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
 
b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
index f7bcfd694..9f13ea967 100644
--- 
a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
+++ 
b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
@@ -135,8 +135,11 @@ case class ColumnarBuildSideRelation(output: 
Seq[Attribute], batches: Array[Arra
           } else {
             val cols = batch.numCols()
             val rows = batch.numRows()
-            val info =
-              jniWrapper.nativeColumnarToRowConvert(c2rId, 
ColumnarBatches.getNativeHandle(batch))
+            var info =
+              jniWrapper.nativeColumnarToRowConvert(
+                c2rId,
+                ColumnarBatches.getNativeHandle(batch),
+                0)
             batch.close()
             val columnNames = key.flatMap {
               case expression: AttributeReference =>
@@ -183,6 +186,7 @@ case class ColumnarBuildSideRelation(output: 
Seq[Attribute], batches: Array[Arra
 
             new Iterator[InternalRow] {
               var rowId = 0
+              var baseLength = 0
               val row = new UnsafeRow(cols)
 
               override def hasNext: Boolean = {
@@ -191,8 +195,12 @@ case class ColumnarBuildSideRelation(output: 
Seq[Attribute], batches: Array[Arra
 
               override def next: UnsafeRow = {
                 if (rowId >= rows) throw new NoSuchElementException
-
-                val (offset, length) = (info.offsets(rowId), 
info.lengths(rowId))
+                if (rowId == baseLength + info.lengths.length) {
+                  baseLength += info.lengths.length
+                  info = jniWrapper.nativeColumnarToRowConvert(batchHandle, 
c2rId, rowId)
+                }
+                val (offset, length) =
+                  (info.offsets(rowId - baseLength), info.lengths(rowId - 
baseLength))
                 row.pointTo(null, info.memoryAddress + offset, length.toInt)
                 rowId += 1
                 row
diff --git 
a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
 
b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
index 77f35ff48..94bdc73a5 100644
--- 
a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
+++ 
b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
@@ -46,11 +46,12 @@ object ExecUtil {
     var info: NativeColumnarToRowInfo = null
     val batchHandle = ColumnarBatches.getNativeHandle(batch)
     val c2rHandle = jniWrapper.nativeColumnarToRowInit()
-    info = jniWrapper.nativeColumnarToRowConvert(c2rHandle, batchHandle)
+    info = jniWrapper.nativeColumnarToRowConvert(c2rHandle, batchHandle, 0)
 
     Iterators
       .wrap(new Iterator[InternalRow] {
         var rowId = 0
+        var baseLength = 0
         val row = new UnsafeRow(batch.numCols())
 
         override def hasNext: Boolean = {
@@ -59,7 +60,12 @@ object ExecUtil {
 
         override def next: UnsafeRow = {
           if (rowId >= batch.numRows()) throw new NoSuchElementException
-          val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
+          if (rowId == baseLength + info.lengths.length) {
+            baseLength += info.lengths.length
+            info = jniWrapper.nativeColumnarToRowConvert(c2rHandle, 
batchHandle, rowId)
+          }
+          val (offset, length) =
+            (info.offsets(rowId - baseLength), info.lengths(rowId - 
baseLength))
           row.pointTo(null, info.memoryAddress + offset, length.toInt)
           rowId += 1
           row
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index e3f6f1d98..ed7a81192 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -190,6 +190,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   // FIXME: Not clear: MIN or MAX ?
   def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
 
+  def columnarToRowMemThreshold: Long =
+    conf.getConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD)
+
   def shuffleWriterBufferSize: Int = conf
     .getConf(SHUFFLE_WRITER_BUFFER_SIZE)
     .getOrElse(maxBatchSize)
@@ -578,11 +581,14 @@ object GlutenConfig {
   val GLUTEN_SORT_SHUFFLE_WRITER = "sort"
   val GLUTEN_RSS_SORT_SHUFFLE_WRITER = "rss_sort"
 
-  // Shuffle writer buffer size.
+  // Shuffle Writer buffer size.
   val GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE = 
"spark.gluten.shuffleWriter.bufferSize"
 
   val GLUTEN_SHUFFLE_WRITER_MERGE_THRESHOLD = 
"spark.gluten.sql.columnar.shuffle.merge.threshold"
 
+  // Columnar to row memory threshold.
+  val GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY = 
"spark.gluten.sql.columnarToRowMemoryThreshold"
+
   // Controls whether to load DLL from jars. User can get dependent native 
libs packed into a jar
   // by executing dev/package.sh. Then, with that jar configured, Gluten can 
load the native libs
   // at runtime. This config is just for velox backend. And it is NOT 
applicable to the situation
@@ -647,6 +653,7 @@ object GlutenConfig {
       GLUTEN_SAVE_DIR,
       GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
       GLUTEN_MAX_BATCH_SIZE_KEY,
+      GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY,
       GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE,
       SQLConf.SESSION_LOCAL_TIMEZONE.key,
       GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY,
@@ -1114,6 +1121,12 @@ object GlutenConfig {
       .checkValue(_ > 0, s"$GLUTEN_MAX_BATCH_SIZE_KEY must be positive.")
       .createWithDefault(4096)
 
+  val GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD =
+    buildConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY)
+      .internal()
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("64MB")
+
   // if not set, use COLUMNAR_MAX_BATCH_SIZE instead
   val SHUFFLE_WRITER_BUFFER_SIZE =
     buildConf(GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to