This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d0b6e1da [GLUTEN-5307][VL] Fix Potential Overflow Issue in VeloxShuffleWriter Due to Mismatched Data Types of RowNumber (#5326)
2d0b6e1da is described below

commit 2d0b6e1da730ccbc12362fd376290de0b5aadb2c
Author: Zhengguo Yang <[email protected]>
AuthorDate: Thu Apr 18 10:46:02 2024 +0800

    [GLUTEN-5307][VL] Fix Potential Overflow Issue in VeloxShuffleWriter Due to Mismatched Data Types of RowNumber (#5326)
---
 cpp/core/shuffle/FallbackRangePartitioner.cc |  2 +-
 cpp/core/shuffle/FallbackRangePartitioner.h  |  2 +-
 cpp/core/shuffle/HashPartitioner.cc          |  2 +-
 cpp/core/shuffle/HashPartitioner.h           |  2 +-
 cpp/core/shuffle/Partitioner.h               |  2 +-
 cpp/core/shuffle/RoundRobinPartitioner.cc    |  2 +-
 cpp/core/shuffle/RoundRobinPartitioner.h     |  2 +-
 cpp/core/shuffle/SinglePartitioner.cc        |  2 +-
 cpp/core/shuffle/SinglePartitioner.h         |  2 +-
 cpp/core/tests/RoundRobinPartitionerTest.cc  | 22 ++++++++++----------
 cpp/velox/shuffle/VeloxShuffleWriter.cc      | 22 ++++++++++----------
 cpp/velox/shuffle/VeloxShuffleWriter.h       | 30 ++++++++++++++--------------
 12 files changed, 46 insertions(+), 46 deletions(-)

diff --git a/cpp/core/shuffle/FallbackRangePartitioner.cc 
b/cpp/core/shuffle/FallbackRangePartitioner.cc
index e553d351f..4bad50b51 100644
--- a/cpp/core/shuffle/FallbackRangePartitioner.cc
+++ b/cpp/core/shuffle/FallbackRangePartitioner.cc
@@ -24,7 +24,7 @@ arrow::Status gluten::FallbackRangePartitioner::compute(
     const int32_t* pidArr,
     const int64_t numRows,
     std::vector<uint32_t>& row2Partition,
-    std::vector<uint16_t>& partition2RowCount) {
+    std::vector<uint32_t>& partition2RowCount) {
   row2Partition.resize(numRows);
   std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);
   for (auto i = 0; i < numRows; ++i) {
diff --git a/cpp/core/shuffle/FallbackRangePartitioner.h 
b/cpp/core/shuffle/FallbackRangePartitioner.h
index 4e1530b7a..f54dd1abc 100644
--- a/cpp/core/shuffle/FallbackRangePartitioner.h
+++ b/cpp/core/shuffle/FallbackRangePartitioner.h
@@ -29,7 +29,7 @@ class FallbackRangePartitioner final : public Partitioner {
       const int32_t* pidArr,
       const int64_t numRows,
       std::vector<uint32_t>& row2partition,
-      std::vector<uint16_t>& partition2RowCount) override;
+      std::vector<uint32_t>& partition2RowCount) override;
 };
 
 } // namespace gluten
diff --git a/cpp/core/shuffle/HashPartitioner.cc 
b/cpp/core/shuffle/HashPartitioner.cc
index b37f14d74..c62e3185f 100644
--- a/cpp/core/shuffle/HashPartitioner.cc
+++ b/cpp/core/shuffle/HashPartitioner.cc
@@ -23,7 +23,7 @@ arrow::Status gluten::HashPartitioner::compute(
     const int32_t* pidArr,
     const int64_t numRows,
     std::vector<uint32_t>& row2partition,
-    std::vector<uint16_t>& partition2RowCount) {
+    std::vector<uint32_t>& partition2RowCount) {
   row2partition.resize(numRows);
   std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);
 
diff --git a/cpp/core/shuffle/HashPartitioner.h 
b/cpp/core/shuffle/HashPartitioner.h
index 7bbc51b2c..fff01f939 100644
--- a/cpp/core/shuffle/HashPartitioner.h
+++ b/cpp/core/shuffle/HashPartitioner.h
@@ -29,7 +29,7 @@ class HashPartitioner final : public Partitioner {
       const int32_t* pidArr,
       const int64_t numRows,
       std::vector<uint32_t>& row2partition,
-      std::vector<uint16_t>& partition2RowCount) override;
+      std::vector<uint32_t>& partition2RowCount) override;
 };
 
 } // namespace gluten
diff --git a/cpp/core/shuffle/Partitioner.h b/cpp/core/shuffle/Partitioner.h
index 79a38be13..8331b8a91 100644
--- a/cpp/core/shuffle/Partitioner.h
+++ b/cpp/core/shuffle/Partitioner.h
@@ -38,7 +38,7 @@ class Partitioner {
       const int32_t* pidArr,
       const int64_t numRows,
       std::vector<uint32_t>& row2partition,
-      std::vector<uint16_t>& partition2RowCount) = 0;
+      std::vector<uint32_t>& partition2RowCount) = 0;
 
  protected:
   Partitioner(int32_t numPartitions, bool hasPid) : 
numPartitions_(numPartitions), hasPid_(hasPid) {}
diff --git a/cpp/core/shuffle/RoundRobinPartitioner.cc 
b/cpp/core/shuffle/RoundRobinPartitioner.cc
index eddcb50a9..b00680a18 100644
--- a/cpp/core/shuffle/RoundRobinPartitioner.cc
+++ b/cpp/core/shuffle/RoundRobinPartitioner.cc
@@ -23,7 +23,7 @@ arrow::Status gluten::RoundRobinPartitioner::compute(
     const int32_t* pidArr,
     const int64_t numRows,
     std::vector<uint32_t>& row2Partition,
-    std::vector<uint16_t>& partition2RowCount) {
+    std::vector<uint32_t>& partition2RowCount) {
   std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);
   row2Partition.resize(numRows);
 
diff --git a/cpp/core/shuffle/RoundRobinPartitioner.h 
b/cpp/core/shuffle/RoundRobinPartitioner.h
index 74fb8dcef..5afd2832a 100644
--- a/cpp/core/shuffle/RoundRobinPartitioner.h
+++ b/cpp/core/shuffle/RoundRobinPartitioner.h
@@ -30,7 +30,7 @@ class RoundRobinPartitioner final : public Partitioner {
       const int32_t* pidArr,
       const int64_t numRows,
       std::vector<uint32_t>& row2Partition,
-      std::vector<uint16_t>& partition2RowCount) override;
+      std::vector<uint32_t>& partition2RowCount) override;
 
  private:
   friend class RoundRobinPartitionerTest;
diff --git a/cpp/core/shuffle/SinglePartitioner.cc 
b/cpp/core/shuffle/SinglePartitioner.cc
index b3ea9eec7..c4f80ce79 100644
--- a/cpp/core/shuffle/SinglePartitioner.cc
+++ b/cpp/core/shuffle/SinglePartitioner.cc
@@ -23,7 +23,7 @@ arrow::Status gluten::SinglePartitioner::compute(
     const int32_t* pidArr,
     const int64_t numRows,
     std::vector<uint32_t>& row2partition,
-    std::vector<uint16_t>& partition2RowCount) {
+    std::vector<uint32_t>& partition2RowCount) {
   // nothing is need do here
   return arrow::Status::OK();
 }
diff --git a/cpp/core/shuffle/SinglePartitioner.h 
b/cpp/core/shuffle/SinglePartitioner.h
index 5a6977790..d3d2c29f7 100644
--- a/cpp/core/shuffle/SinglePartitioner.h
+++ b/cpp/core/shuffle/SinglePartitioner.h
@@ -28,6 +28,6 @@ class SinglePartitioner final : public Partitioner {
       const int32_t* pidArr,
       const int64_t numRows,
       std::vector<uint32_t>& row2partition,
-      std::vector<uint16_t>& partition2RowCount) override;
+      std::vector<uint32_t>& partition2RowCount) override;
 };
 } // namespace gluten
diff --git a/cpp/core/tests/RoundRobinPartitionerTest.cc 
b/cpp/core/tests/RoundRobinPartitionerTest.cc
index 01acd33de..d353c951b 100644
--- a/cpp/core/tests/RoundRobinPartitionerTest.cc
+++ b/cpp/core/tests/RoundRobinPartitionerTest.cc
@@ -29,13 +29,13 @@ class RoundRobinPartitionerTest : public ::testing::Test {
     partition2RowCount_.resize(numPart);
   }
 
-  void checkResult(const std::vector<uint32_t>& expectRow2Part, const 
std::vector<uint16_t>& expectPart2RowCount)
+  void checkResult(const std::vector<uint32_t>& expectRow2Part, const 
std::vector<uint32_t>& expectPart2RowCount)
       const {
     ASSERT_EQ(row2Partition_, expectRow2Part);
     ASSERT_EQ(partition2RowCount_, expectPart2RowCount);
   }
 
-  void traceCheckResult(const std::vector<uint32_t>& expectRow2Part, const 
std::vector<uint16_t>& expectPart2RowCount)
+  void traceCheckResult(const std::vector<uint32_t>& expectRow2Part, const 
std::vector<uint32_t>& expectPart2RowCount)
       const {
     toString(expectRow2Part, "expectRow2Part");
     toString(expectPart2RowCount, "expectPart2RowCount");
@@ -59,7 +59,7 @@ class RoundRobinPartitionerTest : public ::testing::Test {
   }
 
   std::vector<uint32_t> row2Partition_;
-  std::vector<uint16_t> partition2RowCount_;
+  std::vector<uint32_t> partition2RowCount_;
   std::shared_ptr<RoundRobinPartitioner> partitioner_;
 };
 
@@ -81,7 +81,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
     ASSERT_EQ(getPidSelection(), 0);
     std::vector<uint32_t> row2Part(numRows);
     std::iota(row2Part.begin(), row2Part.end(), 0);
-    checkResult(row2Part, std::vector<uint16_t>(numPart, 1));
+    checkResult(row2Part, std::vector<uint32_t>(numPart, 1));
   }
 
   // numRows less than numPart
@@ -93,7 +93,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
     ASSERT_EQ(getPidSelection(), 8);
     std::vector<uint32_t> row2Part(numRows);
     std::iota(row2Part.begin(), row2Part.end(), 0);
-    std::vector<uint16_t> part2RowCount(numPart, 0);
+    std::vector<uint32_t> part2RowCount(numPart, 0);
     std::fill_n(part2RowCount.begin(), numRows, 1);
     checkResult(row2Part, part2RowCount);
   }
@@ -107,7 +107,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
     ASSERT_EQ(getPidSelection(), 2);
     std::vector<uint32_t> row2Part(numRows);
     std::generate_n(row2Part.begin(), numRows, [n = 0, numPart]() mutable { 
return (n++) % numPart; });
-    std::vector<uint16_t> part2RowCount(numPart, 1);
+    std::vector<uint32_t> part2RowCount(numPart, 1);
     std::fill_n(part2RowCount.begin(), numRows - numPart, 2);
     checkResult(row2Part, part2RowCount);
   }
@@ -121,7 +121,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
     ASSERT_EQ(getPidSelection(), 2);
     std::vector<uint32_t> row2Part(numRows);
     std::generate_n(row2Part.begin(), numRows, [n = 0, numPart]() mutable { 
return (n++) % numPart; });
-    std::vector<uint16_t> part2RowCount(numPart, 2);
+    std::vector<uint32_t> part2RowCount(numPart, 2);
     std::fill_n(part2RowCount.begin(), numRows - 2 * numPart, 3);
     checkResult(row2Part, part2RowCount);
   }
@@ -137,7 +137,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeContinuous) {
     ASSERT_EQ(getPidSelection(), 8);
     std::vector<uint32_t> row2Part(numRows);
     std::generate_n(row2Part.begin(), numRows, [n = 0, numPart]() mutable { 
return (n++) % numPart; });
-    std::vector<uint16_t> part2RowCount(numPart, 0);
+    std::vector<uint32_t> part2RowCount(numPart, 0);
     std::fill_n(part2RowCount.begin(), numRows, 1);
     checkResult(row2Part, part2RowCount);
   }
@@ -148,7 +148,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeContinuous) {
     ASSERT_EQ(getPidSelection(), 8);
     std::vector<uint32_t> row2Part(numRows);
     std::generate_n(row2Part.begin(), numRows, [n = 8, numPart]() mutable { 
return (n++) % numPart; });
-    checkResult(row2Part, std::vector<uint16_t>(numPart, 1));
+    checkResult(row2Part, std::vector<uint32_t>(numPart, 1));
   }
 
   {
@@ -157,7 +157,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeContinuous) {
     ASSERT_EQ(getPidSelection(), 0);
     std::vector<uint32_t> row2Part(numRows);
     std::generate_n(row2Part.begin(), numRows, [n = 8, numPart]() mutable { 
return (n++) % numPart; });
-    std::vector<uint16_t> part2RowCount(numPart, 1);
+    std::vector<uint32_t> part2RowCount(numPart, 1);
     std::fill_n(part2RowCount.begin() + 8, numRows - numPart, 2);
     checkResult(row2Part, part2RowCount);
   }
@@ -168,7 +168,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeContinuous) {
     ASSERT_EQ(getPidSelection(), 2);
     std::vector<uint32_t> row2Part(numRows);
     std::generate_n(row2Part.begin(), numRows, [n = 0, numPart]() mutable { 
return (n++) % numPart; });
-    std::vector<uint16_t> part2RowCount(numPart, 2);
+    std::vector<uint32_t> part2RowCount(numPart, 2);
     std::fill_n(part2RowCount.begin(), numRows - 2 * numPart, 3);
     checkResult(row2Part, part2RowCount);
   }
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxShuffleWriter.cc
index 8b69471e7..cad5dae64 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc
@@ -267,7 +267,7 @@ int64_t VeloxShuffleWriter::rawPartitionBytes() const {
   return std::accumulate(metrics_.rawPartitionLengths.begin(), 
metrics_.rawPartitionLengths.end(), 0LL);
 }
 
-void VeloxShuffleWriter::setPartitionBufferSize(uint16_t newSize) {
+void VeloxShuffleWriter::setPartitionBufferSize(uint32_t newSize) {
   options_.bufferSize = newSize;
 }
 
@@ -832,7 +832,7 @@ arrow::Status VeloxShuffleWriter::initFromRowVector(const 
facebook::velox::RowVe
   return arrow::Status::OK();
 }
 
-inline bool VeloxShuffleWriter::beyondThreshold(uint32_t partitionId, uint16_t 
newSize) {
+inline bool VeloxShuffleWriter::beyondThreshold(uint32_t partitionId, uint32_t 
newSize) {
   auto currentBufferSize = partitionBufferSize_[partitionId];
   return newSize > (1 + options_.bufferReallocThreshold) * currentBufferSize ||
       newSize < (1 - options_.bufferReallocThreshold) * currentBufferSize;
@@ -848,7 +848,7 @@ void VeloxShuffleWriter::calculateSimpleColumnBytes() {
   fixedWidthBufferBytes_ += kSizeOfBinaryArrayLengthBuffer * 
binaryColumnIndices_.size();
 }
 
-uint16_t VeloxShuffleWriter::calculatePartitionBufferSize(const 
facebook::velox::RowVector& rv, int64_t memLimit) {
+uint32_t VeloxShuffleWriter::calculatePartitionBufferSize(const 
facebook::velox::RowVector& rv, int64_t memLimit) {
   auto bytesPerRow = fixedWidthBufferBytes_;
 
   SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCalculateBufferSize]);
@@ -895,11 +895,11 @@ uint16_t 
VeloxShuffleWriter::calculatePartitionBufferSize(const facebook::velox:
 
   totalInputNumRows_ += numRows;
 
-  return (uint16_t)preAllocRowCnt;
+  return (uint32_t)preAllocRowCnt;
 }
 
 arrow::Result<std::shared_ptr<arrow::ResizableBuffer>>
-VeloxShuffleWriter::allocateValidityBuffer(uint32_t col, uint32_t partitionId, 
uint16_t newSize) {
+VeloxShuffleWriter::allocateValidityBuffer(uint32_t col, uint32_t partitionId, 
uint32_t newSize) {
   if (inputHasNull_[col]) {
     ARROW_ASSIGN_OR_RAISE(
         auto validityBuffer,
@@ -913,7 +913,7 @@ VeloxShuffleWriter::allocateValidityBuffer(uint32_t col, 
uint32_t partitionId, u
   return nullptr;
 }
 
-arrow::Status VeloxShuffleWriter::updateValidityBuffers(uint32_t partitionId, 
uint16_t newSize) {
+arrow::Status VeloxShuffleWriter::updateValidityBuffers(uint32_t partitionId, 
uint32_t newSize) {
   for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
     // If the validity buffer is not yet allocated, allocate and fill 0xff 
based on inputHasNull_.
     if (partitionValidityAddrs_[i][partitionId] == nullptr) {
@@ -924,7 +924,7 @@ arrow::Status 
VeloxShuffleWriter::updateValidityBuffers(uint32_t partitionId, ui
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::allocatePartitionBuffer(uint32_t 
partitionId, uint16_t newSize) {
+arrow::Status VeloxShuffleWriter::allocatePartitionBuffer(uint32_t 
partitionId, uint32_t newSize) {
   SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingAllocateBuffer]);
 
   for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
@@ -1184,7 +1184,7 @@ arrow::Status 
VeloxShuffleWriter::resetValidityBuffer(uint32_t partitionId) {
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId, 
uint16_t newSize, bool preserveData) {
+arrow::Status VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId, 
uint32_t newSize, bool preserveData) {
   for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
     auto columnType = schema_->field(simpleColumnIndices_[i])->type()->id();
     auto& buffers = partitionBuffers_[i][partitionId];
@@ -1286,11 +1286,11 @@ arrow::Status 
VeloxShuffleWriter::shrinkPartitionBuffer(uint32_t partitionId) {
   return resizePartitionBuffer(partitionId, newSize, /*preserveData=*/true);
 }
 
-uint64_t VeloxShuffleWriter::valueBufferSizeForBinaryArray(uint32_t binaryIdx, 
uint16_t newSize) {
+uint64_t VeloxShuffleWriter::valueBufferSizeForBinaryArray(uint32_t binaryIdx, 
uint32_t newSize) {
   return (binaryArrayTotalSizeBytes_[binaryIdx] + totalInputNumRows_ - 1) / 
totalInputNumRows_ * newSize + 1024;
 }
 
-uint64_t VeloxShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t 
fixedWidthIndex, uint16_t newSize) {
+uint64_t VeloxShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t 
fixedWidthIndex, uint32_t newSize) {
   uint64_t valueBufferSize = 0;
   auto columnIdx = simpleColumnIndices_[fixedWidthIndex];
   if (arrowColumnTypes_[columnIdx]->id() == arrow::BooleanType::type_id) {
@@ -1425,7 +1425,7 @@ arrow::Result<uint32_t> 
VeloxShuffleWriter::partitionBufferSizeAfterShrink(uint3
   return arrow::Status::Invalid("Cannot shrink partition buffers in 
SplitState: " + std::to_string(splitState_));
 }
 
-arrow::Status VeloxShuffleWriter::preAllocPartitionBuffers(uint16_t 
preAllocBufferSize) {
+arrow::Status VeloxShuffleWriter::preAllocPartitionBuffers(uint32_t 
preAllocBufferSize) {
   for (auto& pid : partitionUsed_) {
     auto newSize = std::max(preAllocBufferSize, partition2RowCount_[pid]);
     VLOG_IF(9, partitionBufferSize_[pid] != newSize)
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h 
b/cpp/velox/shuffle/VeloxShuffleWriter.h
index d0bc8445c..c06cd7a0d 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -143,7 +143,7 @@ class VeloxShuffleWriter final : public ShuffleWriter {
   int64_t rawPartitionBytes() const;
 
   // For test only.
-  void setPartitionBufferSize(uint16_t newSize);
+  void setPartitionBufferSize(uint32_t newSize);
 
   // for debugging
   void printColumnsInfo() const {
@@ -223,18 +223,18 @@ class VeloxShuffleWriter final : public ShuffleWriter {
 
   arrow::Status doSplit(const facebook::velox::RowVector& rv, int64_t 
memLimit);
 
-  bool beyondThreshold(uint32_t partitionId, uint16_t newSize);
+  bool beyondThreshold(uint32_t partitionId, uint32_t newSize);
 
-  uint16_t calculatePartitionBufferSize(const facebook::velox::RowVector& rv, 
int64_t memLimit);
+  uint32_t calculatePartitionBufferSize(const facebook::velox::RowVector& rv, 
int64_t memLimit);
 
-  arrow::Status preAllocPartitionBuffers(uint16_t preAllocBufferSize);
+  arrow::Status preAllocPartitionBuffers(uint32_t preAllocBufferSize);
 
-  arrow::Status updateValidityBuffers(uint32_t partitionId, uint16_t newSize);
+  arrow::Status updateValidityBuffers(uint32_t partitionId, uint32_t newSize);
 
   arrow::Result<std::shared_ptr<arrow::ResizableBuffer>>
-  allocateValidityBuffer(uint32_t col, uint32_t partitionId, uint16_t newSize);
+  allocateValidityBuffer(uint32_t col, uint32_t partitionId, uint32_t newSize);
 
-  arrow::Status allocatePartitionBuffer(uint32_t partitionId, uint16_t 
newSize);
+  arrow::Status allocatePartitionBuffer(uint32_t partitionId, uint32_t 
newSize);
 
   arrow::Status splitFixedWidthValueBuffer(const facebook::velox::RowVector& 
rv);
 
@@ -290,11 +290,11 @@ class VeloxShuffleWriter final : public ShuffleWriter {
   // Resize the partition buffer to newSize. If preserveData is true, it will 
keep the data in buffer.
   // Note when preserveData is false, and newSize is larger, this function can 
introduce unnecessary memory copy.
   // In this case, use allocatePartitionBuffer to free current buffers and 
allocate new buffers instead.
-  arrow::Status resizePartitionBuffer(uint32_t partitionId, uint16_t newSize, 
bool preserveData);
+  arrow::Status resizePartitionBuffer(uint32_t partitionId, uint32_t newSize, 
bool preserveData);
 
-  uint64_t valueBufferSizeForBinaryArray(uint32_t binaryIdx, uint16_t newSize);
+  uint64_t valueBufferSizeForBinaryArray(uint32_t binaryIdx, uint32_t newSize);
 
-  uint64_t valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIndex, 
uint16_t newSize);
+  uint64_t valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIndex, 
uint32_t newSize);
 
   void calculateSimpleColumnBytes();
 
@@ -363,7 +363,7 @@ class VeloxShuffleWriter final : public ShuffleWriter {
   // subscript: Partition ID
   // value: How many rows does this partition have in the current input 
RowVector
   // Updated for each input RowVector.
-  std::vector<uint16_t> partition2RowCount_;
+  std::vector<uint32_t> partition2RowCount_;
 
   // Note: partition2RowOffsetBase_ and rowOffset2RowId_ are the optimization 
of flattening the 2-dimensional vector
   // into single dimension.
@@ -379,20 +379,20 @@ class VeloxShuffleWriter final : public ShuffleWriter {
   // subscript: Partition ID
   // value: The base row offset of this Partition
   // Updated for each input RowVector.
-  std::vector<uint16_t> partition2RowOffsetBase_;
+  std::vector<uint32_t> partition2RowOffsetBase_;
 
   // Row offset -> Source row ID, elements num: input RowVector row num
   // subscript: Row offset
   // value: The index of row in the current input RowVector
   // Updated for each input RowVector.
-  std::vector<uint16_t> rowOffset2RowId_;
+  std::vector<uint32_t> rowOffset2RowId_;
 
   // Partition buffers are used for holding the intermediate data during split.
   // Partition ID -> Partition buffer size(unit is row)
-  std::vector<uint16_t> partitionBufferSize_;
+  std::vector<uint32_t> partitionBufferSize_;
 
   // The write position of partition buffer. Updated after split. Reset when 
partition buffers are reallocated.
-  std::vector<uint16_t> partitionBufferBase_;
+  std::vector<uint32_t> partitionBufferBase_;
 
   // Used by all simple types. Stores raw pointers of partition buffers.
   std::vector<std::vector<uint8_t*>> partitionValidityAddrs_;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to