This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b8f8154a6 [VL] Refine evict logic in sort shuffle writer (#5786)
b8f8154a6 is described below
commit b8f8154a65b02ed10980563b861bf1b273c41691
Author: Rong Ma <[email protected]>
AuthorDate: Fri May 17 22:26:19 2024 +0800
[VL] Refine evict logic in sort shuffle writer (#5786)
---
cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc | 124 ++++++++---------------
cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h | 6 +-
2 files changed, 44 insertions(+), 86 deletions(-)
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
index bd56bc62e..2a6bca8c0 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
@@ -15,47 +15,20 @@
* limitations under the License.
*/
-#include "VeloxSortBasedShuffleWriter.h"
-#include "memory/ArrowMemory.h"
+#include "shuffle/VeloxSortBasedShuffleWriter.h"
#include "memory/VeloxColumnarBatch.h"
#include "memory/VeloxMemoryManager.h"
#include "shuffle/ShuffleSchema.h"
#include "utils/Common.h"
#include "utils/VeloxArrowUtils.h"
#include "utils/macros.h"
+
#include "velox/common/base/Nulls.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
-#if defined(__x86_64__)
-#include <immintrin.h>
-#include <x86intrin.h>
-#elif defined(__aarch64__)
-#include <arm_neon.h>
-#endif
-
namespace gluten {
-#define VELOX_SHUFFLE_WRITER_LOG_FLAG 0
-
-// macro to rotate left an 8-bit value 'x' given the shift 's' is a 32-bit integer
-// (x is left shifted by 's' modulo 8) OR (x right shifted by (8 - 's' modulo 8))
-#if !defined(__x86_64__)
-#define rotateLeft(x, s) (x << (s - ((s >> 3) << 3)) | x >> (8 - (s - ((s >> 3) << 3))))
-#endif
-
-// on x86 machines, _MM_HINT_T0,T1,T2 are defined as 1, 2, 3
-// equivalent mapping to __builtin_prefetch hints is 3, 2, 1
-#if defined(__x86_64__)
-#define PREFETCHT0(ptr) _mm_prefetch(ptr, _MM_HINT_T0)
-#define PREFETCHT1(ptr) _mm_prefetch(ptr, _MM_HINT_T1)
-#define PREFETCHT2(ptr) _mm_prefetch(ptr, _MM_HINT_T2)
-#else
-#define PREFETCHT0(ptr) __builtin_prefetch(ptr, 0, 3)
-#define PREFETCHT1(ptr) __builtin_prefetch(ptr, 0, 2)
-#define PREFETCHT2(ptr) __builtin_prefetch(ptr, 0, 1)
-#endif
-
arrow::Result<std::shared_ptr<VeloxShuffleWriter>>
VeloxSortBasedShuffleWriter::create(
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
@@ -154,83 +127,71 @@ arrow::Status VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
return arrow::Status::OK();
}
-arrow::Status VeloxSortBasedShuffleWriter::evictBatch(uint32_t partitionId, facebook::velox::RowTypePtr* rowTypePtr) {
+arrow::Status VeloxSortBasedShuffleWriter::evictBatch(uint32_t partitionId) {
int64_t rawSize = batch_->size();
bufferOutputStream_->seekp(0);
batch_->flush(bufferOutputStream_.get());
auto buffer = bufferOutputStream_->getBuffer();
RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, buffer->as<char>(), buffer->size()));
batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get());
- batch_->createStreamTree(*rowTypePtr, options_.bufferSize, &serdeOptions_);
+ batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_);
return arrow::Status::OK();
}
arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId) {
- int32_t rowNum = 0;
- const int32_t maxBatchNum = options_.bufferSize;
- auto rowTypePtr = std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value());
+ int32_t accumulatedRows = 0;
+ const int32_t maxRowsPerBatch = options_.bufferSize;
if (options_.partitioning != Partitioning::kSingle) {
if (auto it = rowVectorIndexMap_.find(partitionId); it != rowVectorIndexMap_.end()) {
- auto rowVectorIndex = it->second;
- const int32_t outputSize = rowVectorIndex.size();
+ const auto& rowIndices = it->second;
+ VELOX_DCHECK(!rowIndices.empty())
- std::map<int32_t, std::vector<facebook::velox::IndexRange>> groupedIndices;
- std::map<int32_t, int64_t> groupedSize;
+ size_t idx = 0;
+ const auto outputSize = rowIndices.size();
+ while (idx < outputSize) {
+ auto combinedRowIndex = rowIndices[idx];
+ auto inputVectorIndex = static_cast<int32_t>(combinedRowIndex >> 32);
+ auto startRow = static_cast<int32_t>(combinedRowIndex & 0xFFFFFFFFLL);
- int32_t tempVectorIndex = -1;
- int32_t baseRowIndex = -1;
- int32_t tempRowIndex = -1;
- int32_t size = 1;
- for (int start = 0; start < outputSize; start++) {
- const int64_t rowVector = rowVectorIndex[start];
- const int32_t vectorIndex = static_cast<int32_t>(rowVector >> 32);
- const int32_t rowIndex = static_cast<int32_t>(rowVector & 0xFFFFFFFFLL);
- if (tempVectorIndex == -1) {
- tempVectorIndex = vectorIndex;
- baseRowIndex = rowIndex;
- tempRowIndex = rowIndex;
- } else {
- if (vectorIndex == tempVectorIndex && rowIndex == tempRowIndex + 1) {
- size += 1;
- tempRowIndex = rowIndex;
+ int32_t numRowsInRange = 1;
+ std::vector<facebook::velox::IndexRange> groupedIndices;
+
+ while (++idx < outputSize && (rowIndices[idx] >> 32) == inputVectorIndex) {
+ auto row = static_cast<int32_t>(rowIndices[idx] & 0xFFFFFFFFLL);
+ if (row == startRow + numRowsInRange) {
+ numRowsInRange++;
} else {
- groupedIndices[tempVectorIndex].push_back({baseRowIndex, size});
- groupedSize[tempVectorIndex] += size;
- size = 1;
- tempVectorIndex = vectorIndex;
- baseRowIndex = rowIndex;
- tempRowIndex = rowIndex;
+ groupedIndices.push_back({startRow, numRowsInRange});
+ accumulatedRows += numRowsInRange;
+ startRow = row;
+ numRowsInRange = 1;
}
}
- }
- groupedIndices[tempVectorIndex].push_back({baseRowIndex, size});
- groupedSize[tempVectorIndex] += size;
+ groupedIndices.push_back({startRow, numRowsInRange});
+ batch_->append(batches_[inputVectorIndex], groupedIndices);
- for (auto& pair : groupedIndices) {
- batch_->append(batches_[pair.first], pair.second);
- rowNum += groupedSize[pair.first];
- if (rowNum >= maxBatchNum) {
- rowNum = 0;
- RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
+ accumulatedRows += numRowsInRange;
+ // Check whether to evict the data after gathering all rows from one input RowVector.
+ if (accumulatedRows >= maxRowsPerBatch) {
+ RETURN_NOT_OK(evictBatch(partitionId));
+ accumulatedRows = 0;
}
}
-
- rowVectorIndex.clear();
rowVectorIndexMap_.erase(partitionId);
}
} else {
for (facebook::velox::RowVectorPtr rowVectorPtr : batches_) {
- rowNum += rowVectorPtr->size();
batch_->append(rowVectorPtr);
- if (rowNum >= maxBatchNum) {
- RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
- rowNum = 0;
+ accumulatedRows += rowVectorPtr->size();
+ if (accumulatedRows >= maxRowsPerBatch) {
+ RETURN_NOT_OK(evictBatch(partitionId));
+ accumulatedRows = 0;
}
}
}
- if (rowNum > 0) {
- RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
+ if (accumulatedRows > 0) {
+ RETURN_NOT_OK(evictBatch(partitionId));
}
return arrow::Status::OK();
}
@@ -255,15 +216,12 @@ arrow::Status VeloxSortBasedShuffleWriter::stop() {
}
arrow::Status VeloxSortBasedShuffleWriter::initFromRowVector(const facebook::velox::RowVector& rv) {
- if (!rowType_.has_value()) {
- rowType_ = rv.type();
+ if (!rowType_) {
+ rowType_ = facebook::velox::asRowType(rv.type());
serdeOptions_ = {
false,
facebook::velox::common::stringToCompressionKind(partitionWriter_->options().compressionTypeStr)};
batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get());
- batch_->createStreamTree(
- std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value()),
- options_.bufferSize,
- &serdeOptions_);
+ batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_);
}
return arrow::Status::OK();
}
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
index 710590184..417d5e926 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
@@ -66,8 +66,6 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
arrow::Status evictRowVector(uint32_t partitionId) override;
- arrow::Status evictBatch(uint32_t partitionId, facebook::velox::RowTypePtr* rowTypePtr);
-
private:
VeloxSortBasedShuffleWriter(
uint32_t numPartitions,
@@ -85,9 +83,11 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
arrow::Status doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit);
+ arrow::Status evictBatch(uint32_t partitionId);
+
void stat() const;
- std::optional<facebook::velox::TypePtr> rowType_;
+ facebook::velox::RowTypePtr rowType_;
std::unique_ptr<facebook::velox::VectorStreamGroup> batch_;
std::unique_ptr<BufferOutputStream> bufferOutputStream_;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]