This is an automated email from the ASF dual-hosted git repository.
gfphoenix78 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push:
new 12b43e38d19 PAX: optimize io read for multiple discrete columns in a
group
12b43e38d19 is described below
commit 12b43e38d1972aefa7fab2a88560c970ffe7e0c3
Author: Hao Wu <[email protected]>
AuthorDate: Thu Nov 27 15:24:49 2025 +0000
PAX: optimize io read for multiple discrete columns in a group
When reading multiple discrete columns in a group, the code
reads columnar data block by block in synchronous mode. It
means that all I/O requests on the columnar data are completed
in a serialized manner, which is inefficient.
This commit uses io_uring to submit a batch of I/O requests, allowing the
OS to process them in parallel for better throughput.
libaio is another candidate, but it did not bring any improvement
in our benchmark tests (without O_DIRECT).
---
contrib/pax_storage/doc/README.md | 1 +
contrib/pax_storage/src/cpp/cmake/pax.cmake | 9 +-
contrib/pax_storage/src/cpp/cmake/pax_format.cmake | 7 +-
contrib/pax_storage/src/cpp/comm/common_io.h | 39 ++++++
contrib/pax_storage/src/cpp/comm/fast_io.cc | 134 +++++++++++++++++++++
contrib/pax_storage/src/cpp/comm/fast_io.h | 88 ++++++++++++++
contrib/pax_storage/src/cpp/storage/file_system.cc | 10 ++
contrib/pax_storage/src/cpp/storage/file_system.h | 2 +
.../src/cpp/storage/local_file_system.cc | 22 ++++
.../src/cpp/storage/orc/orc_format_reader.cc | 10 +-
10 files changed, 314 insertions(+), 8 deletions(-)
diff --git a/contrib/pax_storage/doc/README.md
b/contrib/pax_storage/doc/README.md
index 39077955a0c..43393f2f859 100644
--- a/contrib/pax_storage/doc/README.md
+++ b/contrib/pax_storage/doc/README.md
@@ -46,6 +46,7 @@ PAX will be built with `--enable-pax` when you build the
Cloudberry. Dependency
- **CMake**: 3.11 or later
- **Protobuf**: 3.5.0 or later
- **ZSTD (libzstd)**: 1.4.0 or later
+- **liburing**: 2.12 or later
Also, you need to run the following command at the top level of the Cloudberry
source code directory to download the submodules:
diff --git a/contrib/pax_storage/src/cpp/cmake/pax.cmake
b/contrib/pax_storage/src/cpp/cmake/pax.cmake
index 099a66f30d8..528b4e8cafc 100644
--- a/contrib/pax_storage/src/cpp/cmake/pax.cmake
+++ b/contrib/pax_storage/src/cpp/cmake/pax.cmake
@@ -30,6 +30,7 @@ set(pax_comm_src
comm/bitmap.cc
comm/bloomfilter.cc
comm/byte_buffer.cc
+ comm/fast_io.cc
comm/guc.cc
comm/paxc_wrappers.cc
comm/pax_memory.cc
@@ -173,7 +174,7 @@ add_subdirectory(contrib/tabulate)
set(pax_target_src ${PROTO_SRCS} ${pax_storage_src} ${pax_clustering_src}
${pax_exceptions_src}
${pax_access_src} ${pax_comm_src} ${pax_catalog_src} ${pax_vec_src})
set(pax_target_include ${pax_target_include} ${ZTSD_HEADER}
${CMAKE_CURRENT_SOURCE_DIR} ${CBDB_INCLUDE_DIR} contrib/tabulate/include)
-set(pax_target_link_libs ${pax_target_link_libs} protobuf zstd z postgres)
+set(pax_target_link_libs ${pax_target_link_libs} protobuf zstd z postgres
uring)
if (PAX_USE_LZ4)
list(APPEND pax_target_link_libs lz4)
endif()
@@ -207,7 +208,7 @@ endif(VEC_BUILD)
target_include_directories(pax PUBLIC ${pax_target_include})
target_link_directories(pax PUBLIC ${pax_target_link_directories})
-target_link_libraries(pax PUBLIC ${pax_target_link_libs})
+target_link_libraries(pax PRIVATE ${pax_target_link_libs})
set_target_properties(pax PROPERTIES
BUILD_RPATH_USE_ORIGIN ON
BUILD_WITH_INSTALL_RPATH ON
@@ -233,8 +234,8 @@ if (BUILD_GTEST)
add_dependencies(test_main ${pax_target_dependencies} gtest gmock)
target_include_directories(test_main PUBLIC ${pax_target_include}
${CMAKE_CURRENT_SOURCE_DIR} ${gtest_SOURCE_DIR}/include contrib/cpp-stub/src/
contrib/cpp-stub/src_linux/)
- target_link_directories(test_main PUBLIC ${pax_target_link_directories})
- target_link_libraries(test_main PUBLIC ${pax_target_link_libs} gtest gmock
postgres)
+ target_link_directories(test_main PRIVATE ${pax_target_link_directories})
+ target_link_libraries(test_main PRIVATE ${pax_target_link_libs} gtest gmock
postgres)
endif(BUILD_GTEST)
if(BUILD_GBENCH)
diff --git a/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
b/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
index 5a12185a0e6..8d28e793d27 100644
--- a/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
+++ b/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
@@ -20,6 +20,7 @@ set(pax_comm_src
comm/bitmap.cc
comm/bloomfilter.cc
comm/byte_buffer.cc
+ comm/fast_io.cc
comm/guc.cc
comm/paxc_wrappers.cc
comm/pax_memory.cc
@@ -108,7 +109,7 @@ set(pax_vec_src ${pax_vec_src}
endif()
set(pax_target_include ${ZTSD_HEADER} ${CMAKE_CURRENT_SOURCE_DIR}
${CBDB_INCLUDE_DIR} contrib/tabulate/include)
-set(pax_target_link_libs uuid protobuf zstd z)
+set(pax_target_link_libs uuid protobuf zstd z uring)
if (PAX_USE_LZ4)
list(APPEND pax_target_link_libs lz4)
endif()
@@ -135,7 +136,7 @@ endif(VEC_BUILD)
add_library(paxformat SHARED ${PROTO_SRCS} ${pax_storage_src}
${pax_clustering_src} ${pax_exceptions_src} ${pax_comm_src} ${pax_vec_src})
target_include_directories(paxformat PUBLIC ${pax_target_include})
target_link_directories(paxformat PUBLIC ${pax_target_link_directories})
-target_link_libraries(paxformat PUBLIC ${pax_target_link_libs})
+target_link_libraries(paxformat PRIVATE ${pax_target_link_libs})
set_target_properties(paxformat PROPERTIES
OUTPUT_NAME paxformat)
@@ -196,4 +197,4 @@ install(TARGETS paxformat
add_executable(paxformat_test paxformat_test.cc)
target_include_directories(paxformat_test PUBLIC ${pax_target_include}
${CMAKE_CURRENT_SOURCE_DIR})
add_dependencies(paxformat_test paxformat)
-target_link_libraries(paxformat_test PUBLIC paxformat postgres)
+target_link_libraries(paxformat_test PRIVATE paxformat postgres)
diff --git a/contrib/pax_storage/src/cpp/comm/common_io.h
b/contrib/pax_storage/src/cpp/comm/common_io.h
new file mode 100644
index 00000000000..44730376054
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/comm/common_io.h
@@ -0,0 +1,39 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * common_io.h
+ *
+ * IDENTIFICATION
+ * contrib/pax_storage/src/cpp/comm/common_io.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#pragma once
+#include <cstddef>
+#include <fcntl.h>
+
+namespace pax
+{
+// One positional read: describes where to read from a file and where to put
+// the bytes. The readers in comm/fast_io.cc pass these fields straight to
+// pread() / io_uring_prep_read().
+struct IORequest {
+  // Destination memory; the readers write up to `size` bytes into it.
+ void* buffer;
+  // Number of bytes requested.
+ size_t size;
+  // File offset at which the read starts.
+ off_t offset;
+};
+} // namespace pax
diff --git a/contrib/pax_storage/src/cpp/comm/fast_io.cc
b/contrib/pax_storage/src/cpp/comm/fast_io.cc
new file mode 100644
index 00000000000..5b9e593def9
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/comm/fast_io.cc
@@ -0,0 +1,134 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * fast_io.cc
+ *
+ * IDENTIFICATION
+ * contrib/pax_storage/src/cpp/comm/fast_io.cc
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "fast_io.h"
+
+namespace pax
+{
+
+// Reports whether an io_uring instance can be set up in this process.
+// The ring is probed on the first call only; later calls return the cached
+// answer.
+bool IOUringFastIO::available() {
+  // 0: not probed yet, 1: supported, -1: unsupported.
+  // This must be a signed type. Plain `char` is unsigned on some ABIs, where
+  // a stored -1 never compares equal to -1 and the probe would be repeated on
+  // every call.
+  static int support_io_uring = 0;
+
+  if (support_io_uring != 0) return support_io_uring == 1;
+
+  struct io_uring ring;
+  const bool supported = (io_uring_queue_init(128, &ring, 0) == 0);
+  if (supported) {
+    // The ring was created only as a probe; release it right away.
+    io_uring_queue_exit(&ring);
+  }
+  support_io_uring = supported ? 1 : -1;
+  return supported;
+}
+
+// Finishes a request that io_uring served only partially: reads the bytes
+// after the first `done` ones with blocking pread() calls.
+// Returns 0 once the whole request is in the buffer, otherwise a negative
+// errno (-EIO when the file ends before the request is satisfied).
+static int ReadRemainingBytes(int fd, const IORequest &req, size_t done) {
+  while (done < req.size) {
+    ssize_t nbytes = pread(fd, static_cast<char *>(req.buffer) + done,
+                           req.size - done, req.offset + done);
+    if (nbytes > 0) {
+      done += static_cast<size_t>(nbytes);
+    } else if (nbytes == 0) {
+      return -EIO;  // EOF before the request was satisfied
+    } else if (errno != EINTR) {
+      return -errno;
+    }
+  }
+  return 0;
+}
+
+// Reads every entry of `request` from `fd` by queueing the reads on the ring
+// and reaping their completions.
+//
+// `result` is resized to request.size(); result[i] is true iff request i was
+// read in full.
+// Returns {status, count}: status is 0 when every request was read in full,
+// otherwise the first negative errno observed; count is the number of
+// requests read in full.
+std::pair<int, int> IOUringFastIO::read(int fd, std::vector<IORequest> &request,
+                                        std::vector<bool> &result) {
+  const size_t total_requests = request.size();
+  size_t index = 0;      // next request to queue
+  size_t completed = 0;  // completions reaped so far
+  int success_read = 0;
+  int retcode = 0;
+
+  // An empty batch is the uncommon case, so it carries no "likely" hint.
+  if (request.empty()) return {0, 0};
+  if (status_ != 'i') return {-EINVAL, 0};
+
+  // assign() instead of resize(): stale `true` entries must be cleared.
+  result.assign(total_requests, false);
+
+  while (completed < total_requests) {
+    struct io_uring_cqe *cqe;
+    unsigned head;
+    unsigned count = 0;
+    int rc;
+
+    // Queue as many pending requests as the submission queue can take; the
+    // rest is queued on a later iteration.
+    while (index < total_requests) {
+      struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_);
+      if (!sqe) break;
+
+      io_uring_prep_read(sqe, fd, request[index].buffer, request[index].size,
+                         request[index].offset);
+      // Tag the SQE with the request index so its completion can be matched.
+      io_uring_sqe_set_data(
+          sqe, reinterpret_cast<void *>(static_cast<uintptr_t>(index)));
+      index++;
+    }
+
+    // Submit and wait for at least one completion; retry when interrupted.
+    do {
+      rc = io_uring_submit_and_wait(&ring_, 1);
+    } while (rc == -EINTR);
+    if (rc < 0) return {rc, success_read};
+
+    io_uring_for_each_cqe(&ring_, head, cqe) {
+      const size_t req_index = static_cast<size_t>(
+          reinterpret_cast<uintptr_t>(io_uring_cqe_get_data(cqe)));
+      const IORequest &req = request[req_index];
+      int err = 0;
+
+      if (cqe->res < 0) {
+        err = cqe->res;
+      } else if (static_cast<size_t>(cqe->res) < req.size) {
+        // A non-negative result smaller than the request is a short read,
+        // not a success: fetch the missing tail before marking it done.
+        err = ReadRemainingBytes(fd, req, static_cast<size_t>(cqe->res));
+      }
+
+      if (err == 0) {
+        result[req_index] = true;
+        success_read++;
+      } else if (retcode == 0) {
+        retcode = err;  // capture the first error
+      }
+      completed++;
+      count++;
+    }
+    io_uring_cq_advance(&ring_, count);
+  }
+  return {retcode, success_read};
+}
+
+// Reads every entry of `request` from `fd` with blocking pread() calls, one
+// request after another.
+//
+// `result` is resized to request.size(); result[i] is true iff request i was
+// read in full.
+// Returns {status, count}: status is 0 when every request was read in full,
+// otherwise the negative errno of the first failing request (-EIO when the
+// file ends before a request is satisfied); count is the number of requests
+// read in full. A failing request does not stop the remaining ones.
+std::pair<int, int> SyncFastIO::read(int fd, std::vector<IORequest> &request,
+                                     std::vector<bool> &result) {
+  const size_t total_requests = request.size();
+  if (total_requests == 0) return {0, 0};
+
+  // assign() instead of resize(): stale `true` entries must be cleared.
+  result.assign(total_requests, false);
+
+  int success_read = 0;
+  int retcode = 0;
+
+  for (size_t i = 0; i < total_requests; ++i) {
+    const IORequest &req = request[i];
+    size_t bytes_read = 0;
+    int err = 0;
+
+    // pread() may return fewer bytes than asked for; keep reading until the
+    // request is complete, the file ends, or a real error occurs.
+    while (bytes_read < req.size) {
+      ssize_t nbytes = pread(fd, static_cast<char *>(req.buffer) + bytes_read,
+                             req.size - bytes_read, req.offset + bytes_read);
+      if (nbytes > 0) {
+        bytes_read += static_cast<size_t>(nbytes);
+      } else if (nbytes == 0) {
+        err = -EIO;  // EOF before the request was satisfied
+        break;
+      } else if (errno != EINTR) {
+        err = -errno;  // errno must be captured here, it is not kept in nbytes
+        break;
+      }
+    }
+
+    if (err == 0) {
+      result[i] = true;
+      success_read++;
+    } else if (retcode == 0) {
+      retcode = err;  // capture first error
+    }
+  }
+
+  return {retcode, success_read};
+}
+
+} // namespace pax
\ No newline at end of file
diff --git a/contrib/pax_storage/src/cpp/comm/fast_io.h
b/contrib/pax_storage/src/cpp/comm/fast_io.h
new file mode 100644
index 00000000000..da63b4d89ea
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/comm/fast_io.h
@@ -0,0 +1,88 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * fast_io.h
+ *
+ * IDENTIFICATION
+ * contrib/pax_storage/src/cpp/comm/fast_io.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#pragma once
+
+#include "comm/common_io.h"
+
+#include <liburing.h>
+#include <cstddef>
+#include <cstdio>
+#include <algorithm>
+#include <vector>
+
+namespace pax
+{
+
+// Runs one batch read with a temporary handler of type T (for example
+// SyncFastIO or IOUringFastIO) and returns only the status code of
+// T::read(): 0 on success.
+template<typename T>
+int fast_io_read(int fd, std::vector<IORequest> &request) {
+  T io_handler(request.size());
+  // T::read() takes a per-request result vector as its third argument; this
+  // wrapper does not expose it, so a local one is passed and discarded.
+  std::vector<bool> result;
+  return io_handler.read(fd, request, result).first;
+}
+
+// Runs one batch read with a temporary handler of type T (for example
+// SyncFastIO or IOUringFastIO) and returns the full {status, successful
+// request count} pair produced by T::read().
+template<typename T>
+std::pair<int, int> fast_io_read2(int fd, std::vector<IORequest> &request) {
+  T io_handler(request.size());
+  // T::read() takes a per-request result vector as its third argument; this
+  // wrapper does not expose it, so a local one is passed and discarded.
+  std::vector<bool> result;
+  return io_handler.read(fd, request, result);
+}
+
+// Batch reader that serves requests with blocking reads. It offers the same
+// constructor and read() shape as IOUringFastIO, so either can be used as the
+// handler type of the fast_io_read templates.
+class SyncFastIO {
+public:
+  // The queue size is accepted only to match IOUringFastIO's constructor; it
+  // is ignored.
+  SyncFastIO(size_t /*dummy_queue_size*/ = 0) {}
+
+  // Reads the requests from `fd` one by one. `result` gets one flag per
+  // request; the returned pair is {status code, number of requests read in
+  // full}.
+  std::pair<int, int> read(int fd, std::vector<IORequest> &request,
+                           std::vector<bool> &result);
+};
+
+// io_uring-based FastIO.
+// Each object owns one io_uring instance for its whole lifetime: the ring is
+// created in the constructor and released in the destructor.
+class IOUringFastIO {
+public:
+  // Creates a ring sized for `queue_size` requests, but never smaller than
+  // 128 entries. If the ring cannot be created the object stays usable, and
+  // read() then fails with -EINVAL.
+  IOUringFastIO(size_t queue_size = 128) {
+    const size_t min_entries = 128;
+    // The kernel caps the number of ring entries (IORING_MAX_ENTRIES, 32768
+    // at the time of writing) and rejects larger values. Clamping keeps huge
+    // batches working: read() queues requests in rounds when the batch is
+    // larger than the submission queue.
+    const size_t max_entries = 32768;
+    const size_t entries =
+        std::min(std::max(queue_size, min_entries), max_entries);
+    int ret = io_uring_queue_init(static_cast<unsigned>(entries), &ring_, 0);
+
+    // ret < 0: unsupported
+    // otherwise initialized
+    status_ = ret < 0 ? 'x' : 'i';
+  }
+
+  ~IOUringFastIO() {
+    if (status_ == 'i')
+      io_uring_queue_exit(&ring_);
+  }
+
+  // The ring is an owned kernel resource. A copy would make two objects call
+  // io_uring_queue_exit() on the same ring, so copying is disabled.
+  IOUringFastIO(const IOUringFastIO &) = delete;
+  IOUringFastIO &operator=(const IOUringFastIO &) = delete;
+
+  // True when an io_uring instance can be set up in this process.
+  static bool available();
+
+  // if pair.first == 0, all read requests are successful
+  // pair.second indicates the number of successful read requests
+  // result[i] tells whether request i succeeded.
+  std::pair<int, int> read(int fd, std::vector<IORequest> &request,
+                           std::vector<bool> &result);
+
+private:
+  struct io_uring ring_;
+
+  // 'u' for uninitialized, 'i' for initialized, 'x' for unsupported
+  char status_ = 'u';
+};
+
+} // namespace pax
\ No newline at end of file
diff --git a/contrib/pax_storage/src/cpp/storage/file_system.cc
b/contrib/pax_storage/src/cpp/storage/file_system.cc
index fa0667241e8..2ac4803ba8c 100644
--- a/contrib/pax_storage/src/cpp/storage/file_system.cc
+++ b/contrib/pax_storage/src/cpp/storage/file_system.cc
@@ -65,4 +65,14 @@ void File::PWriteN(const void *buf, size_t count, off64_t
offset) {
"errno=%d], %s",
offset, count, num, errno, DebugString().c_str()));
}
+
+// Default batch read: serves the requests one after another through PReadN(),
+// in the order given. An empty batch does nothing.
+void File::ReadBatch(const std::vector<IORequest> &requests) const {
+  const size_t total = requests.size();
+  for (size_t i = 0; i < total; ++i) {
+    const IORequest &current = requests[i];
+    PReadN(current.buffer, current.size, current.offset);
+  }
+}
+
} // namespace pax
diff --git a/contrib/pax_storage/src/cpp/storage/file_system.h
b/contrib/pax_storage/src/cpp/storage/file_system.h
index ca1af8877cc..6569ee3b858 100644
--- a/contrib/pax_storage/src/cpp/storage/file_system.h
+++ b/contrib/pax_storage/src/cpp/storage/file_system.h
@@ -33,6 +33,7 @@
#include <string>
#include <vector>
+#include "comm/common_io.h"
#include "comm/pax_memory.h"
namespace pax {
@@ -74,6 +75,7 @@ class File {
virtual void WriteN(const void *ptr, size_t n);
virtual void PWriteN(const void *buf, size_t count, off_t offset);
virtual void PReadN(void *buf, size_t count, off_t offset) const;
+ virtual void ReadBatch(const std::vector<IORequest> &requests) const;
virtual void Flush() = 0;
virtual void Delete() = 0;
diff --git a/contrib/pax_storage/src/cpp/storage/local_file_system.cc
b/contrib/pax_storage/src/cpp/storage/local_file_system.cc
index 1c71eceb7a9..82afafbfcfd 100644
--- a/contrib/pax_storage/src/cpp/storage/local_file_system.cc
+++ b/contrib/pax_storage/src/cpp/storage/local_file_system.cc
@@ -35,6 +35,7 @@
#include "access/pax_access_handle.h"
#include "comm/cbdb_wrappers.h"
+#include "comm/fast_io.h"
#include "comm/fmt.h"
#include "comm/pax_memory.h"
#include "comm/pax_resource.h"
@@ -51,6 +52,7 @@ class LocalFile final : public File {
ssize_t Write(const void *ptr, size_t n) override;
ssize_t PWrite(const void *ptr, size_t n, off_t offset) override;
ssize_t PRead(void *ptr, size_t n, off_t offset) const override;
+ void ReadBatch(const std::vector<IORequest> &requests) const override;
size_t FileLength() const override;
void Flush() override;
void Delete() override;
@@ -132,6 +134,26 @@ ssize_t LocalFile::PWrite(const void *ptr, size_t n, off_t
offset) {
return num;
}
+// Reads all `requests` from this file as one batch: through io_uring when
+// IOUringFastIO::available() says it can be used, otherwise with sequential
+// blocking reads.
+// Raises a kExTypeIOError exception unless the reader returns status 0 and
+// counts every request as successfully read.
+void LocalFile::ReadBatch(const std::vector<IORequest> &requests) const {
+  if (unlikely(requests.empty())) return;
+
+  // The FastIO readers take a non-const vector reference; the cast only
+  // adapts to that signature.
+  auto &mutable_requests = const_cast<std::vector<IORequest> &>(requests);
+  std::vector<bool> result(requests.size(), false);
+
+  if (IOUringFastIO::available()) {
+    IOUringFastIO fast_io(requests.size());
+    auto res = fast_io.read(fd_, mutable_requests, result);
+    // A zero status alone is not trusted: the success count must also cover
+    // the whole batch, so a request that was silently left incomplete still
+    // fails the check.
+    const bool all_read =
+        res.first == 0 && static_cast<size_t>(res.second) == requests.size();
+    CBDB_CHECK(all_read, cbdb::CException::ExType::kExTypeIOError,
+               fmt("Fail to ReadBatch with io_uring [successful=%d, total=%lu], %s",
+                   res.second, requests.size(), DebugString().c_str()));
+  } else {
+    SyncFastIO fast_io;
+    auto res = fast_io.read(fd_, mutable_requests, result);
+    const bool all_read =
+        res.first == 0 && static_cast<size_t>(res.second) == requests.size();
+    CBDB_CHECK(all_read, cbdb::CException::ExType::kExTypeIOError,
+               fmt("Fail to ReadBatch with sync read [successful=%d, total=%lu], %s",
+                   res.second, requests.size(), DebugString().c_str()));
+  }
+}
+
size_t LocalFile::FileLength() const {
struct stat file_stat {};
int rc;
diff --git a/contrib/pax_storage/src/cpp/storage/orc/orc_format_reader.cc
b/contrib/pax_storage/src/cpp/storage/orc/orc_format_reader.cc
index 9e939305c61..ad1736f4f08 100644
--- a/contrib/pax_storage/src/cpp/storage/orc/orc_format_reader.cc
+++ b/contrib/pax_storage/src/cpp/storage/orc/orc_format_reader.cc
@@ -327,6 +327,7 @@ pax::porc::proto::StripeFooter
OrcFormatReader::ReadStripeWithProjection(
batch_offset = stripe_footer_offset;
+ std::vector<IORequest> io_requests;
while (index < column_types_.size()) {
// Current column have been skipped
// Move `batch_offset` and `streams_index` to the right position
@@ -398,10 +399,17 @@ pax::porc::proto::StripeFooter
OrcFormatReader::ReadStripeWithProjection(
continue;
}
- file_->PReadN(data_buffer->GetAvailableBuffer(), batch_len, batch_offset);
+ {
+ IORequest io_request;
+ io_request.offset = batch_offset;
+ io_request.size = batch_len;
+ io_request.buffer = data_buffer->GetAvailableBuffer();
+ io_requests.emplace_back(io_request);
+ }
data_buffer->Brush(batch_len);
batch_offset += batch_len;
}
+ file_->ReadBatch(io_requests);
return stripe_footer;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]