This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 70ee0f411 [VL] Add BufferedOutputStream to track the memory usage in PrestoSerializer (#5785)
70ee0f411 is described below
commit 70ee0f411cde6d6bbf3772ea3d623ef698b9f174
Author: Rong Ma <[email protected]>
AuthorDate: Fri May 17 16:28:35 2024 +0800
[VL] Add BufferedOutputStream to track the memory usage in PrestoSerializer (#5785)
---
cpp/velox/CMakeLists.txt | 1 +
cpp/velox/memory/BufferOutputStream.cc | 45 +++++++++++++++
cpp/velox/memory/BufferOutputStream.h | 42 ++++++++++++++
cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc | 25 +++------
cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h | 8 +--
cpp/velox/tests/BufferOutputStreamTest.cc | 70 ++++++++++++++++++++++++
cpp/velox/tests/CMakeLists.txt | 1 +
7 files changed, 171 insertions(+), 21 deletions(-)
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index c058883b6..9bedfe45b 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -300,6 +300,7 @@ set(VELOX_SRCS
jni/VeloxJniWrapper.cc
jni/JniFileSystem.cc
jni/JniUdf.cc
+ memory/BufferOutputStream.cc
memory/VeloxColumnarBatch.cc
memory/VeloxMemoryManager.cc
operators/functions/RegistrationAllFunctions.cc
diff --git a/cpp/velox/memory/BufferOutputStream.cc b/cpp/velox/memory/BufferOutputStream.cc
new file mode 100644
index 000000000..31d7b0936
--- /dev/null
+++ b/cpp/velox/memory/BufferOutputStream.cc
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "memory/BufferOutputStream.h"
+
+namespace gluten {
+BufferOutputStream::BufferOutputStream(
+ facebook::velox::memory::MemoryPool* pool,
+ int32_t initialSize,
+ facebook::velox::OutputStreamListener* listener)
+ : facebook::velox::OutputStream(listener) {
+ buffer_ = facebook::velox::AlignedBuffer::allocate<char>(initialSize, pool);
+ buffer_->setSize(0);
+}
+
+void BufferOutputStream::write(const char* s, std::streamsize count) {
+ facebook::velox::AlignedBuffer::appendTo(&buffer_, s, count);
+}
+
+std::streampos BufferOutputStream::tellp() const {
+ return buffer_->size();
+}
+
+void BufferOutputStream::seekp(std::streampos pos) {
+ buffer_->setSize(pos);
+}
+
+facebook::velox::BufferPtr BufferOutputStream::getBuffer() const {
+ return buffer_;
+}
+} // namespace gluten
diff --git a/cpp/velox/memory/BufferOutputStream.h b/cpp/velox/memory/BufferOutputStream.h
new file mode 100644
index 000000000..49774e09d
--- /dev/null
+++ b/cpp/velox/memory/BufferOutputStream.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/buffer/Buffer.h"
+#include "velox/common/memory/ByteStream.h"
+
+namespace gluten {
+class BufferOutputStream : public facebook::velox::OutputStream {
+ public:
+ BufferOutputStream(
+ facebook::velox::memory::MemoryPool* pool,
+ int32_t initialSize = facebook::velox::memory::AllocationTraits::kPageSize,
+ facebook::velox::OutputStreamListener* listener = nullptr);
+
+ void write(const char* s, std::streamsize count);
+
+ std::streampos tellp() const;
+
+ void seekp(std::streampos pos);
+
+ facebook::velox::BufferPtr getBuffer() const;
+
+ private:
+ facebook::velox::BufferPtr buffer_;
+};
+} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
index b0c2cc8ad..bd56bc62e 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
@@ -84,6 +84,7 @@ arrow::Status VeloxSortBasedShuffleWriter::init() {
for (auto pid = 0; pid < numPartitions_; ++pid) {
rowVectorIndexMap_[pid].reserve(options_.bufferSize);
}
+ bufferOutputStream_ = std::make_unique<BufferOutputStream>(veloxPool_.get());
return arrow::Status::OK();
}
@@ -153,18 +154,12 @@ arrow::Status VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
return arrow::Status::OK();
}
-arrow::Status VeloxSortBasedShuffleWriter::evictBatch(
- uint32_t partitionId,
- std::ostringstream* output,
- facebook::velox::OStreamOutputStream* out,
- facebook::velox::RowTypePtr* rowTypePtr) {
+arrow::Status VeloxSortBasedShuffleWriter::evictBatch(uint32_t partitionId, facebook::velox::RowTypePtr* rowTypePtr) {
int64_t rawSize = batch_->size();
- batch_->flush(out);
- const std::string& outputStr = output->str();
- RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, outputStr.c_str(), outputStr.size()));
- batch_.reset();
- output->clear();
- output->str("");
+ bufferOutputStream_->seekp(0);
+ batch_->flush(bufferOutputStream_.get());
+ auto buffer = bufferOutputStream_->getBuffer();
+ RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, buffer->as<char>(), buffer->size()));
batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get());
batch_->createStreamTree(*rowTypePtr, options_.bufferSize, &serdeOptions_);
return arrow::Status::OK();
@@ -174,8 +169,6 @@ arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId)
int32_t rowNum = 0;
const int32_t maxBatchNum = options_.bufferSize;
auto rowTypePtr = std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value());
- std::ostringstream output;
- facebook::velox::OStreamOutputStream out(&output);
if (options_.partitioning != Partitioning::kSingle) {
if (auto it = rowVectorIndexMap_.find(partitionId); it != rowVectorIndexMap_.end()) {
@@ -219,7 +212,7 @@ arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId)
rowNum += groupedSize[pair.first];
if (rowNum >= maxBatchNum) {
rowNum = 0;
- RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+ RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
}
}
@@ -231,13 +224,13 @@ arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId)
rowNum += rowVectorPtr->size();
batch_->append(rowVectorPtr);
if (rowNum >= maxBatchNum) {
- RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+ RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
rowNum = 0;
}
}
}
if (rowNum > 0) {
- RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+ RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
}
return arrow::Status::OK();
}
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
index e3ac07dfc..710590184 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
@@ -37,6 +37,7 @@
#include <arrow/type.h>
#include "VeloxShuffleWriter.h"
+#include "memory/BufferOutputStream.h"
#include "memory/VeloxMemoryManager.h"
#include "shuffle/PartitionWriter.h"
#include "shuffle/Partitioner.h"
@@ -65,11 +66,7 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
arrow::Status evictRowVector(uint32_t partitionId) override;
- arrow::Status evictBatch(
- uint32_t partitionId,
- std::ostringstream* output,
- facebook::velox::OStreamOutputStream* out,
- facebook::velox::RowTypePtr* rowTypePtr);
+ arrow::Status evictBatch(uint32_t partitionId, facebook::velox::RowTypePtr* rowTypePtr);
private:
VeloxSortBasedShuffleWriter(
@@ -93,6 +90,7 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
std::optional<facebook::velox::TypePtr> rowType_;
std::unique_ptr<facebook::velox::VectorStreamGroup> batch_;
+ std::unique_ptr<BufferOutputStream> bufferOutputStream_;
// Partition ID -> Row Count
// subscript: Partition ID
diff --git a/cpp/velox/tests/BufferOutputStreamTest.cc b/cpp/velox/tests/BufferOutputStreamTest.cc
new file mode 100644
index 000000000..3b3f78cea
--- /dev/null
+++ b/cpp/velox/tests/BufferOutputStreamTest.cc
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "memory/BufferOutputStream.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "velox/common/memory/ByteStream.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+using namespace facebook::velox;
+
+namespace gluten {
+class BufferOutputStreamTest : public ::testing::Test, public test::VectorTestBase {
+ protected:
+ // Velox requires the mem manager to be instanced.
+ static void SetUpTestCase() {
+ memory::MemoryManager::testingSetInstance({});
+ }
+
+ std::shared_ptr<memory::MemoryPool> veloxPool_ = defaultLeafVeloxMemoryPool();
+};
+
+TEST_F(BufferOutputStreamTest, outputStream) {
+ auto out = std::make_unique<BufferOutputStream>(veloxPool_.get(), 10000);
+ std::stringstream referenceSStream;
+ auto reference = std::make_unique<facebook::velox::OStreamOutputStream>(&referenceSStream);
+ for (auto i = 0; i < 100; ++i) {
+ std::string data;
+ data.resize(10000);
+ std::fill(data.begin(), data.end(), i);
+ out->write(data.data(), data.size());
+ reference->write(data.data(), data.size());
+ }
+ EXPECT_EQ(reference->tellp(), out->tellp());
+ for (auto i = 0; i < 100; ++i) {
+ std::string data;
+ data.resize(6000);
+ std::fill(data.begin(), data.end(), i + 10);
+ out->seekp(i * 10000 + 5000);
+ reference->seekp(i * 10000 + 5000);
+ out->write(data.data(), data.size());
+ reference->write(data.data(), data.size());
+ }
+ auto str = referenceSStream.str();
+ auto numBytes = veloxPool_->currentBytes();
+ EXPECT_LT(0, numBytes);
+ {
+ auto buffer = out->getBuffer();
+ EXPECT_EQ(numBytes, veloxPool_->currentBytes());
+ EXPECT_EQ(str, std::string(buffer->as<char>(), buffer->size()));
+ }
+
+ out.reset();
+ // We expect dropping the stream frees the backing memory.
+ EXPECT_EQ(0, veloxPool_->currentBytes());
+}
+} // namespace gluten
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index 58482fe15..a5bd5b4f7 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -61,3 +61,4 @@ add_velox_test(
FilePathGenerator.cc)
add_velox_test(spark_functions_test SOURCES SparkFunctionTest.cc)
add_velox_test(execution_ctx_test SOURCES RuntimeTest.cc)
+add_velox_test(buffer_outputstream_test SOURCES BufferOutputStreamTest.cc)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]