This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b8f8154a6 [VL] Refine evict logic in sort shuffle writer (#5786)
b8f8154a6 is described below

commit b8f8154a65b02ed10980563b861bf1b273c41691
Author: Rong Ma <[email protected]>
AuthorDate: Fri May 17 22:26:19 2024 +0800

    [VL] Refine evict logic in sort shuffle writer (#5786)
---
 cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc | 124 ++++++++---------------
 cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h  |   6 +-
 2 files changed, 44 insertions(+), 86 deletions(-)

diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
index bd56bc62e..2a6bca8c0 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
@@ -15,47 +15,20 @@
  * limitations under the License.
  */
 
-#include "VeloxSortBasedShuffleWriter.h"
-#include "memory/ArrowMemory.h"
+#include "shuffle/VeloxSortBasedShuffleWriter.h"
 #include "memory/VeloxColumnarBatch.h"
 #include "memory/VeloxMemoryManager.h"
 #include "shuffle/ShuffleSchema.h"
 #include "utils/Common.h"
 #include "utils/VeloxArrowUtils.h"
 #include "utils/macros.h"
+
 #include "velox/common/base/Nulls.h"
 #include "velox/type/Type.h"
 #include "velox/vector/ComplexVector.h"
 
-#if defined(__x86_64__)
-#include <immintrin.h>
-#include <x86intrin.h>
-#elif defined(__aarch64__)
-#include <arm_neon.h>
-#endif
-
 namespace gluten {
 
-#define VELOX_SHUFFLE_WRITER_LOG_FLAG 0
-
-// macro to rotate left an 8-bit value 'x' given the shift 's' is a 32-bit integer
-// (x is left shifted by 's' modulo 8) OR (x right shifted by (8 - 's' modulo 8))
-#if !defined(__x86_64__)
-#define rotateLeft(x, s) (x << (s - ((s >> 3) << 3)) | x >> (8 - (s - ((s >> 3) << 3))))
-#endif
-
-// on x86 machines, _MM_HINT_T0,T1,T2 are defined as 1, 2, 3
-// equivalent mapping to __builtin_prefetch hints is 3, 2, 1
-#if defined(__x86_64__)
-#define PREFETCHT0(ptr) _mm_prefetch(ptr, _MM_HINT_T0)
-#define PREFETCHT1(ptr) _mm_prefetch(ptr, _MM_HINT_T1)
-#define PREFETCHT2(ptr) _mm_prefetch(ptr, _MM_HINT_T2)
-#else
-#define PREFETCHT0(ptr) __builtin_prefetch(ptr, 0, 3)
-#define PREFETCHT1(ptr) __builtin_prefetch(ptr, 0, 2)
-#define PREFETCHT2(ptr) __builtin_prefetch(ptr, 0, 1)
-#endif
-
 arrow::Result<std::shared_ptr<VeloxShuffleWriter>> VeloxSortBasedShuffleWriter::create(
     uint32_t numPartitions,
     std::unique_ptr<PartitionWriter> partitionWriter,
@@ -154,83 +127,71 @@ arrow::Status VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxSortBasedShuffleWriter::evictBatch(uint32_t partitionId, facebook::velox::RowTypePtr* rowTypePtr) {
+arrow::Status VeloxSortBasedShuffleWriter::evictBatch(uint32_t partitionId) {
   int64_t rawSize = batch_->size();
   bufferOutputStream_->seekp(0);
   batch_->flush(bufferOutputStream_.get());
   auto buffer = bufferOutputStream_->getBuffer();
   RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, buffer->as<char>(), buffer->size()));
   batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get());
-  batch_->createStreamTree(*rowTypePtr, options_.bufferSize, &serdeOptions_);
+  batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_);
   return arrow::Status::OK();
 }
 
 arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId) {
-  int32_t rowNum = 0;
-  const int32_t maxBatchNum = options_.bufferSize;
-  auto rowTypePtr = std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value());
+  int32_t accumulatedRows = 0;
+  const int32_t maxRowsPerBatch = options_.bufferSize;
 
   if (options_.partitioning != Partitioning::kSingle) {
     if (auto it = rowVectorIndexMap_.find(partitionId); it != rowVectorIndexMap_.end()) {
-      auto rowVectorIndex = it->second;
-      const int32_t outputSize = rowVectorIndex.size();
+      const auto& rowIndices = it->second;
+      VELOX_DCHECK(!rowIndices.empty())
 
-      std::map<int32_t, std::vector<facebook::velox::IndexRange>> groupedIndices;
-      std::map<int32_t, int64_t> groupedSize;
+      size_t idx = 0;
+      const auto outputSize = rowIndices.size();
+      while (idx < outputSize) {
+        auto combinedRowIndex = rowIndices[idx];
+        auto inputVectorIndex = static_cast<int32_t>(combinedRowIndex >> 32);
+        auto startRow = static_cast<int32_t>(combinedRowIndex & 0xFFFFFFFFLL);
 
-      int32_t tempVectorIndex = -1;
-      int32_t baseRowIndex = -1;
-      int32_t tempRowIndex = -1;
-      int32_t size = 1;
-      for (int start = 0; start < outputSize; start++) {
-        const int64_t rowVector = rowVectorIndex[start];
-        const int32_t vectorIndex = static_cast<int32_t>(rowVector >> 32);
-        const int32_t rowIndex = static_cast<int32_t>(rowVector & 0xFFFFFFFFLL);
-        if (tempVectorIndex == -1) {
-          tempVectorIndex = vectorIndex;
-          baseRowIndex = rowIndex;
-          tempRowIndex = rowIndex;
-        } else {
-          if (vectorIndex == tempVectorIndex && rowIndex == tempRowIndex + 1) {
-            size += 1;
-            tempRowIndex = rowIndex;
+        int32_t numRowsInRange = 1;
+        std::vector<facebook::velox::IndexRange> groupedIndices;
+
+        while (++idx < outputSize && (rowIndices[idx] >> 32) == inputVectorIndex) {
+          auto row = static_cast<int32_t>(rowIndices[idx] & 0xFFFFFFFFLL);
+          if (row == startRow + numRowsInRange) {
+            numRowsInRange++;
           } else {
-            groupedIndices[tempVectorIndex].push_back({baseRowIndex, size});
-            groupedSize[tempVectorIndex] += size;
-            size = 1;
-            tempVectorIndex = vectorIndex;
-            baseRowIndex = rowIndex;
-            tempRowIndex = rowIndex;
+            groupedIndices.push_back({startRow, numRowsInRange});
+            accumulatedRows += numRowsInRange;
+            startRow = row;
+            numRowsInRange = 1;
           }
         }
-      }
-      groupedIndices[tempVectorIndex].push_back({baseRowIndex, size});
-      groupedSize[tempVectorIndex] += size;
+        groupedIndices.push_back({startRow, numRowsInRange});
+        batch_->append(batches_[inputVectorIndex], groupedIndices);
 
-      for (auto& pair : groupedIndices) {
-        batch_->append(batches_[pair.first], pair.second);
-        rowNum += groupedSize[pair.first];
-        if (rowNum >= maxBatchNum) {
-          rowNum = 0;
-          RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
+        accumulatedRows += numRowsInRange;
+        // Check whether to evict the data after gathering all rows from one input RowVector.
+        if (accumulatedRows >= maxRowsPerBatch) {
+          RETURN_NOT_OK(evictBatch(partitionId));
+          accumulatedRows = 0;
         }
       }
-
-      rowVectorIndex.clear();
       rowVectorIndexMap_.erase(partitionId);
     }
   } else {
     for (facebook::velox::RowVectorPtr rowVectorPtr : batches_) {
-      rowNum += rowVectorPtr->size();
       batch_->append(rowVectorPtr);
-      if (rowNum >= maxBatchNum) {
-        RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
-        rowNum = 0;
+      accumulatedRows += rowVectorPtr->size();
+      if (accumulatedRows >= maxRowsPerBatch) {
+        RETURN_NOT_OK(evictBatch(partitionId));
+        accumulatedRows = 0;
       }
     }
   }
-  if (rowNum > 0) {
-    RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
+  if (accumulatedRows > 0) {
+    RETURN_NOT_OK(evictBatch(partitionId));
   }
   return arrow::Status::OK();
 }
@@ -255,15 +216,12 @@ arrow::Status VeloxSortBasedShuffleWriter::stop() {
 }
 
 arrow::Status VeloxSortBasedShuffleWriter::initFromRowVector(const facebook::velox::RowVector& rv) {
-  if (!rowType_.has_value()) {
-    rowType_ = rv.type();
+  if (!rowType_) {
+    rowType_ = facebook::velox::asRowType(rv.type());
     serdeOptions_ = {
         false, facebook::velox::common::stringToCompressionKind(partitionWriter_->options().compressionTypeStr)};
     batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get());
-    batch_->createStreamTree(
-        std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value()),
-        options_.bufferSize,
-        &serdeOptions_);
+    batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_);
   }
   return arrow::Status::OK();
 }
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
index 710590184..417d5e926 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
@@ -66,8 +66,6 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
 
   arrow::Status evictRowVector(uint32_t partitionId) override;
 
-  arrow::Status evictBatch(uint32_t partitionId, facebook::velox::RowTypePtr* rowTypePtr);
-
  private:
   VeloxSortBasedShuffleWriter(
       uint32_t numPartitions,
@@ -85,9 +83,11 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
 
   arrow::Status doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit);
 
+  arrow::Status evictBatch(uint32_t partitionId);
+
   void stat() const;
 
-  std::optional<facebook::velox::TypePtr> rowType_;
+  facebook::velox::RowTypePtr rowType_;
 
   std::unique_ptr<facebook::velox::VectorStreamGroup> batch_;
   std::unique_ptr<BufferOutputStream> bufferOutputStream_;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to