This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a175073 feat: add IO stream infrastructure (#39)
a175073 is described below
commit a175073643108001ab8b0984437c92a002a70541
Author: lszskye <[email protected]>
AuthorDate: Thu Jun 4 11:17:22 2026 +0800
feat: add IO stream infrastructure (#39)
---
include/paimon/io/buffered_input_stream.h | 91 +++++++
include/paimon/io/byte_array_input_stream.h | 67 +++++
include/paimon/io/data_input_stream.h | 97 ++++++++
src/paimon/common/io/buffered_input_stream.cpp | 167 +++++++++++++
.../common/io/buffered_input_stream_test.cpp | 270 +++++++++++++++++++++
src/paimon/common/io/byte_array_input_stream.cpp | 107 ++++++++
.../common/io/byte_array_input_stream_test.cpp | 84 +++++++
.../common/io/data_input_output_stream_test.cpp | 208 ++++++++++++++++
src/paimon/common/io/data_input_stream.cpp | 129 ++++++++++
src/paimon/common/io/data_output_stream.cpp | 70 ++++++
src/paimon/common/io/data_output_stream.h | 79 ++++++
.../common/io/memory_segment_output_stream.cpp | 99 ++++++++
.../common/io/memory_segment_output_stream.h | 118 +++++++++
.../io/memory_segment_output_stream_test.cpp | 85 +++++++
src/paimon/common/io/offset_input_stream.cpp | 137 +++++++++++
src/paimon/common/io/offset_input_stream.h | 67 +++++
src/paimon/common/io/offset_input_stream_test.cpp | 216 +++++++++++++++++
17 files changed, 2091 insertions(+)
diff --git a/include/paimon/io/buffered_input_stream.h
b/include/paimon/io/buffered_input_stream.h
new file mode 100644
index 0000000..2af6b73
--- /dev/null
+++ b/include/paimon/io/buffered_input_stream.h
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Bytes;
+class MemoryPool;
+
+/// A buffered input stream that wraps another `InputStream` to provide
buffering capabilities.
+///
+/// `BufferedInputStream` improves I/O performance by reducing the number of
system calls
+/// through internal buffering. It reads data from the underlying stream in
larger chunks
+/// and serves subsequent read requests from the internal buffer when possible.
+class PAIMON_EXPORT BufferedInputStream : public InputStream {
+ public:
+ /// Creates a new buffered input stream that wraps the provided input
stream.
+ /// The buffer is allocated from the specified memory pool.
+ ///
+ /// @param in The underlying input stream to wrap.
+ /// @param buffer_size Size of the internal buffer in bytes.
+ /// @param pool Memory pool for buffer allocation.
+ BufferedInputStream(const std::shared_ptr<InputStream>& in, int32_t
buffer_size,
+ MemoryPool* pool);
+
+ ~BufferedInputStream() noexcept override;
+
+ Status Seek(int64_t offset, SeekOrigin origin) override;
+
+ Result<int64_t> GetPos() const override;
+
+ Result<int32_t> Read(char* buffer, uint32_t size) override;
+
+ Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset)
override;
+
+ void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+ std::function<void(Status)>&& callback) override;
+
+ Result<uint64_t> Length() const override;
+
+ Status Close() override;
+
+ Result<std::string> GetUri() const override;
+
+ static constexpr int32_t DEFAULT_BUFFER_SIZE = 8192;
+
+ private:
+ /// Fill the internal buffer from the underlying stream.
+ Status Fill();
+
+ /// Internal read implementation.
+ /// @pre size > 0
+ Result<int32_t> InnerRead(char* buffer, int32_t size);
+
+ /// Validate that the expected number of bytes were read.
+ Status AssertReadLength(int32_t read_length, int32_t actual_read_length)
const;
+
+ private:
+ int32_t buffer_size_;
+ int32_t pos_ = 0;
+ int32_t count_ = 0;
+ std::unique_ptr<Bytes> buffer_;
+ std::shared_ptr<InputStream> in_;
+};
+
+} // namespace paimon
diff --git a/include/paimon/io/byte_array_input_stream.h
b/include/paimon/io/byte_array_input_stream.h
new file mode 100644
index 0000000..e66c5e8
--- /dev/null
+++ b/include/paimon/io/byte_array_input_stream.h
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <string>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// Input stream for memory buffer, inherits from `InputStream`.
+class PAIMON_EXPORT ByteArrayInputStream : public InputStream {
+ public:
+ ByteArrayInputStream(const char* buffer, uint64_t length);
+ ~ByteArrayInputStream() override = default;
+
+ /// @return The raw data pointer of current pos.
+ const char* GetRawData() const;
+
+ Status Seek(int64_t offset, SeekOrigin origin) override;
+
+ Result<int64_t> GetPos() const override {
+ return position_;
+ }
+
+ Result<int32_t> Read(char* buffer, uint32_t size) override;
+
+ Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset)
override;
+
+ void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+ std::function<void(Status)>&& callback) override;
+
+ Result<uint64_t> Length() const override {
+ return length_;
+ }
+
+ Status Close() override;
+
+ Result<std::string> GetUri() const override;
+
+ private:
+ const char* buffer_;
+ const uint64_t length_;
+ int64_t position_;
+};
+} // namespace paimon
diff --git a/include/paimon/io/data_input_stream.h
b/include/paimon/io/data_input_stream.h
new file mode 100644
index 0000000..8dc15a2
--- /dev/null
+++ b/include/paimon/io/data_input_stream.h
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "paimon/io/byte_order.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Bytes;
+class InputStream;
+
+/// `DataInputStream` provides a convenient wrapper around `InputStream` for
reading typed data.
+/// @note The default byte order is big-endian to maintain compatibility with
the Java
+/// implementation.
+class PAIMON_EXPORT DataInputStream {
+ public:
+ /// Constructs a `DataInputStream` wrapping the given `InputStream`.
+ /// @param input_stream The underlying input stream to read from.
+ explicit DataInputStream(const std::shared_ptr<InputStream>& input_stream);
+
+ /// Seek to a specific position in the underlying input stream.
+ /// @param offset The absolute byte offset to seek to.
+ Status Seek(int64_t offset) const;
+
+ /// Read a typed value from the stream.
+ /// @return Result containing the read value or an error status.
+ template <typename T>
+ Result<T> ReadValue() const;
+
+ /// Read some bytes to a `Bytes` object from the stream. The length of
bytes is the number of
+ /// bytes read from the stream.
+ /// @param bytes Buffer to store the read bytes.
+ Status ReadBytes(Bytes* bytes) const;
+
+ /// Read raw data of specified size from the stream.
+ /// @param data Buffer to store the read data.
+ /// @param size Number of bytes to read.
+ Status Read(char* data, uint32_t size) const;
+
+ /// Read string from the stream.
+ /// @note First read length (int16), then read string bytes.
+ Result<std::string> ReadString() const;
+
+ /// Get the current position in the underlying input stream.
+ Result<int64_t> GetPos() const;
+
+ /// Get the total length of the underlying input stream.
+ Result<uint64_t> Length() const;
+
+ /// Set the byte order for endianness conversion.
+ /// @param order The byte order to use `PAIMON_BIG_ENDIAN` or
`PAIMON_LITTLE_ENDIAN`.
+ void SetOrder(ByteOrder order) {
+ byte_order_ = order;
+ }
+
+ private:
+ /// Validate that the expected number of bytes were read.
+ /// @param read_length Expected number of bytes to read.
+ /// @param actual_read_length Actual number of bytes read.
+ Status AssertReadLength(int32_t read_length, int32_t actual_read_length)
const;
+
+ /// Check if there are enough bytes available to read.
+ /// @param need_length Number of bytes needed.
+ Status AssertBoundary(int32_t need_length) const;
+
+ /// Determine if byte swapping is needed based on current byte order and
system endianness.
+ /// @return `true` if byte swapping is required, `false` otherwise.
+ bool NeedSwap() const;
+
+ private:
+ std::shared_ptr<InputStream> input_stream_;
+ ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN;
+};
+} // namespace paimon
diff --git a/src/paimon/common/io/buffered_input_stream.cpp
b/src/paimon/common/io/buffered_input_stream.cpp
new file mode 100644
index 0000000..d2b50b7
--- /dev/null
+++ b/src/paimon/common/io/buffered_input_stream.cpp
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/io/buffered_input_stream.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cstring>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+class MemoryPool;
+
+BufferedInputStream::BufferedInputStream(const std::shared_ptr<InputStream>&
in,
+ int32_t buffer_size, MemoryPool* pool)
+ : buffer_size_(buffer_size), in_(in) {
+ assert(buffer_size > 0);
+ buffer_ = std::make_unique<Bytes>(buffer_size, pool);
+}
+
+BufferedInputStream::~BufferedInputStream() noexcept = default;
+
+Status BufferedInputStream::Seek(int64_t offset, SeekOrigin origin) {
+ // Convert all seek origins to an absolute offset so the buffer-hit fast
+ // path below can work uniformly on absolute positions.
+ int64_t target_abs_offset = offset;
+ if (origin == SeekOrigin::FS_SEEK_CUR) {
+ PAIMON_ASSIGN_OR_RAISE(int64_t cur_pos, GetPos());
+ target_abs_offset = cur_pos + offset;
+ } else if (origin == SeekOrigin::FS_SEEK_END) {
+ PAIMON_ASSIGN_OR_RAISE(int64_t length, in_->Length());
+ target_abs_offset = length + offset;
+ }
+ // else: FS_SEEK_SET — target_abs_offset is already absolute.
+
+ // Fast path: if the new absolute offset still falls into the bytes already
+ // cached in buffer_ (i.e. the window from buf_start_abs to buf_end_abs),
just
+ // adjust pos_ without touching the underlying stream.
+ if (count_ > 0) {
+ PAIMON_ASSIGN_OR_RAISE(int64_t in_pos, in_->GetPos());
+ const int64_t buf_start_abs = in_pos - count_;
+ const int64_t buf_end_abs = in_pos;
+ if (target_abs_offset >= buf_start_abs && target_abs_offset <=
buf_end_abs) {
+ pos_ = static_cast<int32_t>(target_abs_offset - buf_start_abs);
+ return Status::OK();
+ }
+ }
+
+ // Slow path: the target is outside the current buffer window, fall back to
+ // a real seek on the underlying stream and invalidate the buffer.
+ PAIMON_RETURN_NOT_OK(in_->Seek(target_abs_offset, FS_SEEK_SET));
+ pos_ = 0;
+ count_ = 0;
+ return Status::OK();
+}
+
+Result<int64_t> BufferedInputStream::GetPos() const {
+ PAIMON_ASSIGN_OR_RAISE(int64_t in_pos, in_->GetPos());
+ return in_pos - count_ + pos_;
+}
+
+/// Reads exactly `size` bytes into `buffer`, serving them from the internal buffer and
+/// refilling it (or reading straight from the wrapped stream) as needed.
+///
+/// @param buffer Destination; must have room for at least `size` bytes.
+/// @param size Number of bytes requested.
+/// @return `size` on success; an error status if the wrapped stream fails or stops
+///         making progress before `size` bytes were delivered.
+Result<int32_t> BufferedInputStream::Read(char* buffer, uint32_t size) {
+    uint32_t actual_read_len = 0;
+    while (actual_read_len < size) {
+        PAIMON_ASSIGN_OR_RAISE(int32_t nread,
+                               InnerRead(buffer + actual_read_len, size - actual_read_len));
+        // A runtime check instead of assert(): with NDEBUG a wrapped stream that returns 0
+        // would make this loop spin forever, and a negative value would wrap the unsigned
+        // counter.
+        if (nread <= 0) {
+            return Status::Invalid(fmt::format(
+                "BufferedInputStream read failed: inner read returned {} bytes, expect read {} "
+                "bytes, actual read {} bytes",
+                nread, size, actual_read_len));
+        }
+        actual_read_len += static_cast<uint32_t>(nread);
+    }
+    PAIMON_RETURN_NOT_OK(AssertReadLength(size, actual_read_len));
+    return actual_read_len;
+}
+
+Result<int32_t> BufferedInputStream::Read(char* buffer, uint32_t size,
uint64_t offset) {
+ return Status::Invalid("BufferedInputStream does not support Read from
offset");
+}
+
+void BufferedInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t
offset,
+ std::function<void(Status)>&& callback) {
+ callback(Status::NotImplemented("BufferedInputStream do not support
ReadAsync"));
+}
+
+Result<uint64_t> BufferedInputStream::Length() const {
+ return in_->Length();
+}
+
+Status BufferedInputStream::Close() {
+ pos_ = 0;
+ count_ = 0;
+ buffer_.reset();
+ return Status::OK();
+}
+
+Result<std::string> BufferedInputStream::GetUri() const {
+ return in_->GetUri();
+}
+
+Status BufferedInputStream::Fill() {
+ pos_ = 0;
+ count_ = 0;
+ PAIMON_ASSIGN_OR_RAISE(int64_t in_pos, in_->GetPos());
+ PAIMON_ASSIGN_OR_RAISE(int64_t length, in_->Length());
+ int64_t left_to_read = std::min((length - in_pos),
static_cast<int64_t>(buffer_size_));
+ PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len, in_->Read(buffer_->data(),
left_to_read));
+ PAIMON_RETURN_NOT_OK(AssertReadLength(left_to_read, actual_read_len));
+ count_ = actual_read_len;
+ return Status::OK();
+}
+
+Result<int32_t> BufferedInputStream::InnerRead(char* buffer, int32_t size) {
+ assert(size > 0);
+ int32_t avail = count_ - pos_;
+ if (avail <= 0) {
+ assert(avail == 0);
+ /* If the requested length is at least as large as the buffer, and
+ if there is no mark/reset activity, do not bother to copy the
+ bytes into the local buffer. In this way buffered streams will
+ cascade harmlessly. */
+ if (size >= buffer_size_) {
+ return in_->Read(buffer, size);
+ }
+ PAIMON_RETURN_NOT_OK(Fill());
+ avail = count_ - pos_;
+ if (avail <= 0) {
+ return Status::Invalid(fmt::format(
+ "InnerRead failed, after Fill(), still no bytes available (may
read eof), but "
+ "expect read {} bytes",
+ size));
+ }
+ }
+ int32_t copy_length = std::min(avail, size);
+ memcpy(buffer, buffer_->data() + pos_, copy_length);
+ pos_ += copy_length;
+ return copy_length;
+}
+
+Status BufferedInputStream::AssertReadLength(int32_t read_length,
+ int32_t actual_read_length) const
{
+ if (read_length != actual_read_length) {
+ return Status::Invalid(
+ fmt::format("assert read length failed: read length not match,
read length {}, actual "
+ "read length {}",
+ read_length, actual_read_length));
+ }
+ return Status::OK();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/io/buffered_input_stream_test.cpp
b/src/paimon/common/io/buffered_input_stream_test.cpp
new file mode 100644
index 0000000..0f6c9ef
--- /dev/null
+++ b/src/paimon/common/io/buffered_input_stream_test.cpp
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/io/buffered_input_stream.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(BufferedInputStreamTest, TestSimple) {
+ auto pool = GetDefaultPool();
+ auto output_stream =
std::make_unique<MemorySegmentOutputStream>(/*segment_size=*/8, pool);
+ std::string str = "abcdef";
+ auto bytes = std::make_shared<Bytes>(str, pool.get());
+ output_stream->WriteBytes(bytes);
+ auto out_bytes =
MemorySegmentUtils::CopyToBytes(output_stream->Segments(), 0,
+
output_stream->CurrentSize(), pool.get());
+ auto in = std::make_shared<ByteArrayInputStream>(out_bytes->data(),
out_bytes->size());
+ auto input_stream = std::make_shared<BufferedInputStream>(in,
/*buffer_size=*/4, pool.get());
+ ASSERT_EQ(6, input_stream->Length().value());
+ ASSERT_TRUE(input_stream->GetUri().value().empty());
+
+ // read from pos 0
+ std::string value(3, '\0');
+ ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value());
+ ASSERT_EQ("abc", value);
+ ASSERT_EQ(3, input_stream->GetPos().value());
+ ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value());
+ ASSERT_EQ("def", value);
+ ASSERT_EQ(6, input_stream->GetPos().value());
+
+ // read from pos 1
+ ASSERT_NOK_WITH_MSG(input_stream->Read(value.data(), value.size(),
/*offset=*/1),
+ "BufferedInputStream does not support Read from
offset");
+ ASSERT_EQ(6, input_stream->GetPos().value());
+
+ // seek to pos 3
+ ASSERT_OK(input_stream->Seek(3, SeekOrigin::FS_SEEK_SET));
+ ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value());
+ ASSERT_EQ("def", value);
+ ASSERT_EQ(6, input_stream->GetPos().value());
+
+ // seek to pos 2
+ ASSERT_OK(input_stream->Seek(-4, SeekOrigin::FS_SEEK_END));
+ ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value());
+ ASSERT_EQ("cde", value);
+ ASSERT_EQ(5, input_stream->GetPos().value());
+
+ // seek to pos 1
+ ASSERT_OK(input_stream->Seek(-4, SeekOrigin::FS_SEEK_CUR));
+ ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value());
+ ASSERT_EQ("bcd", value);
+ ASSERT_EQ(4, input_stream->GetPos().value());
+
+ // test exceeding EOF: seek to pos 4, then try to read 3 bytes while only 2 remain
+ ASSERT_OK(input_stream->Seek(-2, SeekOrigin::FS_SEEK_END));
+ ASSERT_NOK_WITH_MSG(input_stream->Read(value.data(), value.size()),
+ "InnerRead failed, after Fill(), still no bytes
available (may read eof), "
+ "but expect read 1 bytes");
+
+ // test invalid seek
+ ASSERT_NOK_WITH_MSG(input_stream->Seek(100, SeekOrigin::FS_SEEK_CUR),
+ "invalid seek, after seek, current pos 106, length 6");
+
+ // test ReadAsync not implemented
+ bool read_finished = false;
+ auto callback = [&](Status status) {
+ ASSERT_TRUE(status.IsNotImplemented());
+ read_finished = true;
+ };
+ input_stream->ReadAsync(value.data(), value.size(), /*offset=*/0,
callback);
+ ASSERT_TRUE(read_finished);
+
+ ASSERT_OK(input_stream->Close());
+}
+
+TEST(BufferedInputStreamTest, TestSeek) {
+ // Data: "0123456789abcdef" (16 bytes), buffer_size = 8
+ auto pool = GetDefaultPool();
+ std::string data = "0123456789abcdef";
+ auto in = std::make_shared<ByteArrayInputStream>(reinterpret_cast<const
char*>(data.data()),
+ data.size());
+ auto stream = std::make_shared<BufferedInputStream>(in, /*buffer_size=*/8,
pool.get());
+
+ // Helper: verify buffer_ content matches expected substring of data
+ auto check_buffer = [&](const BufferedInputStream& s, const std::string&
expected_content,
+ int32_t expected_pos, int32_t expected_count) {
+ ASSERT_EQ(s.pos_, expected_pos);
+ ASSERT_EQ(s.count_, expected_count);
+ std::string actual(s.buffer_->data(), s.buffer_->data() +
expected_count);
+ ASSERT_EQ(actual, expected_content) << "buffer content mismatch:
actual=\"" << actual
+ << "\", expected=\"" <<
expected_content << "\"";
+ };
+
+ std::string buf(4, '\0');
+
+ // Initial state: buffer empty
+ ASSERT_EQ(stream->pos_, 0);
+ ASSERT_EQ(stream->count_, 0);
+
+ // FS_SEEK_SET slow path (buffer empty, count_==0): seek to pos 4, read
"4567"
+ // First Read calls Fill: reads 8 bytes from pos 4 -> buffer = "456789ab",
count_=8.
+ // Then consumes 4 bytes -> pos_=4.
+ {
+ ASSERT_OK(stream->Seek(4, SeekOrigin::FS_SEEK_SET));
+ // After seek: slow path, buffer invalidated
+ ASSERT_EQ(stream->pos_, 0);
+ ASSERT_EQ(stream->count_, 0);
+ ASSERT_EQ(4, stream->GetPos().value());
+
+ ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+ ASSERT_EQ("4567", buf);
+ check_buffer(*stream, "456789ab", 4, 8);
+ }
+
+ // FS_SEEK_CUR buffer hit: pos=8, seek -4 -> target=4, inside buffer
[4..12)
+ // Fast path: only adjusts pos_ to 0, count_ stays 8, buffer unchanged.
+ {
+ ASSERT_OK(stream->Seek(-4, SeekOrigin::FS_SEEK_CUR));
+ check_buffer(*stream, "456789ab", 0, 8);
+ ASSERT_EQ(4, stream->GetPos().value());
+
+ ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+ ASSERT_EQ("4567", buf);
+ check_buffer(*stream, "456789ab", 4, 8);
+ }
+
+ // FS_SEEK_SET buffer hit: pos=8, seek to 5, inside buffer [4..12)
+ // Fast path: pos_ = 5 - 4 = 1, count_ stays 8, buffer unchanged.
+ {
+ ASSERT_OK(stream->Seek(5, SeekOrigin::FS_SEEK_SET));
+ check_buffer(*stream, "456789ab", 1, 8);
+ ASSERT_EQ(5, stream->GetPos().value());
+
+ ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+ ASSERT_EQ("5678", buf);
+ check_buffer(*stream, "456789ab", 5, 8);
+ }
+
+ // FS_SEEK_END buffer hit: target = 16 + (-6) = 10.
+ // Current buffer covers [4..12) and pos 10 lies inside it.
+ // Fast path: pos_ = 10 - 4 = 6, count_ stays 8.
+ {
+ ASSERT_OK(stream->Seek(-6, SeekOrigin::FS_SEEK_END));
+ check_buffer(*stream, "456789ab", 6, 8);
+ ASSERT_EQ(10, stream->GetPos().value());
+
+ // Read 4 bytes from pos 10: 2 left in buffer ("ab"), then refill.
+ ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+ ASSERT_EQ("abcd", buf);
+ // After consuming "ab" (pos_==count_==8), InnerRead calls Fill from
in_ pos 12,
+ // reads [12..16) -> buffer = "cdef", count_=4, then consumes 2 ->
pos_=2.
+ check_buffer(*stream, "cdef", 2, 4);
+ }
+
+ // Buffer miss slow path: seek to pos 0, current buffer covers [12..16).
+ // Target 0 is outside [12..16) -> slow path, buffer invalidated.
+ {
+ ASSERT_OK(stream->Seek(0, SeekOrigin::FS_SEEK_SET));
+ ASSERT_EQ(stream->pos_, 0);
+ ASSERT_EQ(stream->count_, 0);
+ ASSERT_EQ(0, stream->GetPos().value());
+
+ ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+ ASSERT_EQ("0123", buf);
+ // Fill reads [0..8) -> buffer = "01234567", count_=8, pos_=4
+ check_buffer(*stream, "01234567", 4, 8);
+ }
+
+ // Buffer hit at exact boundary start: seek to pos 0 (= buf_start_abs=0)
+ // Fast path: pos_ = 0, count_ stays 8, buffer unchanged.
+ {
+ ASSERT_OK(stream->Seek(0, SeekOrigin::FS_SEEK_SET));
+ check_buffer(*stream, "01234567", 0, 8);
+
+ ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+ ASSERT_EQ("0123", buf);
+ check_buffer(*stream, "01234567", 4, 8);
+ }
+
+ // Buffer hit at exact boundary end: seek to pos 8 (= buf_end_abs=8)
+ // Fast path: pos_ = 8 == count_, buffer unchanged but next Read triggers
refill.
+ {
+ ASSERT_OK(stream->Seek(8, SeekOrigin::FS_SEEK_SET));
+ check_buffer(*stream, "01234567", 8, 8);
+ ASSERT_EQ(8, stream->GetPos().value());
+
+ ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+ ASSERT_EQ("89ab", buf);
+ // pos_==count_ triggered Fill: buffer = "89abcdef", count_=8, pos_=4
+ check_buffer(*stream, "89abcdef", 4, 8);
+ }
+
+ // FS_SEEK_CUR buffer miss: pos=12, seek -12 -> target=0, outside [8..16)
+ {
+ ASSERT_OK(stream->Seek(-12, SeekOrigin::FS_SEEK_CUR));
+ ASSERT_EQ(stream->pos_, 0);
+ ASSERT_EQ(stream->count_, 0);
+
+ ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+ ASSERT_EQ("0123", buf);
+ check_buffer(*stream, "01234567", 4, 8);
+ }
+
+ // FS_SEEK_END buffer hit: fill buffer near file end, then seek within via
FS_SEEK_END.
+ {
+ ASSERT_OK(stream->Seek(12, SeekOrigin::FS_SEEK_SET));
+ // pos 12 is outside current buffer [0..8) -> slow path
+ ASSERT_EQ(stream->pos_, 0);
+ ASSERT_EQ(stream->count_, 0);
+
+ ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+ ASSERT_EQ("cdef", buf);
+ // Fill from 12: buffer = "cdef", count_=4, pos_=4
+ check_buffer(*stream, "cdef", 4, 4);
+
+ // Seek -4 from end -> target = 12, buffer covers [12..16), 12 is
inside.
+ // Fast path: pos_ = 12 - 12 = 0
+ ASSERT_OK(stream->Seek(-4, SeekOrigin::FS_SEEK_END));
+ check_buffer(*stream, "cdef", 0, 4);
+ ASSERT_EQ(12, stream->GetPos().value());
+
+ ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+ ASSERT_EQ("cdef", buf);
+ check_buffer(*stream, "cdef", 4, 4);
+ }
+
+ // Empty buffer (count_==0): fresh stream, seek always takes slow path.
+ {
+ auto in2 = std::make_shared<ByteArrayInputStream>(
+ reinterpret_cast<const char*>(data.data()), data.size());
+ auto fresh = std::make_shared<BufferedInputStream>(in2,
/*buffer_size=*/8, pool.get());
+ ASSERT_EQ(fresh->pos_, 0);
+ ASSERT_EQ(fresh->count_, 0);
+
+ ASSERT_OK(fresh->Seek(10, SeekOrigin::FS_SEEK_SET));
+ ASSERT_EQ(fresh->pos_, 0);
+ ASSERT_EQ(fresh->count_, 0);
+ ASSERT_EQ(10, fresh->GetPos().value());
+
+ ASSERT_EQ(4, fresh->Read(buf.data(), 4).value());
+ ASSERT_EQ("abcd", buf);
+ // Fill from 10: buffer = "abcdef", count_=6, pos_=4
+ check_buffer(*fresh, "abcdef", 4, 6);
+ }
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/io/byte_array_input_stream.cpp
b/src/paimon/common/io/byte_array_input_stream.cpp
new file mode 100644
index 0000000..a88d6b8
--- /dev/null
+++ b/src/paimon/common/io/byte_array_input_stream.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/io/byte_array_input_stream.h"
+
+#include <cassert>
+#include <cstring>
+#include <utility>
+
+#include "fmt/format.h"
+
+namespace paimon {
+ByteArrayInputStream::ByteArrayInputStream(const char* buffer, uint64_t length)
+ : buffer_(buffer), length_(length), position_(0) {
+ assert(buffer_);
+}
+
+const char* ByteArrayInputStream::GetRawData() const {
+ return buffer_ + position_;
+}
+
+/// Moves the read cursor relative to `origin`.
+///
+/// @param offset Signed byte offset applied to the chosen origin.
+/// @param origin One of FS_SEEK_SET, FS_SEEK_CUR or FS_SEEK_END.
+/// @return OK when the resulting position lies in [0, length]; Invalid otherwise.
+///         On failure the current position is left unchanged.
+Status ByteArrayInputStream::Seek(int64_t offset, SeekOrigin origin) {
+    // Work on a local candidate so a rejected seek cannot leave position_ outside the
+    // buffer, where GetRawData() would hand out an out-of-range pointer.
+    int64_t new_position = 0;
+    switch (origin) {
+        case SeekOrigin::FS_SEEK_SET: {
+            new_position = offset;
+            break;
+        }
+        case SeekOrigin::FS_SEEK_CUR: {
+            new_position = position_ + offset;
+            break;
+        }
+        case SeekOrigin::FS_SEEK_END: {
+            PAIMON_ASSIGN_OR_RAISE(uint64_t length, Length());
+            new_position = static_cast<int64_t>(length) + offset;
+            break;
+        }
+        default:
+            return Status::Invalid(
+                "invalid SeekOrigin, only support FS_SEEK_SET, FS_SEEK_CUR, and FS_SEEK_END");
+    }
+    if (new_position < 0 || new_position > static_cast<int64_t>(length_)) {
+        return Status::Invalid(fmt::format("invalid seek, after seek, current pos {}, length {}",
+                                           new_position, length_));
+    }
+    // Commit only after the candidate has been validated.
+    position_ = new_position;
+    return Status::OK();
+}
+
+Result<int32_t> ByteArrayInputStream::Read(char* buffer, uint32_t size) {
+ if (position_ + static_cast<int64_t>(size) >
static_cast<int64_t>(length_)) {
+ return Status::Invalid(
+ fmt::format("ByteArrayInputStream assert boundary failed: need
length {}, current "
+ "position {}, exceed length {}",
+ size, position_, length_));
+ }
+ memcpy(buffer, buffer_ + position_, size);
+ position_ += size;
+ return size;
+}
+
+/// Positional read: copies `size` bytes starting at absolute `offset` into `buffer`
+/// without touching the stream's current position.
+///
+/// @param buffer Destination; must have room for at least `size` bytes.
+/// @param size Number of bytes to copy.
+/// @param offset Absolute byte offset into the wrapped array.
+/// @return `size` on success; Invalid if [offset, offset + size) is not fully inside
+///         the array.
+Result<int32_t> ByteArrayInputStream::Read(char* buffer, uint32_t size, uint64_t offset) {
+    // Compare against the remaining length instead of computing `offset + size`: the sum
+    // is unsigned and could wrap for a huge offset, letting an out-of-range read through.
+    if (offset > length_ || static_cast<uint64_t>(size) > length_ - offset) {
+        return Status::Invalid(
+            fmt::format("ByteArrayInputStream assert boundary failed: need length {}, read offset "
+                        "{}, exceed length {}",
+                        size, offset, length_));
+    }
+    memcpy(buffer, buffer_ + offset, size);
+    return size;
+}
+
+void ByteArrayInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t
offset,
+ std::function<void(Status)>&& callback) {
+ Result<int32_t> read_size = Read(buffer, size, offset);
+ Status status = Status::OK();
+ if (read_size.ok() && static_cast<uint32_t>(read_size.value()) != size) {
+ status = Status::Invalid(fmt::format(
+ "ByteArrayInputStream async read size {} != expected {}",
read_size.value(), size));
+ } else if (!read_size.ok()) {
+ status = read_size.status();
+ }
+ callback(status);
+}
+
+Status ByteArrayInputStream::Close() {
+ return Status::OK();
+}
+
+Result<std::string> ByteArrayInputStream::GetUri() const {
+ return std::string();
+}
+} // namespace paimon
diff --git a/src/paimon/common/io/byte_array_input_stream_test.cpp
b/src/paimon/common/io/byte_array_input_stream_test.cpp
new file mode 100644
index 0000000..adac48f
--- /dev/null
+++ b/src/paimon/common/io/byte_array_input_stream_test.cpp
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/io/byte_array_input_stream.h"
+
+#include <memory>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Walks a ByteArrayInputStream over the six bytes "abcdef" through its public surface:
+// Length/GetUri, positional Read, Seek + sequential Read, ReadAsync, and both failure paths
+// (reading past the end and seeking outside the array).
+TEST(ByteArrayInputStreamTest, TestSimple) {
+    auto pool = GetDefaultPool();
+    // Produce the backing byte array by writing the payload through a segmented output stream.
+    auto writer = std::make_unique<MemorySegmentOutputStream>(/*segment_size=*/8, pool);
+    std::string payload = "abcdef";
+    auto payload_bytes = std::make_shared<Bytes>(payload, pool.get());
+    writer->WriteBytes(payload_bytes);
+    auto flat_bytes = MemorySegmentUtils::CopyToBytes(
+        writer->Segments(), 0, writer->CurrentSize(), pool.get());
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(flat_bytes->data(), flat_bytes->size());
+    ASSERT_EQ(6, input_stream->Length().value());
+    ASSERT_TRUE(input_stream->GetUri().value().empty());
+
+    // A positional read at offset 1 leaves the stream position at 0.
+    std::string chunk(4, '\0');
+    ASSERT_EQ(4, input_stream->Read(chunk.data(), chunk.size(), /*offset=*/1).value());
+    ASSERT_EQ("bcde", chunk);
+    ASSERT_EQ(0, input_stream->GetPos().value());
+
+    // A sequential read after seeking to 2 consumes bytes 2..5 and ends at position 6.
+    ASSERT_OK(input_stream->Seek(2, SeekOrigin::FS_SEEK_SET));
+    ASSERT_EQ(4, input_stream->Read(chunk.data(), chunk.size()).value());
+    ASSERT_EQ("cdef", chunk);
+    ASSERT_EQ(6, input_stream->GetPos().value());
+
+    // The async read is given an explicit offset (0), so the preceding seek to 2 has no effect
+    // on what it returns, and the position stays at 2 afterwards.
+    ASSERT_OK(input_stream->Seek(2, SeekOrigin::FS_SEEK_SET));
+    bool callback_succeeded = false;
+    auto on_read_done = [&](Status status) {
+        ASSERT_OK(status);
+        if (status.ok()) {
+            callback_succeeded = true;
+        }
+    };
+    input_stream->ReadAsync(chunk.data(), chunk.size(), /*offset=*/0, on_read_done);
+    ASSERT_TRUE(callback_succeeded);
+    ASSERT_EQ("abcd", chunk);
+    ASSERT_EQ(2, input_stream->GetPos().value());
+
+    // Reading past the end: position 3 (three bytes before the end) plus 4 bytes exceeds 6.
+    ASSERT_OK(input_stream->Seek(-3, SeekOrigin::FS_SEEK_END));
+    ASSERT_NOK_WITH_MSG(
+        input_stream->Read(chunk.data(), chunk.size()),
+        "assert boundary failed: need length 4, current position 3, exceed length 6");
+
+    // Seeking outside the array is rejected.
+    ASSERT_NOK_WITH_MSG(input_stream->Seek(100, SeekOrigin::FS_SEEK_CUR),
+                        "invalid seek, after seek, current pos 103, length 6");
+    ASSERT_OK(input_stream->Close());
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/io/data_input_output_stream_test.cpp
b/src/paimon/common/io/data_input_output_stream_test.cpp
new file mode 100644
index 0000000..4e60637
--- /dev/null
+++ b/src/paimon/common/io/data_input_output_stream_test.cpp
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/io/data_output_stream.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/io/buffered_input_stream.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Parameterized fixture (one run per ByteOrder) shared by the data stream round-trip tests.
+// SetUp() prepares the expected serialized form of the values written by WriteValues() for
+// both byte orders; CheckResult() reads the values back and also checks position, end-of-stream
+// and seek handling.
+class DataInputOutputStreamTest : public ::testing::Test,
+                                  public ::testing::WithParamInterface<ByteOrder> {
+ protected:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+        // Expected stream content when multi-byte values are written big-endian.
+        std::vector<char> bytes_big_endian = {
+            127, 125,  67,   127,  -2,  -57,  127, 127, -1,  -1,   -1,  -1,  -1,  -1,
+            -3,  1,    0,    39,   84,  104,  105, 115, 32,  105,  115, 32,  97,  32,
+            118, 101,  114,  121,  32,  118,  101, 114, 121, 32,   118, 101, 114, 121,
+            32,  108,  111,  110,  103, 32,   115, 101, 110, 116,  101, 110, 99,  101,
+            46,  -26,  -120, -111, -26, -104, -81, -28, -72, -128, -28, -72, -86, -25,
+            -78, -119, -27,  -120, -73, -27,  -116, -96, -17, -67, -98};
+        serialized_bytes_big_endian_ =
+            std::make_shared<Bytes>(bytes_big_endian.size(), pool_.get());
+        memcpy(serialized_bytes_big_endian_->data(), bytes_big_endian.data(),
+               bytes_big_endian.size());
+
+        // Same values written little-endian: only the multi-byte integers and the string length
+        // prefix differ from the big-endian form.
+        std::vector<char> bytes_little_endian = {
+            127, 67,   125,  127,  -57, -2,   127, -3,  -1,  -1,   -1,  -1,  -1,  -1,
+            127, 1,    39,   0,    84,  104,  105, 115, 32,  105,  115, 32,  97,  32,
+            118, 101,  114,  121,  32,  118,  101, 114, 121, 32,   118, 101, 114, 121,
+            32,  108,  111,  110,  103, 32,   115, 101, 110, 116,  101, 110, 99,  101,
+            46,  -26,  -120, -111, -26, -104, -81, -28, -72, -128, -28, -72, -86, -25,
+            -78, -119, -27,  -120, -73, -27,  -116, -96, -17, -67, -98};
+        serialized_bytes_little_endian_ =
+            std::make_shared<Bytes>(bytes_little_endian.size(), pool_.get());
+        memcpy(serialized_bytes_little_endian_->data(), bytes_little_endian.data(),
+               bytes_little_endian.size());
+    }
+
+    // Writes the reference sequence of values (81 bytes in total) to `data_output_stream`.
+    // T is any writer offering WriteValue/WriteString/WriteBytes; the results are discarded
+    // because the callers verify the written bytes afterwards.
+    template <typename T>
+    void WriteValues(T* data_output_stream) const {
+        (void)data_output_stream->WriteValue(static_cast<char>(127));                     // 1 byte
+        (void)data_output_stream->WriteValue(static_cast<int16_t>(32067));                // 2 bytes
+        (void)data_output_stream->WriteValue(static_cast<int32_t>(2147403647));           // 4 bytes
+        (void)data_output_stream->WriteValue(static_cast<int64_t>(9223372036854775805));  // 8 bytes
+        (void)data_output_stream->WriteValue(true);                                       // 1 byte
+        std::string str1 = "This is a very very very long sentence.";
+        (void)data_output_stream->WriteString(str1);  // 39 bytes + 2 bytes len
+        std::string str2 = "我是一个粉刷匠~";         // 24 bytes
+        auto bytes = std::make_shared<Bytes>(str2, pool_.get());
+        (void)data_output_stream->WriteBytes(bytes);
+    }
+
+    // Reads the reference sequence back through `data_input_stream` and checks every value.
+    // `input_stream` is the stream underneath it, used to verify the final position.
+    void CheckResult(const InputStream* input_stream,
+                     const DataInputStream* data_input_stream) const {
+        ASSERT_EQ(127, data_input_stream->ReadValue<char>().value());
+        ASSERT_EQ(32067, data_input_stream->ReadValue<int16_t>().value());
+        ASSERT_EQ(static_cast<int32_t>(2147403647),
+                  data_input_stream->ReadValue<int32_t>().value());
+        ASSERT_EQ(static_cast<int64_t>(9223372036854775805),
+                  data_input_stream->ReadValue<int64_t>().value());
+        ASSERT_EQ(true, data_input_stream->ReadValue<bool>().value());
+        std::string str1 = "This is a very very very long sentence.";
+        ASSERT_EQ(str1, data_input_stream->ReadString().value());
+        std::string str2 = "我是一个粉刷匠~";  // 24 bytes
+        auto bytes = std::make_shared<Bytes>(str2, pool_.get());
+        auto read_bytes = std::make_shared<Bytes>(str2.length(), pool_.get());
+        ASSERT_OK(data_input_stream->ReadBytes(read_bytes.get()));
+        ASSERT_EQ(*bytes, *read_bytes);
+        // test GetPos
+        ASSERT_OK_AND_ASSIGN(int64_t in_pos, input_stream->GetPos());
+        ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, in_pos);
+        // read eof, return bad status
+        ASSERT_NOK(data_input_stream->ReadString());
+        // test seek: offset 3 is where the int32 value starts (after the char and the int16)
+        ASSERT_OK(data_input_stream->Seek(3));
+        ASSERT_EQ(static_cast<int32_t>(2147403647),
+                  data_input_stream->ReadValue<int32_t>().value());
+    }
+    void TearDown() override {}
+
+    // Fixture state stays protected (not private): every TEST_P body is compiled as a subclass
+    // of this fixture and reads these members directly.
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<Bytes> serialized_bytes_big_endian_;
+    std::shared_ptr<Bytes> serialized_bytes_little_endian_;
+};
+// Runs every TEST_P of DataInputOutputStreamTest once per byte order.
+INSTANTIATE_TEST_SUITE_P(ByteOrder, DataInputOutputStreamTest,
+ ::testing::Values(ByteOrder::PAIMON_BIG_ENDIAN,
+ ByteOrder::PAIMON_LITTLE_ENDIAN));
+
+// Round trip through the local file system: write with DataOutputStream, compare the raw file
+// content with the expected bytes for the parameterized byte order, then read everything back
+// through DataInputStream.
+TEST_P(DataInputOutputStreamTest, TestFileStream) {
+    ByteOrder byte_order = GetParam();
+    auto file_system = std::make_unique<LocalFileSystem>();
+    auto dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(dir);
+    std::string test_file = dir->Str() + "/test";
+
+    // Write phase.
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> output_stream,
+                         file_system->Create(test_file, /*overwrite=*/true));
+    auto writer = std::make_unique<DataOutputStream>(output_stream);
+    writer->SetOrder(byte_order);
+    WriteValues(writer.get());
+    ASSERT_OK_AND_ASSIGN(int64_t out_pos, output_stream->GetPos());
+    ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, out_pos);
+    ASSERT_OK(output_stream->Close());
+
+    // The file must exist and hold exactly the expected serialized bytes.
+    ASSERT_OK_AND_ASSIGN(bool exist, file_system->Exists(test_file));
+    ASSERT_TRUE(exist);
+    std::string file_content;
+    ASSERT_OK(file_system->ReadFile(test_file, &file_content));
+    auto actual_bytes = std::make_shared<Bytes>(file_content, pool_.get());
+    const auto& expected_bytes = (byte_order == ByteOrder::PAIMON_BIG_ENDIAN)
+                                     ? serialized_bytes_big_endian_
+                                     : serialized_bytes_little_endian_;
+    ASSERT_EQ(*expected_bytes, *actual_bytes);
+
+    // Read phase.
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, file_system->Open(test_file));
+    auto reader = std::make_unique<DataInputStream>(input_stream);
+    reader->SetOrder(byte_order);
+    CheckResult(input_stream.get(), reader.get());
+}
+
+// In-memory round trip: write with MemorySegmentOutputStream (8-byte segments, so values
+// straddle segment boundaries), flatten to a byte array, and read back through a
+// ByteArrayInputStream wrapped in a DataInputStream.
+TEST_P(DataInputOutputStreamTest, TestInMemoryByteArrayStream) {
+    ByteOrder byte_order = GetParam();
+
+    // Write phase.
+    auto writer = std::make_unique<MemorySegmentOutputStream>(/*segment_size=*/8, pool_);
+    writer->SetOrder(byte_order);
+    WriteValues(writer.get());
+    ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, writer->CurrentSize());
+    auto actual_bytes = MemorySegmentUtils::CopyToBytes(writer->Segments(), 0,
+                                                        writer->CurrentSize(), pool_.get());
+    const auto& expected_bytes = (byte_order == ByteOrder::PAIMON_BIG_ENDIAN)
+                                     ? serialized_bytes_big_endian_
+                                     : serialized_bytes_little_endian_;
+    ASSERT_EQ(*expected_bytes, *actual_bytes);
+
+    // Read phase.
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(actual_bytes->data(), actual_bytes->size());
+    auto reader = std::make_unique<DataInputStream>(input_stream);
+    reader->SetOrder(byte_order);
+    CheckResult(input_stream.get(), reader.get());
+}
+
+// Same in-memory round trip, but the byte array is read through a BufferedInputStream with a
+// 4-byte buffer, so most values are larger than the buffer itself.
+TEST_P(DataInputOutputStreamTest, TestBufferedStream) {
+    ByteOrder byte_order = GetParam();
+
+    // Write phase.
+    auto writer = std::make_unique<MemorySegmentOutputStream>(/*segment_size=*/8, pool_);
+    writer->SetOrder(byte_order);
+    WriteValues(writer.get());
+    ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, writer->CurrentSize());
+    auto actual_bytes = MemorySegmentUtils::CopyToBytes(writer->Segments(), 0,
+                                                        writer->CurrentSize(), pool_.get());
+    const auto& expected_bytes = (byte_order == ByteOrder::PAIMON_BIG_ENDIAN)
+                                     ? serialized_bytes_big_endian_
+                                     : serialized_bytes_little_endian_;
+    ASSERT_EQ(*expected_bytes, *actual_bytes);
+
+    // Read phase: byte array -> buffered stream -> data stream.
+    auto raw_stream =
+        std::make_unique<ByteArrayInputStream>(actual_bytes->data(), actual_bytes->size());
+    auto buffered_input_stream = std::make_shared<BufferedInputStream>(
+        std::move(raw_stream), /*buffer_size=*/4, pool_.get());
+    auto reader = std::make_unique<DataInputStream>(buffered_input_stream);
+    reader->SetOrder(byte_order);
+    CheckResult(buffered_input_stream.get(), reader.get());
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/io/data_input_stream.cpp
b/src/paimon/common/io/data_input_stream.cpp
new file mode 100644
index 0000000..63f8338
--- /dev/null
+++ b/src/paimon/common/io/data_input_stream.cpp
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/io/data_input_stream.h"
+
+#include <cassert>
+#include <type_traits>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/math.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+
+// Wraps `input_stream` by sharing ownership of it. A null stream is a programming error and is
+// caught by the assert in builds where asserts are enabled.
+DataInputStream::DataInputStream(const std::shared_ptr<InputStream>& input_stream)
+    : input_stream_(input_stream) {
+    assert(input_stream_ != nullptr);
+}
+
+// Moves the underlying stream to `offset`, interpreted relative to the start of the stream
+// (FS_SEEK_SET), and returns the underlying stream's status.
+Status DataInputStream::Seek(int64_t offset) const {
+ return input_stream_->Seek(offset, SeekOrigin::FS_SEEK_SET);
+}
+
+// Reads sizeof(T) bytes at the current position and returns them as a T, byte-swapped when
+// NeedSwap() reports that the configured byte order differs from the host's.
+// Fails when fewer than sizeof(T) bytes remain or the underlying read comes back short.
+template <typename T>
+Result<T> DataInputStream::ReadValue() const {
+    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+    constexpr int32_t kValueSize = sizeof(T);
+    PAIMON_RETURN_NOT_OK(AssertBoundary(kValueSize));
+    T value;
+    char* raw_value = reinterpret_cast<char*>(&value);
+    PAIMON_ASSIGN_OR_RAISE(int32_t bytes_read, input_stream_->Read(raw_value, kValueSize));
+    PAIMON_RETURN_NOT_OK(AssertReadLength(kValueSize, bytes_read));
+    if (NeedSwap()) {
+        value = EndianSwapValue(value);
+    }
+    return value;
+}
+
+// Fills `bytes` completely: reads exactly bytes->size() bytes from the current position into
+// bytes->data(). Fails when the stream does not hold that many bytes.
+Status DataInputStream::ReadBytes(Bytes* bytes) const {
+    int32_t wanted = bytes->size();
+    PAIMON_RETURN_NOT_OK(AssertBoundary(wanted));
+    PAIMON_ASSIGN_OR_RAISE(int32_t bytes_read, input_stream_->Read(bytes->data(), wanted));
+    PAIMON_RETURN_NOT_OK(AssertReadLength(wanted, bytes_read));
+    return Status::OK();
+}
+
+// Reads exactly `size` raw bytes from the current position into `data`; no byte-order
+// conversion is applied. Fails when fewer than `size` bytes are available.
+Status DataInputStream::Read(char* data, uint32_t size) const {
+    PAIMON_RETURN_NOT_OK(AssertBoundary(size));
+    PAIMON_ASSIGN_OR_RAISE(int32_t bytes_read, input_stream_->Read(data, size));
+    PAIMON_RETURN_NOT_OK(AssertReadLength(size, bytes_read));
+    return Status::OK();
+}
+
+// Reads a length-prefixed string: a uint16 length (read via ReadValue, so subject to the
+// configured byte order) followed by that many raw bytes.
+Result<std::string> DataInputStream::ReadString() const {
+    uint16_t payload_length = 0;
+    PAIMON_ASSIGN_OR_RAISE(payload_length, ReadValue<uint16_t>());
+    PAIMON_RETURN_NOT_OK(AssertBoundary(payload_length));
+    std::string text(payload_length, '\0');
+    PAIMON_ASSIGN_OR_RAISE(int32_t bytes_read, input_stream_->Read(text.data(), payload_length));
+    PAIMON_RETURN_NOT_OK(AssertReadLength(payload_length, bytes_read));
+    return text;
+}
+
+// Returns the current position as reported by the underlying stream.
+Result<int64_t> DataInputStream::GetPos() const {
+ return input_stream_->GetPos();
+}
+
+// Returns the total length as reported by the underlying stream.
+Result<uint64_t> DataInputStream::Length() const {
+ return input_stream_->Length();
+}
+
+// Returns OK when a read delivered exactly the requested number of bytes, and Status::Invalid
+// describing both lengths otherwise.
+Status DataInputStream::AssertReadLength(int32_t read_length, int32_t actual_read_length) const {
+    if (read_length == actual_read_length) {
+        return Status::OK();
+    }
+    return Status::Invalid(
+        fmt::format("assert read length failed: read length not match, read length {}, actual "
+                    "read length {}",
+                    read_length, actual_read_length));
+}
+
+// Verifies that `need_length` more bytes can be read from the current position without
+// passing the end of the underlying stream. Queries the stream for its position and length
+// on every call.
+Status DataInputStream::AssertBoundary(int32_t need_length) const {
+    // TODO(jinli.zjw): Store current_pos and file_length as member variables to reduce the overhead
+    // of I/O calls.
+    PAIMON_ASSIGN_OR_RAISE(int64_t pos, input_stream_->GetPos());
+    PAIMON_ASSIGN_OR_RAISE(uint64_t length, input_stream_->Length());
+    const int64_t end_of_read = pos + need_length;
+    if (end_of_read <= static_cast<int64_t>(length)) {
+        return Status::OK();
+    }
+    return Status::Invalid(
+        fmt::format("DataInputStream assert boundary failed: need length {}, current position "
+                    "{}, exceed length {}",
+                    need_length, pos, length));
+}
+
+// True when the configured byte order differs from the one reported by SystemByteOrder(), i.e.
+// when values read from the stream must be byte-swapped.
+bool DataInputStream::NeedSwap() const {
+    const bool matches_host = (SystemByteOrder() == byte_order_);
+    return !matches_host;
+}
+
+// Explicit instantiations of ReadValue<T>() for the value types listed below.
+// NOTE(review): the template body is defined in this .cpp, so presumably only these types are
+// usable from other translation units - confirm against the header before relying on others.
+template Result<bool> DataInputStream::ReadValue() const;
+template Result<char> DataInputStream::ReadValue() const;
+template Result<int8_t> DataInputStream::ReadValue() const;
+template Result<int16_t> DataInputStream::ReadValue() const;
+template Result<uint16_t> DataInputStream::ReadValue() const;
+template Result<int32_t> DataInputStream::ReadValue() const;
+template Result<int64_t> DataInputStream::ReadValue() const;
+template Result<float> DataInputStream::ReadValue() const;
+template Result<double> DataInputStream::ReadValue() const;
+} // namespace paimon
diff --git a/src/paimon/common/io/data_output_stream.cpp
b/src/paimon/common/io/data_output_stream.cpp
new file mode 100644
index 0000000..14d7463
--- /dev/null
+++ b/src/paimon/common/io/data_output_stream.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/io/data_output_stream.h"
+
+#include "fmt/format.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/result.h"
+
+namespace paimon {
+// Wraps `output_stream` by sharing ownership of it. A null stream is a programming error and is
+// caught by the assert in builds where asserts are enabled.
+DataOutputStream::DataOutputStream(const std::shared_ptr<OutputStream>& output_stream)
+    : output_stream_(output_stream) {
+    assert(output_stream_ != nullptr);
+}
+
+// Writes the whole content of `bytes` to the underlying stream, with no length prefix and no
+// byte-order conversion. Fails when the stream accepts fewer bytes than requested.
+Status DataOutputStream::WriteBytes(const std::shared_ptr<Bytes>& bytes) {
+    int32_t wanted = bytes->size();
+    PAIMON_ASSIGN_OR_RAISE(int32_t written, output_stream_->Write(bytes->data(), wanted));
+    PAIMON_RETURN_NOT_OK(AssertWriteLength(wanted, written));
+    return Status::OK();
+}
+
+// Writes `value` as a length-prefixed string: a 2-byte unsigned length (written via WriteValue,
+// so in the configured byte order) followed by the raw bytes of `value`.
+//
+// Returns Status::Invalid when `value` is longer than 65535 bytes, because the 16-bit prefix
+// cannot represent it, or when the underlying stream accepts fewer bytes than requested.
+Status DataOutputStream::WriteString(const std::string& value) {
+    // Reject over-long strings instead of narrowing the size to uint16_t, which would silently
+    // write a wrapped-around length and only that many bytes of the payload.
+    constexpr size_t kMaxStringLength = 0xFFFF;
+    if (value.size() > kMaxStringLength) {
+        return Status::Invalid(
+            fmt::format("string length {} exceeds the max length {} supported by WriteString",
+                        value.size(), kMaxStringLength));
+    }
+    const auto write_length = static_cast<uint16_t>(value.size());
+    PAIMON_RETURN_NOT_OK(WriteValue<uint16_t>(write_length));
+    PAIMON_ASSIGN_OR_RAISE(int32_t actual_write_length,
+                           output_stream_->Write(value.data(), write_length));
+    PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, actual_write_length));
+    return Status::OK();
+}
+
+// Returns OK when a write accepted exactly the requested number of bytes, and Status::Invalid
+// describing both lengths otherwise.
+Status DataOutputStream::AssertWriteLength(int32_t write_length,
+                                           int32_t actual_write_length) const {
+    if (write_length == actual_write_length) {
+        return Status::OK();
+    }
+    return Status::Invalid(fmt::format(
+        "assert write length failed: write length not match, write length {}, actual "
+        "write length {}",
+        write_length, actual_write_length));
+}
+
+// True when the configured byte order differs from the one reported by SystemByteOrder(), i.e.
+// when values must be byte-swapped before they are written.
+bool DataOutputStream::NeedSwap() const {
+    const bool matches_host = (SystemByteOrder() == byte_order_);
+    return !matches_host;
+}
+
+// Explicit instantiations of WriteValue<T>() for the value types listed below.
+template Status DataOutputStream::WriteValue(const bool&);
+template Status DataOutputStream::WriteValue(const char&);
+template Status DataOutputStream::WriteValue(const int16_t&);
+template Status DataOutputStream::WriteValue(const uint16_t&);
+template Status DataOutputStream::WriteValue(const int32_t&);
+template Status DataOutputStream::WriteValue(const int64_t&);
+} // namespace paimon
diff --git a/src/paimon/common/io/data_output_stream.h
b/src/paimon/common/io/data_output_stream.h
new file mode 100644
index 0000000..ace5976
--- /dev/null
+++ b/src/paimon/common/io/data_output_stream.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <type_traits>
+
+#include "paimon/common/utils/math.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class Bytes;
+class OutputStream;
+
+// Typed writer layered on an OutputStream. Offers WriteValue(), WriteBytes() and WriteString()
+// and converts multi-byte values to the configured byte order (big-endian by default) to ensure
+// cross-language compatibility.
+class PAIMON_EXPORT DataOutputStream {
+ public:
+    explicit DataOutputStream(const std::shared_ptr<OutputStream>& output_stream);
+
+    /// Selects the byte order used by subsequent WriteValue() calls.
+    void SetOrder(ByteOrder order) {
+        byte_order_ = order;
+    }
+
+    /// Writes the sizeof(T) bytes of `value`, byte-swapped first when NeedSwap() is true.
+    /// Returns an error when the stream fails or accepts fewer than sizeof(T) bytes.
+    template <typename T>
+    Status WriteValue(const T& value) {
+        static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+        T encoded = value;
+        if (NeedSwap()) {
+            encoded = EndianSwapValue(value);
+        }
+        constexpr int32_t kValueSize = sizeof(T);
+        PAIMON_ASSIGN_OR_RAISE(
+            int32_t written, output_stream_->Write(reinterpret_cast<char*>(&encoded), kValueSize));
+        PAIMON_RETURN_NOT_OK(AssertWriteLength(kValueSize, written));
+        return Status::OK();
+    }
+
+    /// Writes the raw content of `bytes` without a length prefix.
+    Status WriteBytes(const std::shared_ptr<Bytes>& bytes);
+
+    /// Writes a 2-byte length prefix first, then the string bytes.
+    Status WriteString(const std::string& value);
+
+ private:
+    /// Returns an error when `actual_write_length` differs from `write_length`.
+    Status AssertWriteLength(int32_t write_length, int32_t actual_write_length) const;
+
+    /// True when values must be byte-swapped to match `byte_order_`.
+    bool NeedSwap() const;
+
+    std::shared_ptr<OutputStream> output_stream_;
+
+    ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/io/memory_segment_output_stream.cpp
b/src/paimon/common/io/memory_segment_output_stream.cpp
new file mode 100644
index 0000000..498b5ab
--- /dev/null
+++ b/src/paimon/common/io/memory_segment_output_stream.cpp
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/io/memory_segment_output_stream.h"
+
+#include <algorithm>
+
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+class MemoryPool;
+
+// Stores the segment size and the memory pool, then calls Advance() so that a first segment is
+// allocated and the write position starts at 0.
+MemorySegmentOutputStream::MemorySegmentOutputStream(int32_t segment_size,
+ const
std::shared_ptr<MemoryPool>& pool)
+ : segment_size_(segment_size), pool_(pool) {
+ Advance();
+}
+
+// Switches writing to a freshly allocated segment and resets the in-segment position to 0.
+void MemorySegmentOutputStream::Advance() {
+    position_in_segment_ = 0;
+    current_segment_ = NextSegment();
+}
+
+// Allocates a heap segment of `segment_size_` bytes from the pool, records it in
+// `memory_segments_`, and returns it.
+MemorySegment MemorySegmentOutputStream::NextSegment() {
+    MemorySegment fresh_segment = MemorySegment::AllocateHeapMemory(segment_size_, pool_.get());
+    memory_segments_.push_back(fresh_segment);
+    return fresh_segment;
+}
+
+// Appends the whole content of `bytes` by wrapping it in a MemorySegment and writing that
+// segment from offset 0 to its full size.
+void MemorySegmentOutputStream::WriteBytes(const std::shared_ptr<Bytes>& bytes) {
+    auto wrapped = MemorySegment::Wrap(bytes);
+    Write(wrapped, 0, wrapped.Size());
+}
+
+// Writes `str` as a length-prefixed string: the length as an int16 (via WriteValue, so in the
+// configured byte order) followed by all bytes of `str`.
+// NOTE(review): the prefix is narrowed to int16_t while the payload is written in full, so
+// strings longer than an int16 can hold would presumably produce an inconsistent stream - confirm
+// that callers never pass such strings.
+void MemorySegmentOutputStream::WriteString(const std::string& str) {
+    const char* payload = str.data();
+    WriteValue<int16_t>(str.length());
+    Write(payload, str.size());
+}
+
+// Appends `size` raw bytes from `data`. The bytes are first copied into a pool-allocated Bytes
+// buffer, which is then wrapped in a MemorySegment and written through the segment overload.
+void MemorySegmentOutputStream::Write(const char* data, uint32_t size) {
+    auto staging = std::make_shared<Bytes>(size, pool_.get());
+    memcpy(staging->data(), data, size);
+    auto wrapped = MemorySegment::Wrap(staging);
+    Write(wrapped, 0, wrapped.Size());
+}
+
+// Appends `len` bytes of `segment`, starting at `offset`, to the stream. Data that does not fit
+// into the current segment spills over into newly allocated segments of `segment_size_` bytes.
+void MemorySegmentOutputStream::Write(const MemorySegment& segment, int32_t offset, int32_t len) {
+    int32_t remaining = segment_size_ - position_in_segment_;
+    if (remaining >= len) {
+        // Fast path: everything fits into the current segment.
+        segment.CopyTo(offset, &current_segment_, position_in_segment_, len);
+        position_in_segment_ += len;
+    } else {
+        if (remaining == 0) {
+            // The current segment is exactly full; start with a fresh one.
+            Advance();
+            remaining = segment_size_ - position_in_segment_;
+        }
+        while (true) {
+            // Copy as much as the current segment can still take.
+            int32_t to_put = std::min(remaining, len);
+            segment.CopyTo(offset, &current_segment_, position_in_segment_, to_put);
+            offset += to_put;
+            len -= to_put;
+
+            if (len > 0) {
+                // More data pending: the current segment is now full, move on to a new one.
+                position_in_segment_ = segment_size_;
+                Advance();
+                remaining = segment_size_ - position_in_segment_;
+            } else {
+                position_in_segment_ += to_put;
+                break;
+            }
+        }
+    }
+}
+
+// Total number of bytes written so far: every segment before the current one counts as
+// completely filled, plus the write position inside the current segment.
+int64_t MemorySegmentOutputStream::CurrentSize() const {
+    const auto full_segment_count = memory_segments_.size() - 1;
+    return segment_size_ * full_segment_count + CurrentPositionInSegment();
+}
+
+// True when the configured byte order differs from the one reported by SystemByteOrder(), i.e.
+// when values must be byte-swapped before they are written.
+bool MemorySegmentOutputStream::NeedSwap() const {
+    const bool matches_host = (SystemByteOrder() == byte_order_);
+    return !matches_host;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/io/memory_segment_output_stream.h
b/src/paimon/common/io/memory_segment_output_stream.h
new file mode 100644
index 0000000..dbe3816
--- /dev/null
+++ b/src/paimon/common/io/memory_segment_output_stream.h
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/common/utils/math.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Bytes;
+class MemoryPool;
+
+// TODO(xinyu.lxy): use DataOutputStream to do big-endian conversion
+//
+// In-memory output stream that stores written data in a growing list of fixed-size
+// MemorySegments allocated from a MemoryPool. Multi-byte values are converted to the configured
+// byte order (big-endian by default) before they are stored.
+class PAIMON_EXPORT MemorySegmentOutputStream {
+ public:
+    static constexpr int32_t DEFAULT_SEGMENT_SIZE = 64 * 1024;
+
+    MemorySegmentOutputStream(int32_t segment_size, const std::shared_ptr<MemoryPool>& pool);
+
+    /// Selects the byte order used by subsequent WriteValue() calls.
+    void SetOrder(ByteOrder order) {
+        byte_order_ = order;
+    }
+
+    /// Writes one trivially copyable value in the configured byte order.
+    template <typename T>
+    void WriteValue(T v);
+
+    /// First write length (int16), then write string bytes.
+    void WriteString(const std::string& str);
+
+    /// Writes the raw content of `bytes` without a length prefix.
+    void WriteBytes(const std::shared_ptr<Bytes>& bytes);
+
+    /// Writes `size` raw bytes starting at `data`.
+    void Write(const char* data, uint32_t size);
+
+    /// Writes `len` bytes of `segment`, starting at `offset`.
+    void Write(const MemorySegment& segment, int32_t offset, int32_t len);
+
+    /// Write position inside the segment currently being filled.
+    int32_t CurrentPositionInSegment() const {
+        return position_in_segment_;
+    }
+
+    /// Total number of bytes written so far.
+    int64_t CurrentSize() const;
+
+    /// All segments allocated so far, in allocation order.
+    const std::vector<MemorySegment>& Segments() const {
+        return memory_segments_;
+    }
+
+ private:
+    /// Stores `v` byte for byte as given; the byte-order conversion happens in WriteValue().
+    template <typename T>
+    void WriteValueImpl(T v);
+
+    void Advance();
+    MemorySegment NextSegment();
+
+    bool NeedSwap() const;
+
+    int32_t segment_size_;
+    int32_t position_in_segment_;
+    std::shared_ptr<MemoryPool> pool_;
+    MemorySegment current_segment_;
+    std::vector<MemorySegment> memory_segments_;
+
+    ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN;
+};
+
+// Stores the bytes of `v` at the current position without any byte-order conversion.
+// Three cases: the value fits into the current segment; the current segment is exactly full
+// (advance, then retry); or the value straddles a segment boundary (write it byte by byte).
+template <typename T>
+void MemorySegmentOutputStream::WriteValueImpl(T v) {
+    constexpr auto kValueSize = static_cast<int32_t>(sizeof(T));
+    if (position_in_segment_ <= segment_size_ - kValueSize) {
+        current_segment_.PutValue<T>(position_in_segment_, v);
+        position_in_segment_ += kValueSize;
+        return;
+    }
+    if (position_in_segment_ == segment_size_) {
+        Advance();
+        WriteValueImpl<T>(v);
+        return;
+    }
+    // because of endian swap, just copy the bytes of input v
+    const auto* raw_bytes = reinterpret_cast<const uint8_t*>(&v);
+    for (size_t index = 0; index < sizeof(T); ++index) {
+        WriteValueImpl<uint8_t>(raw_bytes[index]);
+    }
+}
+
+// Writes `v` in the configured byte order: swaps its bytes when NeedSwap() is true, then stores
+// the result through WriteValueImpl().
+template <typename T>
+void MemorySegmentOutputStream::WriteValue(T v) {
+    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+    if (NeedSwap()) {
+        v = EndianSwapValue(v);
+    }
+    WriteValueImpl<T>(v);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/io/memory_segment_output_stream_test.cpp
b/src/paimon/common/io/memory_segment_output_stream_test.cpp
new file mode 100644
index 0000000..61fbfe3
--- /dev/null
+++ b/src/paimon/common/io/memory_segment_output_stream_test.cpp
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/io/memory_segment_output_stream.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+
+/// Fixture for MemorySegmentOutputStream tests, parameterized over the byte order.
+class MemorySegmentOutputStreamTest : public ::testing::TestWithParam<ByteOrder> {};
+
+// Instantiate every TEST_P of the fixture once for each listed byte order.
+INSTANTIATE_TEST_SUITE_P(ByteOrder, MemorySegmentOutputStreamTest,
+                         ::testing::Values(ByteOrder::PAIMON_BIG_ENDIAN,
+                                           ByteOrder::PAIMON_LITTLE_ENDIAN));
+
+// Round-trip test: writes a mix of fixed-width values, length-prefixed strings and
+// raw bytes through a stream with tiny (8-byte) segments, then reads everything back
+// with a DataInputStream configured for the same byte order.
+TEST_P(MemorySegmentOutputStreamTest, TestSimple) {
+    ByteOrder byte_order = GetParam();
+    auto pool = GetDefaultPool();
+    MemorySegmentOutputStream out(/*segment_size=*/8, pool);
+    out.SetOrder(byte_order);
+    out.WriteValue(static_cast<char>(127));            // 1 byte
+    out.WriteValue(static_cast<int16_t>(32067));       // 2 bytes
+    out.WriteValue(static_cast<int32_t>(2147403647));  // 4 bytes
+    // 7 of the 8 bytes of the first segment are used, so this value does not fit in
+    // the remaining space and continues in the next segment.
+    const auto big_value = static_cast<int64_t>(9223372036854775805LL);
+    out.WriteValue(big_value);  // 8 bytes
+    out.WriteValue(true);       // 1 byte
+    std::string str1 = "This is a very very very long sentence.";
+    out.WriteString(str1);  // 39 bytes + 2 bytes len
+    ASSERT_EQ(out.CurrentSize(), 1 + 1 + 2 + 4 + 8 + 41);
+    std::string str2 = "yes";
+    out.WriteString(str2);  // 3 bytes + 2 bytes len
+    std::string str3 = "I have a dream.";
+    out.WriteString(str3);  // 15 bytes + 2 bytes len
+    std::string str4 = "hello";
+    // Convert the size explicitly: it is serialized as int32_t and later compared
+    // against an int32_t, so avoid implicit narrowing and signed/unsigned comparison.
+    const auto str4_size = static_cast<int32_t>(str4.size());
+    out.WriteValue<int32_t>(str4_size);                             // 4 bytes
+    out.Write(str4.data(), static_cast<uint32_t>(str4.size()));    // 5 bytes
+
+    ASSERT_EQ(out.CurrentSize(), 1 + 1 + 2 + 4 + 8 + 41 + 5 + 17 + 4 + 5);
+
+    auto bytes =
+        MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get());
+
+    auto input_stream = std::make_shared<ByteArrayInputStream>(bytes->data(), bytes->size());
+    DataInputStream in(input_stream);
+    in.SetOrder(byte_order);
+    ASSERT_EQ(127, in.ReadValue<char>().value());
+    ASSERT_EQ(32067, in.ReadValue<int16_t>().value());
+    ASSERT_EQ(2147403647, in.ReadValue<int32_t>().value());
+    ASSERT_EQ(big_value, in.ReadValue<int64_t>().value());
+    ASSERT_EQ(true, in.ReadValue<bool>().value());
+    ASSERT_EQ(str1, in.ReadString().value());
+    ASSERT_EQ(str2, in.ReadString().value());
+    ASSERT_EQ(str3, in.ReadString().value());
+    ASSERT_EQ(str4_size, in.ReadValue<int32_t>().value());
+    std::string read_str4(str4.size(), '\0');
+    ASSERT_OK(in.Read(read_str4.data(), str4.size()));
+    ASSERT_EQ(read_str4, str4);
+    // Everything that was written must have been consumed.
+    ASSERT_EQ(out.CurrentSize(), input_stream->GetPos().value());
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/io/offset_input_stream.cpp
b/src/paimon/common/io/offset_input_stream.cpp
new file mode 100644
index 0000000..69593cd
--- /dev/null
+++ b/src/paimon/common/io/offset_input_stream.cpp
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/io/offset_input_stream.h"
+
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/macros.h"
+
+namespace paimon {
+/// Creates a stream exposing the `length` bytes of `wrapped` that start at `offset`.
+///
+/// @param wrapped underlying stream; must not be null.
+/// @param length  number of bytes to expose, or -1 to expose everything from `offset`
+///                up to the end of `wrapped`.
+/// @param offset  start of the exposed range inside `wrapped`; must be >= 0.
+/// @return the new stream, or an Invalid status when an argument is out of range or
+///         the requested range does not fit inside `wrapped`. Errors reported by
+///         `wrapped->Length()` / `wrapped->Seek()` are propagated.
+///
+/// On success `wrapped` has been seeked to `offset`.
+Result<std::unique_ptr<OffsetInputStream>> OffsetInputStream::Create(
+    const std::shared_ptr<InputStream>& wrapped, int64_t length, int64_t offset) {
+    if (PAIMON_UNLIKELY(wrapped == nullptr)) {
+        return Status::Invalid("input stream is null pointer");
+    }
+    if (PAIMON_UNLIKELY(offset < 0)) {
+        return Status::Invalid(fmt::format("offset {} is less than 0", offset));
+    }
+    if (PAIMON_UNLIKELY(length < -1)) {
+        return Status::Invalid(fmt::format("length {} is less than -1", length));
+    }
+    PAIMON_ASSIGN_OR_RAISE(uint64_t total_length, wrapped->Length());
+    // `offset` is known to be non-negative here, so the conversion is lossless.
+    const auto start = static_cast<uint64_t>(offset);
+    if (PAIMON_UNLIKELY(start > total_length)) {
+        return Status::Invalid(
+            fmt::format("offset {} exceed total length {}", offset, total_length));
+    }
+    // Bytes available from `offset` to the end of the wrapped stream.
+    const uint64_t available = total_length - start;
+    if (length == -1) {
+        // length == -1 means it's dynamic length, should read to the end
+        length = static_cast<int64_t>(available);
+    }
+    // Compare against the remaining bytes so the check needs no addition.
+    if (PAIMON_UNLIKELY(static_cast<uint64_t>(length) > available)) {
+        return Status::Invalid(fmt::format("offset {} + length {} exceed total length {}", offset,
+                                           length, total_length));
+    }
+    PAIMON_RETURN_NOT_OK(wrapped->Seek(offset, SeekOrigin::FS_SEEK_SET));
+    // `wrapped` is a const reference, so it is copied (shared), not moved.
+    return std::unique_ptr<OffsetInputStream>(new OffsetInputStream(wrapped, length, offset));
+}
+
+// Stores the wrapped stream together with the length and offset of the exposed range.
+// No validation happens here; Create() checks the arguments before constructing.
+OffsetInputStream::OffsetInputStream(const std::shared_ptr<InputStream>& wrapped,
+                                     int64_t length, int64_t offset)
+    : wrapped_{wrapped}, length_{length}, offset_{offset} {}
+
+/// Moves the logical position of this stream.
+///
+/// @param offset displacement, interpreted according to `origin`.
+/// @param origin FS_SEEK_SET (from the start of the exposed range), FS_SEEK_CUR (from
+///               the current logical position) or FS_SEEK_END (from the end of the
+///               exposed range).
+/// @return OK on success; Invalid for an unknown origin or a target rejected by
+///         AssertBoundary(); otherwise the status returned by the wrapped stream.
+///
+/// The logical position is only updated after both the boundary check and the seek on
+/// the wrapped stream succeeded, so a failed Seek() leaves GetPos() unchanged.
+Status OffsetInputStream::Seek(int64_t offset, SeekOrigin origin) {
+    int64_t target = 0;
+    switch (origin) {
+        case SeekOrigin::FS_SEEK_SET:
+            target = offset;
+            break;
+        case SeekOrigin::FS_SEEK_CUR:
+            target = inner_position_ + offset;
+            break;
+        case SeekOrigin::FS_SEEK_END:
+            target = length_ + offset;
+            break;
+        default:
+            return Status::Invalid(
+                "invalid SeekOrigin, only support FS_SEEK_SET, FS_SEEK_CUR, and FS_SEEK_END");
+    }
+    PAIMON_RETURN_NOT_OK(AssertBoundary(target));
+    // Always seek the wrapped stream to an absolute position. This keeps it consistent
+    // with the logical position even if its own cursor has drifted.
+    PAIMON_RETURN_NOT_OK(wrapped_->Seek(offset_ + target, SeekOrigin::FS_SEEK_SET));
+    inner_position_ = target;
+    return Status::OK();
+}
+
+/// Reads `size` bytes into `buffer` through the wrapped stream's sequential Read().
+///
+/// @return the value returned by the wrapped stream's Read(); Invalid when the read
+///         would extend past the end of the exposed range; otherwise the error of the
+///         wrapped stream.
+///
+/// The logical position advances by the value the wrapped Read() returned, and only
+/// when that call succeeded, so an error does not move GetPos().
+Result<int32_t> OffsetInputStream::Read(char* buffer, uint32_t size) {
+    PAIMON_RETURN_NOT_OK(AssertBoundary(inner_position_ + size));
+    PAIMON_ASSIGN_OR_RAISE(int32_t bytes_read, wrapped_->Read(buffer, size));
+    inner_position_ += bytes_read;
+    return bytes_read;
+}
+
+/// Positional read: reads `size` bytes starting at `offset`, where `offset` is relative
+/// to the start of the exposed range. Does not touch the logical position.
+///
+/// Both the start and the end of the requested range are checked with AssertBoundary()
+/// before the request is forwarded to the wrapped stream.
+Result<int32_t> OffsetInputStream::Read(char* buffer, uint32_t size, uint64_t offset) {
+    const uint64_t range_end = offset + size;
+    PAIMON_RETURN_NOT_OK(AssertBoundary(offset));
+    PAIMON_RETURN_NOT_OK(AssertBoundary(range_end));
+    // Translate the range-relative offset into an offset of the wrapped stream.
+    const uint64_t absolute_offset = offset_ + offset;
+    return wrapped_->Read(buffer, size, absolute_offset);
+}
+
+/// Asynchronous positional read of `size` bytes at the range-relative `offset`.
+///
+/// If either end of the requested range is rejected by AssertBoundary(), `callback` is
+/// invoked right away with that status and nothing is forwarded. Otherwise the request
+/// and the callback are handed to the wrapped stream.
+void OffsetInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                                  std::function<void(Status)>&& callback) {
+    // Reports a failed boundary check through the callback; returns true when it did.
+    const auto rejected = [&](uint64_t position) -> bool {
+        auto check = AssertBoundary(position);
+        if (check.ok()) {
+            return false;
+        }
+        callback(check);
+        return true;
+    };
+    if (rejected(offset) || rejected(offset + size)) {
+        return;
+    }
+    wrapped_->ReadAsync(buffer, size, offset_ + offset, std::move(callback));
+}
+
+/// Closes the wrapped stream and returns its status.
+Status OffsetInputStream::Close() {
+    Status close_status = wrapped_->Close();
+    return close_status;
+}
+
+/// Returns the URI reported by the wrapped stream.
+Result<std::string> OffsetInputStream::GetUri() const {
+    Result<std::string> wrapped_uri = wrapped_->GetUri();
+    return wrapped_uri;
+}
+
+/// Returns the logical position, i.e. the position relative to the start of the
+/// exposed range (not the position inside the wrapped stream).
+Result<int64_t> OffsetInputStream::GetPos() const {
+    const int64_t logical_position = inner_position_;
+    return logical_position;
+}
+
+/// Returns the length of the exposed range (not the length of the wrapped stream).
+Result<uint64_t> OffsetInputStream::Length() const {
+    const auto exposed_length = static_cast<uint64_t>(length_);
+    return exposed_length;
+}
+
+/// Checks that `inner_pos` lies inside the exposed range, i.e. in [0, length_].
+///
+/// The parameter is 64 bits wide because callers pass 64-bit positions
+/// (`inner_position_`, `uint64_t` offsets); a narrower parameter would truncate large
+/// positions before they are compared.
+///
+/// @return OK when the position is valid, Invalid otherwise.
+Status OffsetInputStream::AssertBoundary(int64_t inner_pos) const {
+    if (inner_pos < 0 || inner_pos > length_) {
+        return Status::Invalid(
+            fmt::format("OffsetInputStream assert boundary failed: inner pos {} exceed length {}",
+                        inner_pos, length_));
+    }
+    return Status::OK();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/io/offset_input_stream.h
b/src/paimon/common/io/offset_input_stream.h
new file mode 100644
index 0000000..38f97ef
--- /dev/null
+++ b/src/paimon/common/io/offset_input_stream.h
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// An `InputStream` wrapping another `InputStream` with offset and length.
+///
+/// Positions and offsets taken or returned by this class are relative to `offset`
+/// inside the wrapped stream, and `Length()` reports `length`.
+class PAIMON_EXPORT OffsetInputStream : public InputStream {
+ public:
+    /// Validates the arguments and builds the stream. `length == -1` exposes everything
+    /// from `offset` to the end of `wrapped`.
+    static Result<std::unique_ptr<OffsetInputStream>> Create(
+        const std::shared_ptr<InputStream>& wrapped, int64_t length, int64_t offset);
+    ~OffsetInputStream() override = default;
+
+    /// Moves the logical position; `offset` is interpreted according to `origin`.
+    Status Seek(int64_t offset, SeekOrigin origin) override;
+
+    /// Returns the logical position, relative to the start of the exposed range.
+    Result<int64_t> GetPos() const override;
+
+    /// Sequential read through the wrapped stream.
+    Result<int32_t> Read(char* buffer, uint32_t size) override;
+
+    /// Positional read; `offset` is relative to the start of the exposed range.
+    Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) override;
+
+    /// Asynchronous positional read; `offset` is relative to the start of the exposed
+    /// range. Boundary failures are reported through `callback`.
+    void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                   std::function<void(Status)>&& callback) override;
+
+    /// Returns the length of the exposed range.
+    Result<uint64_t> Length() const override;
+
+    /// Closes the wrapped stream.
+    Status Close() override;
+
+    /// Returns the URI of the wrapped stream.
+    Result<std::string> GetUri() const override;
+
+ private:
+    OffsetInputStream(const std::shared_ptr<InputStream>& wrapped, int64_t length,
+                      int64_t offset);
+    /// Returns Invalid unless `inner_pos` is within [0, length_]. Takes a 64-bit value
+    /// so that 64-bit positions are not truncated before the check.
+    Status AssertBoundary(int64_t inner_pos) const;
+
+ private:
+    std::shared_ptr<InputStream> wrapped_;
+    /// Number of bytes exposed by this stream.
+    const int64_t length_;
+    /// Start of the exposed range inside `wrapped_`.
+    const int64_t offset_;
+    /// Logical position, relative to `offset_`.
+    int64_t inner_position_ = 0;
+};
+} // namespace paimon
diff --git a/src/paimon/common/io/offset_input_stream_test.cpp
b/src/paimon/common/io/offset_input_stream_test.cpp
new file mode 100644
index 0000000..3734650
--- /dev/null
+++ b/src/paimon/common/io/offset_input_stream_test.cpp
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/io/offset_input_stream.h"
+
+#include <memory>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// A freshly created stream reports the requested length, starts at position 0 and
+// forwards GetUri() to the wrapped stream.
+TEST(OffsetInputStreamTest, TestBasicConstruction) {
+    auto source = std::make_unique<ByteArrayInputStream>("abcdefghij", /*length=*/10);
+    ASSERT_OK_AND_ASSIGN(
+        auto window, OffsetInputStream::Create(std::move(source), /*length=*/6, /*offset=*/2));
+
+    ASSERT_OK_AND_ASSIGN(auto window_length, window->Length());
+    ASSERT_EQ(6, window_length);
+
+    ASSERT_OK_AND_ASSIGN(auto start_pos, window->GetPos());
+    ASSERT_EQ(0, start_pos);
+
+    ASSERT_OK_AND_ASSIGN(auto source_uri, window->GetUri());
+    ASSERT_EQ("", source_uri);
+}
+
+// Exercises the three seek origins and checks that out-of-range targets are rejected.
+TEST(OffsetInputStreamTest, TestSeekOperations) {
+    auto source = std::make_unique<ByteArrayInputStream>("abcdefghij", /*length=*/10);
+    ASSERT_OK_AND_ASSIGN(
+        auto window, OffsetInputStream::Create(std::move(source), /*length=*/6, /*offset=*/2));
+
+    // Seek relative to the start of the exposed range.
+    ASSERT_OK(window->Seek(3, SeekOrigin::FS_SEEK_SET));
+    ASSERT_OK_AND_ASSIGN(auto current, window->GetPos());
+    ASSERT_EQ(3, current);
+
+    // Seek relative to the current position.
+    ASSERT_OK(window->Seek(1, SeekOrigin::FS_SEEK_CUR));
+    ASSERT_OK_AND_ASSIGN(current, window->GetPos());
+    ASSERT_EQ(4, current);
+
+    // Seek relative to the end of the exposed range.
+    ASSERT_OK(window->Seek(-2, SeekOrigin::FS_SEEK_END));
+    ASSERT_OK_AND_ASSIGN(current, window->GetPos());
+    ASSERT_EQ(4, current);
+
+    // Targets outside the exposed range must fail.
+    ASSERT_NOK(window->Seek(-10, SeekOrigin::FS_SEEK_SET));
+    ASSERT_NOK(window->Seek(10, SeekOrigin::FS_SEEK_SET));
+    ASSERT_NOK(window->Seek(10, SeekOrigin::FS_SEEK_CUR));
+    ASSERT_NOK(window->Seek(10, SeekOrigin::FS_SEEK_END));
+}
+
+// Covers the sequential read, the positional read and Close().
+TEST(OffsetInputStreamTest, TestReadOperations) {
+    auto source = std::make_unique<ByteArrayInputStream>("abcdefghij", /*length=*/10);
+    ASSERT_OK_AND_ASSIGN(
+        auto window, OffsetInputStream::Create(std::move(source), /*length=*/6, /*offset=*/2));
+
+    // Sequential read starts at the beginning of the exposed range.
+    std::string sequential(4, '\0');
+    ASSERT_OK_AND_ASSIGN(auto read_count, window->Read(sequential.data(), /*size=*/4));
+    ASSERT_EQ(4, read_count);
+    ASSERT_EQ("cdef", sequential);
+
+    ASSERT_OK_AND_ASSIGN(auto current, window->GetPos());
+    ASSERT_EQ(4, current);
+
+    // Positional read: the offset is relative to the exposed range.
+    std::string positional(2, '\0');
+    ASSERT_OK_AND_ASSIGN(read_count,
+                         window->Read(positional.data(), /*size=*/2, /*offset=*/1));
+    ASSERT_EQ(2, read_count);
+    ASSERT_EQ("de", positional);
+
+    // The positional read must leave the position untouched.
+    ASSERT_OK_AND_ASSIGN(current, window->GetPos());
+    ASSERT_EQ(4, current);
+
+    // Closing succeeds.
+    ASSERT_OK(window->Close());
+}
+
+// Issues two asynchronous positional reads and checks callbacks, data and position.
+TEST(OffsetInputStreamTest, TestReadAsync) {
+    auto source = std::make_unique<ByteArrayInputStream>("abcdefghij", /*length=*/10);
+    ASSERT_OK_AND_ASSIGN(
+        auto window, OffsetInputStream::Create(std::move(source), /*length=*/6, /*offset=*/2));
+
+    std::string first_buffer(3, '\0');
+    std::string second_buffer(2, '\0');
+    bool first_done = false;
+    bool second_done = false;
+    Status first_status;
+    Status second_status;
+
+    auto on_first = [&](Status status) {
+        first_done = true;
+        first_status = status;
+    };
+    auto on_second = [&](Status status) {
+        second_done = true;
+        second_status = status;
+    };
+
+    window->ReadAsync(first_buffer.data(), 3, 1, std::move(on_first));
+    window->ReadAsync(second_buffer.data(), 2, 3, std::move(on_second));
+
+    ASSERT_TRUE(first_done);
+    ASSERT_OK(first_status);
+    ASSERT_EQ("def", first_buffer);
+    ASSERT_TRUE(second_done);
+    ASSERT_OK(second_status);
+    ASSERT_EQ("fg", second_buffer);
+
+    // Positional reads must leave the position untouched.
+    ASSERT_OK_AND_ASSIGN(auto current, window->GetPos());
+    ASSERT_EQ(0, current);
+}
+
+// Reads that would leave the exposed range fail with the boundary error message.
+TEST(OffsetInputStreamTest, TestBoundaryValidation) {
+    auto source = std::make_unique<ByteArrayInputStream>("abcdefghij", /*length=*/10);
+    ASSERT_OK_AND_ASSIGN(
+        auto window, OffsetInputStream::Create(std::move(source), /*length=*/6, /*offset=*/2));
+
+    // Sequential read larger than the exposed range.
+    std::string scratch(10, '\0');
+    ASSERT_NOK_WITH_MSG(window->Read(scratch.data(), /*size=*/10),
+                        "assert boundary failed: inner pos 10 exceed length 6");
+
+    // Positional read whose end lies beyond the exposed range.
+    ASSERT_NOK_WITH_MSG(window->Read(scratch.data(), /*size=*/4, /*offset=*/5),
+                        "assert boundary failed: inner pos 9 exceed length 6");
+}
+
+// With length == -1 the exposed range extends to the end of the wrapped stream.
+TEST(OffsetInputStreamTest, TestReadWithUnspecifiedLength) {
+    auto source = std::make_unique<ByteArrayInputStream>("abcdefghij", /*length=*/10);
+    // Passing -1 as length asks Create() to derive the length itself.
+    ASSERT_OK_AND_ASSIGN(
+        auto window, OffsetInputStream::Create(std::move(source), /*length=*/-1, /*offset=*/2));
+
+    // Derived length: total length (10) minus offset (2) = 8.
+    ASSERT_OK_AND_ASSIGN(auto window_length, window->Length());
+    ASSERT_EQ(8, window_length);
+
+    // Sequential read inside the derived bounds.
+    std::string sequential(4, '\0');
+    ASSERT_OK_AND_ASSIGN(auto read_count, window->Read(sequential.data(), /*size=*/4));
+    ASSERT_EQ(4, read_count);
+    ASSERT_EQ("cdef", sequential);
+
+    ASSERT_OK_AND_ASSIGN(auto current, window->GetPos());
+    ASSERT_EQ(4, current);
+
+    // Positional read reaching exactly the end of the derived bounds.
+    std::string positional(3, '\0');
+    ASSERT_OK_AND_ASSIGN(read_count,
+                         window->Read(positional.data(), /*size=*/3, /*offset=*/5));
+    ASSERT_EQ(3, read_count);
+    ASSERT_EQ("hij", positional);
+
+    // The positional read must leave the position untouched.
+    ASSERT_OK_AND_ASSIGN(current, window->GetPos());
+    ASSERT_EQ(4, current);
+
+    // A sequential read past the derived length is rejected.
+    std::string oversized(10, '\0');
+    ASSERT_NOK_WITH_MSG(window->Read(oversized.data(), /*size=*/10),
+                        "assert boundary failed: inner pos 14 exceed length 8");
+}
+
+// Create() must reject every invalid argument combination with a specific message.
+TEST(OffsetInputStreamTest, TestInvalidParameters) {
+    // Null wrapped stream.
+    ASSERT_NOK_WITH_MSG(OffsetInputStream::Create(nullptr, /*length=*/6, /*offset=*/2),
+                        "input stream is null pointer");
+
+    // Negative offset.
+    auto source = std::make_unique<ByteArrayInputStream>("abcdefghij", /*length=*/10);
+    ASSERT_NOK_WITH_MSG(
+        OffsetInputStream::Create(std::move(source), /*length=*/6, /*offset=*/-1),
+        "offset -1 is less than 0");
+
+    // Length below -1.
+    source = std::make_unique<ByteArrayInputStream>("abcdefghij", /*length=*/10);
+    ASSERT_NOK_WITH_MSG(
+        OffsetInputStream::Create(std::move(source), /*length=*/-2, /*offset=*/2),
+        "length -2 is less than -1");
+
+    // offset + length reaching beyond the wrapped stream.
+    source = std::make_unique<ByteArrayInputStream>("abcdefghij", /*length=*/10);
+    ASSERT_NOK_WITH_MSG(
+        OffsetInputStream::Create(std::move(source), /*length=*/8, /*offset=*/7),
+        "offset 7 + length 8 exceed total length 10");
+
+    // Dynamic length with an offset beyond the wrapped stream.
+    source = std::make_unique<ByteArrayInputStream>("abcdefghij", /*length=*/10);
+    ASSERT_NOK_WITH_MSG(
+        OffsetInputStream::Create(std::move(source), /*length=*/-1, /*offset=*/15),
+        "offset 15 exceed total length 10");
+}
+
+} // namespace paimon::test