This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f56f0d81e [GLUTEN-10200][VL] Fix estimating row vector size logic of 
rss shuffle writer (#10235)
5f56f0d81e is described below

commit 5f56f0d81e9a413eb0ffa4dfd0a09b4f18b03e50
Author: Jaime Pan <[email protected]>
AuthorDate: Tue Jul 22 17:22:25 2025 +0800

    [GLUTEN-10200][VL] Fix estimating row vector size logic of rss shuffle 
writer (#10235)
---
 cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc   |  51 +++++++-
 cpp/velox/shuffle/VeloxRssSortShuffleWriter.h    |  14 +++
 cpp/velox/tests/CMakeLists.txt                   |   3 +
 cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc | 144 +++++++++++++++++++++++
 4 files changed, 209 insertions(+), 3 deletions(-)

diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index 98b4b95ef4..61367d9015 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -26,6 +26,7 @@
 #include "velox/common/base/Nulls.h"
 #include "velox/type/Type.h"
 #include "velox/vector/ComplexVector.h"
+#include "velox/vector/VectorEncoding.h"
 
 namespace gluten {
 
@@ -51,14 +52,13 @@ arrow::Status VeloxRssSortShuffleWriter::init() {
 }
 
 arrow::Status VeloxRssSortShuffleWriter::doSort(facebook::velox::RowVectorPtr 
rv, int64_t memLimit) {
-  currentInputColumnBytes_ += rv->estimateFlatSize();
+  calculateBatchesSize(rv);
   batches_.push_back(rv);
   if (currentInputColumnBytes_ > memLimit) {
     for (auto pid = 0; pid < numPartitions(); ++pid) {
       RETURN_NOT_OK(evictRowVector(pid));
     }
-    batches_.clear();
-    currentInputColumnBytes_ = 0;
+    resetBatches();
   }
   setSortState(RssSortState::kSortInit);
   return arrow::Status::OK();
@@ -255,4 +255,49 @@ void VeloxRssSortShuffleWriter::setSortState(RssSortState 
state) {
   sortState_ = state;
 }
 
+// Adds the retained size of `vector` to currentInputColumnBytes_, then visits
+// every top-level column so that string buffers which were already counted are
+// subtracted again instead of being counted twice.
+void VeloxRssSortShuffleWriter::calculateBatchesSize(const facebook::velox::RowVectorPtr& vector) {
+  currentInputColumnBytes_ += vector->retainedSize();
+  const auto& columns = vector->children();
+  for (const auto& column : columns) {
+    deduplicateStrBuffer(column);
+  }
+}
+
+// Recursively walks `vector` and, for every string buffer that has already
+// been recorded in stringBuffers_, subtracts its capacity from
+// currentInputColumnBytes_ so that a buffer shared by several vectors is
+// counted only once. Buffers seen for the first time are recorded.
+//
+// Only FLAT, MAP, ROW and ARRAY encodings are handled; any other encoding
+// raises a Velox error, so callers are expected to pass flattened input.
+void VeloxRssSortShuffleWriter::deduplicateStrBuffer(const facebook::velox::VectorPtr& vector) {
+  // An unset child references no buffers; skip it rather than dereference null.
+  if (vector == nullptr) {
+    return;
+  }
+  switch (vector->encoding()) {
+    case facebook::velox::VectorEncoding::Simple::FLAT:
+      // Only VARCHAR / VARBINARY flat vectors reference string buffers.
+      if (vector->type()->isVarchar() || vector->type()->isVarbinary()) {
+        for (const auto& buffer : vector->asFlatVector<facebook::velox::StringView>()->stringBuffers()) {
+          // insert() yields false in `.second` when the buffer is already known.
+          if (!stringBuffers_.insert(buffer.get()).second) {
+            currentInputColumnBytes_ -= buffer->capacity();
+          }
+        }
+      }
+      break;
+    case facebook::velox::VectorEncoding::Simple::MAP: {
+      const auto* mapVector = vector->asUnchecked<facebook::velox::MapVector>();
+      deduplicateStrBuffer(mapVector->mapKeys());
+      deduplicateStrBuffer(mapVector->mapValues());
+      break;
+    }
+    case facebook::velox::VectorEncoding::Simple::ROW:
+      for (const auto& child : vector->asUnchecked<facebook::velox::RowVector>()->children()) {
+        deduplicateStrBuffer(child);
+      }
+      break;
+    case facebook::velox::VectorEncoding::Simple::ARRAY:
+      deduplicateStrBuffer(vector->asUnchecked<facebook::velox::ArrayVector>()->elements());
+      break;
+    default:
+      VELOX_FAIL("The encoding of flatten vector should not be {}", mapSimpleToName(vector->encoding()));
+  }
+}
+
+uint32_t VeloxRssSortShuffleWriter::getInputColumnBytes() const {
+  return currentInputColumnBytes_; // Running estimate updated by calculateBatchesSize() and zeroed by resetBatches().
+}
+
+// Discards all buffered state: the set of already-counted string buffers, the
+// running input-size estimate, and the buffered batches themselves.
+void VeloxRssSortShuffleWriter::resetBatches() {
+  stringBuffers_.clear();
+  currentInputColumnBytes_ = 0;
+  batches_.clear();
+}
+
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h 
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
index ec35609103..dbb5051bbd 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
@@ -66,6 +66,12 @@ class VeloxRssSortShuffleWriter final : public 
VeloxShuffleWriter {
 
   arrow::Status evictRowVector(uint32_t partitionId) override;
 
+  // Only for test. Returns currentInputColumnBytes_, the running size estimate of the buffered batches.
+  uint32_t getInputColumnBytes() const;
+
+  // Public for test; also called by doSort() after evicting. Clears batches_ and stringBuffers_, zeroes currentInputColumnBytes_.
+  void resetBatches();
+
  private:
   VeloxRssSortShuffleWriter(
       uint32_t numPartitions,
@@ -89,6 +95,10 @@ class VeloxRssSortShuffleWriter final : public 
VeloxShuffleWriter {
 
   void stat() const;
 
+  void calculateBatchesSize(const facebook::velox::RowVectorPtr& vector); // Adds vector's retained size to currentInputColumnBytes_.
+
+  void deduplicateStrBuffer(const facebook::velox::VectorPtr& vector); // Subtracts string buffers that were already counted.
+
   int32_t splitBufferSize_;
   int64_t sortBufferMaxSize_;
   facebook::velox::common::CompressionKind compressionKind_;
@@ -108,6 +118,10 @@ class VeloxRssSortShuffleWriter final : public 
VeloxShuffleWriter {
   uint32_t currentInputColumnBytes_ = 0;
 
   RssSortState sortState_{kSortInit};
+
+  // String buffers already counted for the current batches; filled by deduplicateStrBuffer(), emptied by resetBatches().
+  folly::F14FastSet<facebook::velox::Buffer*> stringBuffers_;
+
   bool stopped_{false};
 }; // class VeloxSortBasedShuffleWriter
 
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index a6d04822b6..3f95bf3121 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -39,6 +39,9 @@ add_velox_test(velox_shuffle_writer_test SOURCES 
VeloxShuffleWriterTest.cc)
 add_velox_test(velox_shuffle_writer_spill_test SOURCES
                VeloxShuffleWriterSpillTest.cc)
 
+add_velox_test(velox_rss_sort_shuffle_writer_test SOURCES
+               VeloxRssSortShuffleWriterTest.cc)
+
 # TODO: ORC is not well supported. add_velox_test(orc_test SOURCES OrcTest.cc)
 add_velox_test(
   velox_operators_test
diff --git a/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc 
b/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
new file mode 100644
index 0000000000..a787c78039
--- /dev/null
+++ b/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <arrow/util/compression.h>
+
+#include "config/GlutenConfig.h"
+#include "memory/VeloxMemoryManager.h"
+#include "shuffle/VeloxRssSortShuffleWriter.h"
+#include "tests/VeloxShuffleWriterTestBase.h"
+#include "utils/TestAllocationListener.h"
+#include "utils/TestUtils.h"
+
+#include "velox/buffer/Buffer.h"
+#include "velox/type/Type.h"
+#include "velox/vector/tests/VectorTestUtils.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+#include <cstdint>
+
+using namespace facebook::velox;
+using namespace facebook::velox::test;
+
+namespace gluten {
+
+class VeloxRssSortShuffleWriterTest : public VeloxShuffleWriterTestBase, 
public testing::Test { // Fixture that builds a VeloxRssSortShuffleWriter on top of an RssPartitionWriter.
+ protected:
+  static void SetUpTestSuite() { // gtest hook: runs once before the first test of this suite.
+    setUpVeloxBackend();
+  }
+
+  static void TearDownTestSuite() { // gtest hook: runs once after the last test of this suite.
+    tearDownVeloxBackend();
+  }
+
+  void SetUp() override { // gtest hook: runs before every test.
+    setUpTestData();
+  }
+
+  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t 
numPartitions) { // Builds a writer for numPartitions partitions from default-constructed options.
+    auto options = std::make_shared<RssPartitionWriterOptions>();
+    auto writerOptions = std::make_shared<RssSortShuffleWriterOptions>();
+    auto rssClient = std::make_unique<LocalRssClient>(dataFile_); // dataFile_ is not declared in this class; presumably inherited from VeloxShuffleWriterTestBase - TODO confirm.
+    std::unique_ptr<arrow::util::Codec> codec;
+    if (writerOptions->compressionType == 
arrow::Compression::type::UNCOMPRESSED) {
+      codec = nullptr; // No codec object is created when the output is uncompressed.
+    } else {
+      GLUTEN_ASSIGN_OR_THROW(codec, 
arrow::util::Codec::Create(writerOptions->compressionType));
+    }
+    auto partitionWriter = std::make_shared<RssPartitionWriter>(
+        numPartitions, std::move(codec), getDefaultMemoryManager(), options, 
std::move(rssClient));
+    GLUTEN_ASSIGN_OR_THROW(
+        auto shuffleWriter,
+        VeloxRssSortShuffleWriter::create(
+            numPartitions, std::move(partitionWriter), 
std::move(writerOptions), getDefaultMemoryManager()));
+    return shuffleWriter;
+  }
+};
+
+// Verifies that the writer's input-size estimate counts a string buffer shared
+// by several vectors only once, for flat, array, map and nested-row layouts,
+// and that a dictionary-encoded (non-flat) input is accepted.
+TEST_F(VeloxRssSortShuffleWriterTest, calculateBatchesSize) {
+  auto shuffleWriter = std::dynamic_pointer_cast<VeloxRssSortShuffleWriter>(createShuffleWriter(10));
+  // The cast yields null if the factory returned another writer type; stop early instead of dereferencing it.
+  ASSERT_NE(shuffleWriter, nullptr);
+  // Do not trigger resetBatches by shuffle writer.
+  const int64_t memLimit = INT64_MAX;
+
+  // Shared string buffer in FlatVector<StringView>.
+  BufferPtr strBuffer = AlignedBuffer::allocate<char>(200, pool());
+  auto vector1 = BaseVector::create<FlatVector<StringView>>(VARCHAR(), 100, pool());
+  vector1->setStringBuffers({strBuffer});
+  auto vector2 = BaseVector::create<FlatVector<StringView>>(VARCHAR(), 100, pool());
+  vector2->setStringBuffers({strBuffer});
+  auto vector3 = BaseVector::create<FlatVector<StringView>>(VARCHAR(), 100, pool());
+  vector3->setStringBuffers({strBuffer});
+  // BIGINT() is the 64-bit type, matching the FlatVector<int64_t> requested here.
+  auto vector4 = BaseVector::create<FlatVector<int64_t>>(BIGINT(), 100, pool());
+
+  auto rowVector1 = makeRowVector({vector1, vector2});
+  auto rowVector2 = makeRowVector({vector3, vector4});
+  std::shared_ptr<ColumnarBatch> cb1 = std::make_shared<VeloxColumnarBatch>(rowVector1);
+  std::shared_ptr<ColumnarBatch> cb2 = std::make_shared<VeloxColumnarBatch>(rowVector2);
+
+  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
+  ASSERT_NOT_OK(shuffleWriter->write(cb2, memLimit));
+  // strBuffer is referenced by three vectors across the two batches, so two of the three occurrences are removed.
+  auto expectedSize = rowVector1->retainedSize() + rowVector2->retainedSize() - strBuffer->capacity() * 2;
+  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
+  shuffleWriter->resetBatches();
+
+  // Shared string buffer in ArrayVector.
+  BufferPtr offsets = allocateOffsets(1, vector1->pool());
+  BufferPtr sizes = allocateOffsets(1, vector1->pool());
+  sizes->asMutable<vector_size_t>()[0] = vector1->size();
+
+  auto arrayVector =
+      std::make_shared<facebook::velox::ArrayVector>(pool(), ARRAY(VARCHAR()), nullptr, 1, offsets, sizes, vector1);
+  auto rowVector3 = makeRowVector({arrayVector, vector1});
+  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector3);
+  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
+  // vector1 appears twice (array elements and plain column): one duplicate is removed.
+  expectedSize = rowVector3->retainedSize() - strBuffer->capacity();
+  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
+  shuffleWriter->resetBatches();
+
+  // Shared string buffer in MapVector.
+  auto keys = vector1;
+  auto values = vector2;
+  auto mapVector = makeMapVector({0, 10, 20, 50}, keys, values);
+  auto rowVector4 = makeRowVector({mapVector, vector3});
+  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector4);
+  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
+  // Keys, values and the plain column all reference strBuffer: two duplicates are removed.
+  expectedSize = rowVector4->retainedSize() - strBuffer->capacity() * 2;
+  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
+  shuffleWriter->resetBatches();
+
+  // Shared string buffer in RowVector.
+  auto rowVector5 = makeRowVector({rowVector1, vector3});
+  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector5);
+  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
+  // The nested row holds vector1 and vector2, plus vector3 beside it: two duplicates are removed.
+  expectedSize = rowVector5->retainedSize() - strBuffer->capacity() * 2;
+  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
+  shuffleWriter->resetBatches();
+
+  // Vector is not flattened.
+  auto dictionaryVector = BaseVector::wrapInDictionary(
+      BufferPtr(nullptr),
+      makeIndices(vector1->size(), [](vector_size_t row) { return row; }),
+      vector1->size(),
+      vector1);
+  auto rowVector6 = makeRowVector({dictionaryVector});
+  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector6);
+  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
+  EXPECT_EQ(rowVector6->retainedSize(), shuffleWriter->getInputColumnBytes());
+}
+
+} // namespace gluten
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to