This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d77aa1537 [VL] RSS client should push complete rows (#11123)
7d77aa1537 is described below

commit 7d77aa15374297d44438a2ca8bda076e579e492c
Author: Wechar Yu <[email protected]>
AuthorDate: Sat Nov 22 00:07:35 2025 +0800

    [VL] RSS client should push complete rows (#11123)
---
 cpp/core/shuffle/rss/RssPartitionWriter.cc       |  40 +++++----
 cpp/core/shuffle/rss/RssPartitionWriter.h        |   4 +
 cpp/velox/shuffle/VeloxSortShuffleWriter.cc      |  15 ++--
 cpp/velox/shuffle/VeloxSortShuffleWriter.h       |   2 +-
 cpp/velox/tests/CMakeLists.txt                   |   3 +
 cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc |   1 -
 cpp/velox/tests/VeloxShuffleWriterSpillTest.cc   |   1 -
 cpp/velox/tests/VeloxShuffleWriterTest.cc        |   1 -
 cpp/velox/tests/VeloxShuffleWriterTestBase.h     |   1 +
 cpp/velox/tests/VeloxSortShuffleWriterTest.cc    | 100 +++++++++++++++++++++++
 10 files changed, 144 insertions(+), 24 deletions(-)

diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 243769a040..05068b8035 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -61,24 +61,34 @@ arrow::Status
 RssPartitionWriter::sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
   ScopedTimer timer(&spillTime_);
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
-  ARROW_ASSIGN_OR_RAISE(
-      auto rssOs, 
arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize, 
arrow::default_memory_pool()));
-  if (codec_ != nullptr) {
+  if (shouldInitializeOs_) {
     ARROW_ASSIGN_OR_RAISE(
-        auto compressedOs,
-        ShuffleCompressedOutputStream::Make(
-            codec_.get(), options_->compressionBufferSize, rssOs, 
arrow::default_memory_pool()));
-    RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs.get()));
-    RETURN_NOT_OK(compressedOs->Flush());
-    RETURN_NOT_OK(compressedOs->Close());
-    compressTime_ += compressedOs->compressTime();
+        rssOs_, 
arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize, 
arrow::default_memory_pool()));
+    if (codec_ != nullptr) {
+      ARROW_ASSIGN_OR_RAISE(
+          compressedOs_,
+          ShuffleCompressedOutputStream::Make(
+              codec_.get(), options_->compressionBufferSize, rssOs_, 
arrow::default_memory_pool()));
+    }
+    shouldInitializeOs_ = false;
+  }
+  if (compressedOs_ != nullptr) {
+    RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs_.get()));
   } else {
-    RETURN_NOT_OK(inMemoryPayload->serialize(rssOs.get()));
+    RETURN_NOT_OK(inMemoryPayload->serialize(rssOs_.get()));
   }
-  ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs->Finish());
-  bytesEvicted_[partitionId] +=
-      rssClient_->pushPartitionData(partitionId, buffer->data_as<char>(), 
buffer->size());
-  
+  if (inMemoryPayload->numRows() > 0) {
+    // Push data to rss only when there are complete rows.
+    if (compressedOs_ != nullptr) {
+      RETURN_NOT_OK(compressedOs_->Flush());
+      RETURN_NOT_OK(compressedOs_->Close());
+      compressTime_ += compressedOs_->compressTime();
+    }
+    ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs_->Finish());
+    bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId, 
buffer->data_as<char>(), buffer->size());
+    shouldInitializeOs_ = true;
+  }
+
   return arrow::Status::OK();
 }
 
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h b/cpp/core/shuffle/rss/RssPartitionWriter.h
index 6e6c5d7722..a60b22bbb0 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -63,6 +63,10 @@ class RssPartitionWriter final : public PartitionWriter {
 
   std::vector<int64_t> bytesEvicted_;
   std::vector<int64_t> rawPartitionLengths_;
+
+  bool shouldInitializeOs_{true};
+  std::shared_ptr<arrow::io::BufferOutputStream> rssOs_{nullptr};
+  std::shared_ptr<ShuffleCompressedOutputStream> compressedOs_{nullptr};
 };
 
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 57166e7ab7..30eb357912 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -292,7 +292,7 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
     recordSize = *(reinterpret_cast<RowSizeType*>(addr)) + sizeof(RowSizeType);
     if (offset + recordSize > diskWriteBufferSize_ && offset > 0) {
       sortTime.stop();
-      RETURN_NOT_OK(evictPartitionInternal(partitionId, sortedBufferPtr_, 
offset));
+      RETURN_NOT_OK(evictPartitionInternal(partitionId, index - begin, 
sortedBufferPtr_, offset));
       sortTime.start();
       begin = index;
       offset = 0;
@@ -305,7 +305,8 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
       while (bytes < recordSize) {
         auto rawLength = 
std::min<RowSizeType>(static_cast<uint32_t>(diskWriteBufferSize_), recordSize - 
bytes);
         // Use numRows = 0 to represent a part of row.
-        RETURN_NOT_OK(evictPartitionInternal(partitionId, buffer + bytes, 
rawLength));
+        auto numRows = (bytes + rawLength == recordSize) ? 1 : 0;
+        RETURN_NOT_OK(evictPartitionInternal(partitionId, numRows, buffer + 
bytes, rawLength));
         bytes += rawLength;
       }
       begin++;
@@ -320,16 +321,20 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
   sortTime.stop();
   if (offset > 0) {
     VELOX_CHECK(index > begin);
-    RETURN_NOT_OK(evictPartitionInternal(partitionId, sortedBufferPtr_, 
offset));
+    RETURN_NOT_OK(evictPartitionInternal(partitionId, index - begin, 
sortedBufferPtr_, offset));
   }
   sortTime_ += sortTime.realTimeUsed();
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxSortShuffleWriter::evictPartitionInternal(uint32_t 
partitionId, uint8_t* buffer, int64_t rawLength) {
+arrow::Status VeloxSortShuffleWriter::evictPartitionInternal(
+    uint32_t partitionId,
+    uint32_t numRows,
+    uint8_t* buffer,
+    int64_t rawLength) {
   VELOX_CHECK(rawLength > 0);
   auto payload = std::make_unique<InMemoryPayload>(
-      0,
+      numRows,
       nullptr,
       nullptr,
       
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
 rawLength)});
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 9ab842718b..a30f0edb83 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -75,7 +75,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
 
   arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);
 
-  arrow::Status evictPartitionInternal(uint32_t partitionId, uint8_t* buffer, 
int64_t rawLength);
+  arrow::Status evictPartitionInternal(uint32_t partitionId, uint32_t numRows, 
uint8_t* buffer, int64_t rawLength);
 
   facebook::velox::vector_size_t maxRowsToInsert(
       facebook::velox::vector_size_t offset,
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index 4fdc95910a..08d6c7422c 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -42,6 +42,9 @@ add_velox_test(velox_shuffle_writer_spill_test SOURCES
 add_velox_test(velox_rss_sort_shuffle_writer_test SOURCES
                VeloxRssSortShuffleWriterTest.cc)
 
+add_velox_test(velox_sort_shuffle_writer_test SOURCES
+               VeloxSortShuffleWriterTest.cc)
+
 # TODO: ORC is not well supported. add_velox_test(orc_test SOURCES OrcTest.cc)
 add_velox_test(
   velox_operators_test
diff --git a/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc b/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
index 9741c2b672..32fe57b241 100644
--- a/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
@@ -17,7 +17,6 @@
 
 #include <arrow/util/compression.h>
 
-#include "config/GlutenConfig.h"
 #include "memory/VeloxMemoryManager.h"
 #include "shuffle/VeloxRssSortShuffleWriter.h"
 #include "tests/VeloxShuffleWriterTestBase.h"
diff --git a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
index 489c0eafbe..0da2a2f187 100644
--- a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
@@ -17,7 +17,6 @@
 
 #include <duckdb/common/enums/compression_type.hpp>
 
-#include "config/GlutenConfig.h"
 #include "shuffle/VeloxHashShuffleWriter.h"
 #include "tests/VeloxShuffleWriterTestBase.h"
 #include "tests/utils/TestUtils.h"
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 0d62faafee..e79c8de1de 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -18,7 +18,6 @@
 #include <arrow/c/bridge.h>
 #include <arrow/io/api.h>
 
-#include "config/GlutenConfig.h"
 #include "shuffle/VeloxHashShuffleWriter.h"
 #include "shuffle/VeloxRssSortShuffleWriter.h"
 #include "shuffle/VeloxSortShuffleWriter.h"
diff --git a/cpp/velox/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
index 03d9dda046..448a98f130 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
@@ -23,6 +23,7 @@
 
 #include <compute/VeloxBackend.h>
 #include "../utils/VeloxArrowUtils.h"
+#include "config/GlutenConfig.h"
 #include "memory/VeloxColumnarBatch.h"
 #include "shuffle/LocalPartitionWriter.h"
 #include "shuffle/PartitionWriter.h"
diff --git a/cpp/velox/tests/VeloxSortShuffleWriterTest.cc b/cpp/velox/tests/VeloxSortShuffleWriterTest.cc
new file mode 100644
index 0000000000..130f1bb010
--- /dev/null
+++ b/cpp/velox/tests/VeloxSortShuffleWriterTest.cc
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "tests/VeloxShuffleWriterTestBase.h"
+
+using namespace facebook::velox;
+using namespace facebook::velox::test;
+
+namespace gluten {
+
+namespace {
+class FakeBufferRssClient : public RssClient {
+ public:
+  // Counts one push per call and returns `size` so the caller books the whole buffer; bytes are discarded.
+  int32_t pushPartitionData(int32_t /*partitionId*/, const char* /*bytes*/, int64_t size) override {
+    ++receiveTimes_;
+    return static_cast<int32_t>(size);
+  }
+
+  void stop() override {}
+
+  // Number of pushPartitionData() calls observed so far.
+  uint32_t getReceiveTimes() const {
+    return receiveTimes_;
+  }
+
+ private:
+  uint32_t receiveTimes_{0};
+};
+} // namespace
+
+class VeloxSortShuffleWriterTest : public VeloxShuffleWriterTestBase, public testing::Test {
+ protected:
+  // The Velox backend is initialized once for the whole suite and torn down after its last test.
+  static void SetUpTestSuite() {
+    setUpVeloxBackend();
+  }
+  static void TearDownTestSuite() {
+    tearDownVeloxBackend();
+  }
+  // Builds a VeloxSortShuffleWriter whose RssPartitionWriter (default options) pushes to `rssClient`.
+  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
+      uint32_t numPartitions,
+      std::shared_ptr<SortShuffleWriterOptions> writeOptions,
+      std::shared_ptr<RssClient> rssClient) {
+    auto rssWriterOptions = std::make_shared<RssPartitionWriterOptions>();
+    auto rssPartitionWriter = std::make_shared<RssPartitionWriter>(
+        numPartitions, nullptr, getDefaultMemoryManager(), rssWriterOptions, std::move(rssClient));
+    GLUTEN_ASSIGN_OR_THROW(
+        auto writer,
+        VeloxSortShuffleWriter::create(
+            numPartitions, std::move(rssPartitionWriter), std::move(writeOptions), getDefaultMemoryManager()));
+    return writer;
+  }
+};
+
+TEST_F(VeloxSortShuffleWriterTest, pushCompleteRows) {
+  auto rssClient = std::make_shared<FakeBufferRssClient>();
+  auto writeOptions = std::make_shared<SortShuffleWriterOptions>();
+  // A 1-byte disk write buffer forces rows out in partial chunks; each push must still hold whole rows.
+  writeOptions->diskWriteBufferSize = 1;
+
+  auto rowVector = makeRowVector({
+      makeFlatVector<StringView>(
+          {"alice0",
+           "bob1",
+           "alice2",
+           "bob3",
+           "Alice4",
+           "Bob5123456789098766notinline",
+           "AlicE6",
+           "boB7",
+           "ALICE8",
+           "BOB9"}),
+  });
+  std::shared_ptr<ColumnarBatch> cb = std::make_shared<VeloxColumnarBatch>(rowVector);
+
+  auto shuffleWriter = createShuffleWriter(1, writeOptions, rssClient);
+  ASSERT_TRUE(shuffleWriter->write(cb, ShuffleWriter::kMinMemLimit).ok());
+  ASSERT_TRUE(shuffleWriter->stop().ok());
+
+  // Exactly one push per input row: 10 rows, all routed to the single partition.
+  EXPECT_EQ(10u, rssClient->getReceiveTimes());
+}
+
+} // namespace gluten


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to