This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f7971f6c16  [VL] Sort shuffle writer use vectorized c2r (#6782)
f7971f6c16 is described below

commit f7971f6c16d8ae51bed984632afa735fe4c7c585
Author: Rong Ma <[email protected]>
AuthorDate: Thu Nov 7 09:53:54 2024 +0800

     [VL] Sort shuffle writer use vectorized c2r (#6782)
---
 cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 29 ++++++++++++++++++-----------
 cpp/velox/shuffle/VeloxSortShuffleWriter.h  |  9 +++++++--
 2 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index f87eaabb56..ab37c0be74 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -180,7 +180,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
     rowSize_.resize(inputRows, *fixedRowSize_);
   }
 
-  uint32_t rowOffset = 0;
+  facebook::velox::vector_size_t rowOffset = 0;
   while (rowOffset < inputRows) {
     auto remainingRows = inputRows - rowOffset;
     auto rows = maxRowsToInsert(rowOffset, remainingRows);
@@ -201,18 +201,23 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
   return arrow::Status::OK();
 }
 
-void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows) {
+void VeloxSortShuffleWriter::insertRows(
+    facebook::velox::row::CompactRow& compact,
+    facebook::velox::vector_size_t offset,
+    facebook::velox::vector_size_t size) {
   VELOX_CHECK(!pages_.empty());
-  for (auto i = offset; i < offset + rows; ++i) {
-    auto pid = row2Partition_[i];
+  std::vector<size_t> offsets(size);
+  for (auto i = 0; i < size; ++i) {
+    auto row = offset + i;
+    auto pid = row2Partition_[row];
     arrayPtr_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_);
     // size(RowSize) | bytes
-    memcpy(currentPage_ + pageCursor_, &rowSize_[i], sizeof(RowSizeType));
-    pageCursor_ += sizeof(RowSizeType);
-    auto size = row.serialize(i, currentPage_ + pageCursor_);
-    pageCursor_ += size;
+    memcpy(currentPage_ + pageCursor_, &rowSize_[row], sizeof(RowSizeType));
+    offsets[i] = pageCursor_ + sizeof(RowSizeType);
+    pageCursor_ += rowSize_[row];
     VELOX_DCHECK_LE(pageCursor_, currenPageSize_);
   }
+  compact.serialize(offset, size, offsets.data(), currentPage_);
 }
 
 arrow::Status VeloxSortShuffleWriter::maybeSpill(uint32_t nextRows) {
@@ -337,19 +342,21 @@ VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, int32_t numRows, u
   return arrow::Status::OK();
 }
 
-uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t remainingRows) {
+facebook::velox::vector_size_t VeloxSortShuffleWriter::maxRowsToInsert(
+    facebook::velox::vector_size_t offset,
+    facebook::velox::vector_size_t remainingRows) {
   // Check how many rows can be handled.
   if (pages_.empty()) {
     return 0;
   }
   auto remainingBytes = pages_.back()->size() - pageCursor_;
   if (fixedRowSize_) {
-    return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), remainingRows);
+    return std::min((facebook::velox::vector_size_t)(remainingBytes / (fixedRowSize_.value())), remainingRows);
   }
   auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
   auto bytesWritten = rowSizePrefixSum_[offset];
   auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes + bytesWritten);
-  return iter - beginIter;
+  return (facebook::velox::vector_size_t)(iter - beginIter);
 }
 
 void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired) {
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 531ed1fe3e..5b8cff452d 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -69,7 +69,10 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
 
   arrow::Status insert(const facebook::velox::RowVectorPtr& vector, int64_t memLimit);
 
-  void insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows);
+  void insertRows(
+      facebook::velox::row::CompactRow& compact,
+      facebook::velox::vector_size_t offset,
+      facebook::velox::vector_size_t size);
 
   arrow::Status maybeSpill(uint32_t nextRows);
 
@@ -79,7 +82,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
 
   arrow::Status evictPartition0(uint32_t partitionId, int32_t numRows, uint8_t* buffer, int64_t rawLength);
 
-  uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows);
+  facebook::velox::vector_size_t maxRowsToInsert(
+      facebook::velox::vector_size_t offset,
+      facebook::velox::vector_size_t remainingRows);
 
   void acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to