This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7d77aa1537 [VL] RSS client should push complete rows (#11123)
7d77aa1537 is described below
commit 7d77aa15374297d44438a2ca8bda076e579e492c
Author: Wechar Yu <[email protected]>
AuthorDate: Sat Nov 22 00:07:35 2025 +0800
[VL] RSS client should push complete rows (#11123)
---
cpp/core/shuffle/rss/RssPartitionWriter.cc | 40 +++++----
cpp/core/shuffle/rss/RssPartitionWriter.h | 4 +
cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 15 ++--
cpp/velox/shuffle/VeloxSortShuffleWriter.h | 2 +-
cpp/velox/tests/CMakeLists.txt | 3 +
cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc | 1 -
cpp/velox/tests/VeloxShuffleWriterSpillTest.cc | 1 -
cpp/velox/tests/VeloxShuffleWriterTest.cc | 1 -
cpp/velox/tests/VeloxShuffleWriterTestBase.h | 1 +
cpp/velox/tests/VeloxSortShuffleWriterTest.cc | 100 +++++++++++++++++++++++
10 files changed, 144 insertions(+), 24 deletions(-)
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 243769a040..05068b8035 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -61,24 +61,34 @@ arrow::Status
RssPartitionWriter::sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
ScopedTimer timer(&spillTime_);
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
- ARROW_ASSIGN_OR_RAISE(
- auto rssOs,
arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize,
arrow::default_memory_pool()));
- if (codec_ != nullptr) {
+ if (shouldInitializeOs_) { // First payload after a push (or the very first call): open fresh streams.
ARROW_ASSIGN_OR_RAISE(
- auto compressedOs,
- ShuffleCompressedOutputStream::Make(
- codec_.get(), options_->compressionBufferSize, rssOs,
arrow::default_memory_pool()));
- RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs.get()));
- RETURN_NOT_OK(compressedOs->Flush());
- RETURN_NOT_OK(compressedOs->Close());
- compressTime_ += compressedOs->compressTime();
+ rssOs_,
arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize,
arrow::default_memory_pool()));
+ if (codec_ != nullptr) { // The compressed stream wraps rssOs_ and exists only when a codec is configured.
+ ARROW_ASSIGN_OR_RAISE(
+ compressedOs_,
+ ShuffleCompressedOutputStream::Make(
+ codec_.get(), options_->compressionBufferSize, rssOs_,
arrow::default_memory_pool()));
+ }
+ shouldInitializeOs_ = false; // Keep appending to these streams until a complete row is pushed.
+ }
+ if (compressedOs_ != nullptr) { // Serialize through compression when available, else straight into rssOs_.
+ RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs_.get()));
} else {
- RETURN_NOT_OK(inMemoryPayload->serialize(rssOs.get()));
+ RETURN_NOT_OK(inMemoryPayload->serialize(rssOs_.get()));
}
- ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs->Finish());
- bytesEvicted_[partitionId] +=
- rssClient_->pushPartitionData(partitionId, buffer->data_as<char>(),
buffer->size());
-
+ if (inMemoryPayload->numRows() > 0) {
+ // numRows() == 0 marks a partial row: keep it buffered in the open streams and push only once a payload completes the row.
+ if (compressedOs_ != nullptr) {
+ RETURN_NOT_OK(compressedOs_->Flush());
+ RETURN_NOT_OK(compressedOs_->Close());
+ compressTime_ += compressedOs_->compressTime();
+ }
+ ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs_->Finish());
+ bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId,
buffer->data_as<char>(), buffer->size()); // NOTE(review): buffered partial chunks are pushed under this partitionId; callers must not switch partitions mid-row.
+ shouldInitializeOs_ = true; // Streams were closed/finished above; reopen them on the next call.
+ }
+
return arrow::Status::OK();
}
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h
b/cpp/core/shuffle/rss/RssPartitionWriter.h
index 6e6c5d7722..a60b22bbb0 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -63,6 +63,10 @@ class RssPartitionWriter final : public PartitionWriter {
std::vector<int64_t> bytesEvicted_;
std::vector<int64_t> rawPartitionLengths_;
+ // Output streams reused across sortEvict() calls so partial-row payloads accumulate until a complete row is pushed.
+ bool shouldInitializeOs_{true}; // True when sortEvict() must create fresh streams before serializing.
+ std::shared_ptr<arrow::io::BufferOutputStream> rssOs_{nullptr};
+ std::shared_ptr<ShuffleCompressedOutputStream> compressedOs_{nullptr}; // Stays null when no compression codec is configured.
};
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 57166e7ab7..30eb357912 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -292,7 +292,7 @@ arrow::Status
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
recordSize = *(reinterpret_cast<RowSizeType*>(addr)) + sizeof(RowSizeType);
if (offset + recordSize > diskWriteBufferSize_ && offset > 0) {
sortTime.stop();
- RETURN_NOT_OK(evictPartitionInternal(partitionId, sortedBufferPtr_,
offset));
+ RETURN_NOT_OK(evictPartitionInternal(partitionId, index - begin,
sortedBufferPtr_, offset));
sortTime.start();
begin = index;
offset = 0;
@@ -305,7 +305,8 @@ arrow::Status
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
while (bytes < recordSize) {
auto rawLength =
std::min<RowSizeType>(static_cast<uint32_t>(diskWriteBufferSize_), recordSize -
bytes);
// Use numRows = 0 to represent a part of row.
- RETURN_NOT_OK(evictPartitionInternal(partitionId, buffer + bytes,
rawLength));
+ auto numRows = (bytes + rawLength == recordSize) ? 1 : 0; // Only the chunk that ends the row reports it as complete.
+ RETURN_NOT_OK(evictPartitionInternal(partitionId, numRows, buffer +
bytes, rawLength));
bytes += rawLength;
}
begin++;
@@ -320,16 +321,20 @@ arrow::Status
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
sortTime.stop();
if (offset > 0) {
VELOX_CHECK(index > begin);
- RETURN_NOT_OK(evictPartitionInternal(partitionId, sortedBufferPtr_,
offset));
+ RETURN_NOT_OK(evictPartitionInternal(partitionId, index - begin,
sortedBufferPtr_, offset));
}
sortTime_ += sortTime.realTimeUsed();
return arrow::Status::OK();
}
-arrow::Status VeloxSortShuffleWriter::evictPartitionInternal(uint32_t
partitionId, uint8_t* buffer, int64_t rawLength) {
+arrow::Status VeloxSortShuffleWriter::evictPartitionInternal(
+ uint32_t partitionId,
+ uint32_t numRows, // Complete rows in buffer; 0 means buffer holds only part of a row.
+ uint8_t* buffer,
+ int64_t rawLength) {
VELOX_CHECK(rawLength > 0);
auto payload = std::make_unique<InMemoryPayload>(
- 0,
+ numRows,
nullptr,
nullptr,
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
rawLength)});
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 9ab842718b..a30f0edb83 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -75,7 +75,7 @@ class VeloxSortShuffleWriter final : public
VeloxShuffleWriter {
arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);
- arrow::Status evictPartitionInternal(uint32_t partitionId, uint8_t* buffer,
int64_t rawLength);
+ arrow::Status evictPartitionInternal(uint32_t partitionId, uint32_t numRows,
uint8_t* buffer, int64_t rawLength); // numRows: complete rows in buffer (0 = partial row).
facebook::velox::vector_size_t maxRowsToInsert(
facebook::velox::vector_size_t offset,
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index 4fdc95910a..08d6c7422c 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -42,6 +42,9 @@ add_velox_test(velox_shuffle_writer_spill_test SOURCES
add_velox_test(velox_rss_sort_shuffle_writer_test SOURCES
VeloxRssSortShuffleWriterTest.cc)
+add_velox_test(velox_sort_shuffle_writer_test SOURCES
+ VeloxSortShuffleWriterTest.cc)
+
# TODO: ORC is not well supported. add_velox_test(orc_test SOURCES OrcTest.cc)
add_velox_test(
velox_operators_test
diff --git a/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
b/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
index 9741c2b672..32fe57b241 100644
--- a/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxRssSortShuffleWriterTest.cc
@@ -17,7 +17,6 @@
#include <arrow/util/compression.h>
-#include "config/GlutenConfig.h"
#include "memory/VeloxMemoryManager.h"
#include "shuffle/VeloxRssSortShuffleWriter.h"
#include "tests/VeloxShuffleWriterTestBase.h"
diff --git a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
index 489c0eafbe..0da2a2f187 100644
--- a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
@@ -17,7 +17,6 @@
#include <duckdb/common/enums/compression_type.hpp>
-#include "config/GlutenConfig.h"
#include "shuffle/VeloxHashShuffleWriter.h"
#include "tests/VeloxShuffleWriterTestBase.h"
#include "tests/utils/TestUtils.h"
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 0d62faafee..e79c8de1de 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -18,7 +18,6 @@
#include <arrow/c/bridge.h>
#include <arrow/io/api.h>
-#include "config/GlutenConfig.h"
#include "shuffle/VeloxHashShuffleWriter.h"
#include "shuffle/VeloxRssSortShuffleWriter.h"
#include "shuffle/VeloxSortShuffleWriter.h"
diff --git a/cpp/velox/tests/VeloxShuffleWriterTestBase.h
b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
index 03d9dda046..448a98f130 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
@@ -23,6 +23,7 @@
#include <compute/VeloxBackend.h>
#include "../utils/VeloxArrowUtils.h"
+#include "config/GlutenConfig.h"
#include "memory/VeloxColumnarBatch.h"
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/PartitionWriter.h"
diff --git a/cpp/velox/tests/VeloxSortShuffleWriterTest.cc b/cpp/velox/tests/VeloxSortShuffleWriterTest.cc
new file mode 100644
index 0000000000..130f1bb010
--- /dev/null
+++ b/cpp/velox/tests/VeloxSortShuffleWriterTest.cc
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "tests/VeloxShuffleWriterTestBase.h"
+
+using namespace facebook::velox;
+using namespace facebook::velox::test;
+
+namespace gluten {
+namespace {
+// RssClient stub: discards the pushed bytes and only counts pushPartitionData() calls.
+class FakeBufferRssClient : public RssClient {
+ public:
+  FakeBufferRssClient() = default;
+
+  int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t size) override {
+    ++receiveTimes_;
+    return static_cast<int32_t>(size);
+  }
+
+  void stop() override {}
+
+  uint32_t getReceiveTimes() const {
+    return receiveTimes_;
+  }
+
+ private:
+  uint32_t receiveTimes_{0};
+};
+} // namespace
+
+class VeloxSortShuffleWriterTest : public VeloxShuffleWriterTestBase, public testing::Test {
+ protected:
+  static void SetUpTestSuite() {
+    setUpVeloxBackend();
+  }
+
+  static void TearDownTestSuite() {
+    tearDownVeloxBackend();
+  }
+
+  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
+      uint32_t numPartitions,
+      std::shared_ptr<SortShuffleWriterOptions> writeOptions,
+      std::shared_ptr<RssClient> rssClient) {
+    auto options = std::make_shared<RssPartitionWriterOptions>();
+    auto partitionWriter = std::make_shared<RssPartitionWriter>(
+        numPartitions, nullptr, getDefaultMemoryManager(), options, std::move(rssClient));
+    GLUTEN_ASSIGN_OR_THROW(
+        auto shuffleWriter,
+        VeloxSortShuffleWriter::create(
+            numPartitions, std::move(partitionWriter), std::move(writeOptions), getDefaultMemoryManager()));
+    return shuffleWriter;
+  }
+};
+
+TEST_F(VeloxSortShuffleWriterTest, pushCompleteRows) {
+  auto rssClient = std::make_shared<FakeBufferRssClient>();
+  auto writeOptions = std::make_shared<SortShuffleWriterOptions>();
+  // A 1-byte disk write buffer makes the writer evict every row in partial chunks; only complete rows may be pushed.
+  writeOptions->diskWriteBufferSize = 1;
+
+  auto rowVector = makeRowVector({
+      makeFlatVector<StringView>(
+          {"alice0",
+           "bob1",
+           "alice2",
+           "bob3",
+           "Alice4",
+           "Bob5123456789098766notinline",
+           "AlicE6",
+           "boB7",
+           "ALICE8",
+           "BOB9"}),
+  });
+  std::shared_ptr<ColumnarBatch> cb = std::make_shared<VeloxColumnarBatch>(rowVector);
+
+  auto shuffleWriter = createShuffleWriter(1, writeOptions, rssClient);
+  ASSERT_TRUE(shuffleWriter->write(cb, ShuffleWriter::kMinMemLimit).ok());
+  ASSERT_TRUE(shuffleWriter->stop().ok());
+
+  // Each of the 10 rows must reach the RSS client in exactly one push.
+  EXPECT_EQ(10u, rssClient->getReceiveTimes());
+}
+
+} // namespace gluten
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]