This is an automated email from the ASF dual-hosted git repository.
kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5f56f0d81e [GLUTEN-10200][VL] Fix estimating row vector size logic of
rss shuffle writer (#10235)
5f56f0d81e is described below
commit 5f56f0d81e9a413eb0ffa4dfd0a09b4f18b03e50
Author: Jaime Pan <[email protected]>
AuthorDate: Tue Jul 22 17:22:25 2025 +0800
[GLUTEN-10200][VL] Fix estimating row vector size logic of rss shuffle
writer (#10235)
---
cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc | 51 +++++++-
cpp/velox/shuffle/VeloxRssSortShuffleWriter.h | 14 +++
cpp/velox/tests/CMakeLists.txt | 3 +
cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc | 144 +++++++++++++++++++++++
4 files changed, 209 insertions(+), 3 deletions(-)
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index 98b4b95ef4..61367d9015 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -26,6 +26,7 @@
#include "velox/common/base/Nulls.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
+#include "velox/vector/VectorEncoding.h"
namespace gluten {
@@ -51,14 +52,13 @@ arrow::Status VeloxRssSortShuffleWriter::init() {
}
arrow::Status VeloxRssSortShuffleWriter::doSort(facebook::velox::RowVectorPtr
rv, int64_t memLimit) {
- currentInputColumnBytes_ += rv->estimateFlatSize();
+ calculateBatchesSize(rv);
batches_.push_back(rv);
if (currentInputColumnBytes_ > memLimit) {
for (auto pid = 0; pid < numPartitions(); ++pid) {
RETURN_NOT_OK(evictRowVector(pid));
}
- batches_.clear();
- currentInputColumnBytes_ = 0;
+ resetBatches();
}
setSortState(RssSortState::kSortInit);
return arrow::Status::OK();
@@ -255,4 +255,49 @@ void VeloxRssSortShuffleWriter::setSortState(RssSortState
state) {
sortState_ = state;
}
+// Adds one incoming batch to the running input-size estimate. The batch's full retainedSize() is
+// added first; the column walk below then subtracts every string buffer that an earlier vector
+// (in this batch or in a previously buffered one) has already contributed.
+// NOTE(review): currentInputColumnBytes_ is declared uint32_t, while retainedSize() presumably
+// returns a 64-bit count, so the sum wraps past 4 GiB and can delay eviction; consider widening.
+void VeloxRssSortShuffleWriter::calculateBatchesSize(const facebook::velox::RowVectorPtr& vector) {
+  const auto batchBytes = vector->retainedSize();
+  currentInputColumnBytes_ += batchBytes;
+  // A RowVector is ROW-encoded, so deduplicateStrBuffer() recurses into each of its child columns.
+  deduplicateStrBuffer(vector);
+}
+
+// Recursively visits a (flattened) vector and, for every string buffer that stringBuffers_ has
+// already recorded, subtracts that buffer's capacity from currentInputColumnBytes_ so a shared
+// buffer is counted only once. Complex types are walked through their children:
+//   ARRAY -> elements, MAP -> keys and values, ROW -> each child.
+// Any encoding other than FLAT/ARRAY/MAP/ROW is rejected with VELOX_FAIL.
+// The set stores raw Buffer* keys. They stay meaningful only while the vectors in batches_ keep
+// those buffers alive, so the set must be cleared together with batches_ (as resetBatches() does).
+void VeloxRssSortShuffleWriter::deduplicateStrBuffer(const facebook::velox::VectorPtr& vector) {
+  namespace fv = facebook::velox;
+  using Encoding = fv::VectorEncoding::Simple;
+
+  const Encoding encoding = vector->encoding();
+  if (encoding == Encoding::FLAT) {
+    // Only VARCHAR / VARBINARY flat vectors carry string buffers; other flat columns are skipped.
+    const bool holdsStrings = vector->type()->isVarchar() || vector->type()->isVarbinary();
+    if (!holdsStrings) {
+      return;
+    }
+    for (const auto& stringBuffer : vector->asFlatVector<fv::StringView>()->stringBuffers()) {
+      const bool firstSighting = stringBuffers_.insert(stringBuffer.get()).second;
+      if (!firstSighting) {
+        // Already included via an earlier vector's retainedSize(); undo the double count.
+        currentInputColumnBytes_ -= stringBuffer->capacity();
+      }
+    }
+  } else if (encoding == Encoding::ARRAY) {
+    deduplicateStrBuffer(vector->asUnchecked<fv::ArrayVector>()->elements());
+  } else if (encoding == Encoding::MAP) {
+    auto* const mapVector = vector->asUnchecked<fv::MapVector>();
+    deduplicateStrBuffer(mapVector->mapKeys());
+    deduplicateStrBuffer(mapVector->mapValues());
+  } else if (encoding == Encoding::ROW) {
+    for (const auto& child : vector->asUnchecked<fv::RowVector>()->children()) {
+      deduplicateStrBuffer(child);
+    }
+  } else {
+    VELOX_FAIL("The encoding of flatten vector should not be " + mapSimpleToName(encoding));
+  }
+}
+
+// Test-only accessor: the current input-size estimate, in bytes, of the batches buffered since
+// the counter was last reset.
+uint32_t VeloxRssSortShuffleWriter::getInputColumnBytes() const {
+  return this->currentInputColumnBytes_;
+}
+
+// Drops all buffered batches together with the bookkeeping that describes them: the byte
+// estimate and the set of string buffers already counted. The next batch is then sized from scratch.
+void VeloxRssSortShuffleWriter::resetBatches() {
+  stringBuffers_.clear();
+  currentInputColumnBytes_ = 0;
+  batches_.clear();
+}
+
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
index ec35609103..dbb5051bbd 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
@@ -66,6 +66,12 @@ class VeloxRssSortShuffleWriter final : public
VeloxShuffleWriter {
arrow::Status evictRowVector(uint32_t partitionId) override;
+ // Only for test.
+ uint32_t getInputColumnBytes() const;
+
+ // Public for test.
+ void resetBatches();
+
private:
VeloxRssSortShuffleWriter(
uint32_t numPartitions,
@@ -89,6 +95,10 @@ class VeloxRssSortShuffleWriter final : public
VeloxShuffleWriter {
void stat() const;
+ void calculateBatchesSize(const facebook::velox::RowVectorPtr& vector);
+
+ void deduplicateStrBuffer(const facebook::velox::VectorPtr& vector);
+
int32_t splitBufferSize_;
int64_t sortBufferMaxSize_;
facebook::velox::common::CompressionKind compressionKind_;
@@ -108,6 +118,10 @@ class VeloxRssSortShuffleWriter final : public
VeloxShuffleWriter {
uint32_t currentInputColumnBytes_ = 0;
RssSortState sortState_{kSortInit};
+
+ // The existing string buffers in the current batches.
+ folly::F14FastSet<facebook::velox::Buffer*> stringBuffers_;
+
bool stopped_{false};
}; // class VeloxSortBasedShuffleWriter
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index a6d04822b6..3f95bf3121 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -39,6 +39,9 @@ add_velox_test(velox_shuffle_writer_test SOURCES
VeloxShuffleWriterTest.cc)
add_velox_test(velox_shuffle_writer_spill_test SOURCES
VeloxShuffleWriterSpillTest.cc)
+# Unit tests for VeloxRssSortShuffleWriter.
+add_velox_test(velox_rss_sort_shuffle_writer_test SOURCES
+ VeloxRssSortShuffleWriterTest.cc)
+
# TODO: ORC is not well supported. add_velox_test(orc_test SOURCES OrcTest.cc)
add_velox_test(
velox_operators_test
diff --git a/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
b/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
new file mode 100644
index 0000000000..a787c78039
--- /dev/null
+++ b/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <arrow/util/compression.h>
+
+#include "config/GlutenConfig.h"
+#include "memory/VeloxMemoryManager.h"
+#include "shuffle/VeloxRssSortShuffleWriter.h"
+#include "tests/VeloxShuffleWriterTestBase.h"
+#include "utils/TestAllocationListener.h"
+#include "utils/TestUtils.h"
+
+#include "velox/buffer/Buffer.h"
+#include "velox/type/Type.h"
+#include "velox/vector/tests/VectorTestUtils.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+#include <cstdint>
+
+using namespace facebook::velox;
+using namespace facebook::velox::test;
+
+namespace gluten {
+
+// Fixture for VeloxRssSortShuffleWriter tests: initializes the Velox backend once per suite and
+// the shared test data before each test.
+class VeloxRssSortShuffleWriterTest : public VeloxShuffleWriterTestBase, public testing::Test {
+ protected:
+  static void SetUpTestSuite() {
+    setUpVeloxBackend();
+  }
+
+  static void TearDownTestSuite() {
+    tearDownVeloxBackend();
+  }
+
+  void SetUp() override {
+    setUpTestData();
+  }
+
+  // Builds an RSS sort shuffle writer with default options, backed by a LocalRssClient on dataFile_.
+  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) {
+    auto partitionWriterOptions = std::make_shared<RssPartitionWriterOptions>();
+    auto sortWriterOptions = std::make_shared<RssSortShuffleWriterOptions>();
+    auto localClient = std::make_unique<LocalRssClient>(dataFile_);
+
+    // The codec stays null when compression is disabled; otherwise one is created for the configured type.
+    std::unique_ptr<arrow::util::Codec> codec;
+    if (sortWriterOptions->compressionType != arrow::Compression::type::UNCOMPRESSED) {
+      GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(sortWriterOptions->compressionType));
+    }
+
+    auto partitionWriter = std::make_shared<RssPartitionWriter>(
+        numPartitions, std::move(codec), getDefaultMemoryManager(), partitionWriterOptions, std::move(localClient));
+    GLUTEN_ASSIGN_OR_THROW(
+        auto writer,
+        VeloxRssSortShuffleWriter::create(
+            numPartitions, std::move(partitionWriter), std::move(sortWriterOptions), getDefaultMemoryManager()));
+    return writer;
+  }
+};
+
+// Checks that the writer's input-size estimate counts each string buffer only once. This covers
+// buffers shared by sibling columns, buffers reached through ARRAY, MAP or ROW children, and a
+// dictionary-encoded column.
+TEST_F(VeloxRssSortShuffleWriterTest, calculateBatchesSize) {
+  auto shuffleWriter = std::dynamic_pointer_cast<VeloxRssSortShuffleWriter>(createShuffleWriter(10));
+  // Fail the test instead of dereferencing nullptr if the factory ever returns another writer type.
+  ASSERT_NE(shuffleWriter, nullptr);
+  // Do not trigger resetBatches by shuffle writer.
+  const int64_t memLimit = INT64_MAX;
+
+  // Shared string buffer in FlatVector<StringView>.
+  BufferPtr strBuffer = AlignedBuffer::allocate<char>(200, pool());
+  auto vector1 = BaseVector::create<FlatVector<StringView>>(VARCHAR(), 100, pool());
+  vector1->setStringBuffers({strBuffer});
+  auto vector2 = BaseVector::create<FlatVector<StringView>>(VARCHAR(), 100, pool());
+  vector2->setStringBuffers({strBuffer});
+  auto vector3 = BaseVector::create<FlatVector<StringView>>(VARCHAR(), 100, pool());
+  vector3->setStringBuffers({strBuffer});
+  // The SQL type must match the C++ value type. BIGINT() holds int64_t; INTEGER() holds int32_t
+  // and would produce a vector that is not actually a FlatVector<int64_t>.
+  auto vector4 = BaseVector::create<FlatVector<int64_t>>(BIGINT(), 100, pool());
+
+  auto rowVector1 = makeRowVector({vector1, vector2});
+  auto rowVector2 = makeRowVector({vector3, vector4});
+  std::shared_ptr<ColumnarBatch> cb1 = std::make_shared<VeloxColumnarBatch>(rowVector1);
+  std::shared_ptr<ColumnarBatch> cb2 = std::make_shared<VeloxColumnarBatch>(rowVector2);
+
+  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
+  ASSERT_NOT_OK(shuffleWriter->write(cb2, memLimit));
+  // vector1, vector2 and vector3 all reference strBuffer; two of the three references are duplicates.
+  auto expectedSize = rowVector1->retainedSize() + rowVector2->retainedSize() - strBuffer->capacity() * 2;
+  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
+  shuffleWriter->resetBatches();
+
+  // Shared string buffer in ArrayVector.
+  BufferPtr offsets = allocateOffsets(1, vector1->pool());
+  BufferPtr sizes = allocateOffsets(1, vector1->pool());
+  sizes->asMutable<vector_size_t>()[0] = vector1->size();
+
+  auto arrayVector = std::make_shared<facebook::velox::ArrayVector>(
+      pool(), ARRAY(VARCHAR()), nullptr, 1, offsets, sizes, vector1);
+  auto rowVector3 = makeRowVector({arrayVector, vector1});
+  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector3);
+  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
+  // vector1 appears both as the array elements and as a top-level column: one duplicate.
+  expectedSize = rowVector3->retainedSize() - strBuffer->capacity();
+  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
+  shuffleWriter->resetBatches();
+
+  // Shared string buffer in MapVector.
+  auto keys = vector1;
+  auto values = vector2;
+  auto mapVector = makeMapVector({0, 10, 20, 50}, keys, values);
+  auto rowVector4 = makeRowVector({mapVector, vector3});
+  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector4);
+  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
+  // Map keys, map values and vector3 share strBuffer: two duplicates.
+  expectedSize = rowVector4->retainedSize() - strBuffer->capacity() * 2;
+  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
+  shuffleWriter->resetBatches();
+
+  // Shared string buffer in RowVector.
+  auto rowVector5 = makeRowVector({rowVector1, vector3});
+  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector5);
+  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
+  // The nested row's two columns plus vector3 share strBuffer: two duplicates.
+  expectedSize = rowVector5->retainedSize() - strBuffer->capacity() * 2;
+  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
+  shuffleWriter->resetBatches();
+
+  // Input that is not flattened (dictionary-encoded): its single column references strBuffer only
+  // once, so the full retained size is expected and nothing is subtracted.
+  auto dictionaryVector = BaseVector::wrapInDictionary(
+      BufferPtr(nullptr),
+      makeIndices(vector1->size(), [](vector_size_t row) { return row; }),
+      vector1->size(),
+      vector1);
+  auto rowVector6 = makeRowVector({dictionaryVector});
+  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector6);
+  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
+  EXPECT_EQ(rowVector6->retainedSize(), shuffleWriter->getInputColumnBytes());
+}
+
+} // namespace gluten
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]