This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a175073  feat: add IO stream infrastructure (#39)
a175073 is described below

commit a175073643108001ab8b0984437c92a002a70541
Author: lszskye <[email protected]>
AuthorDate: Thu Jun 4 11:17:22 2026 +0800

    feat: add IO stream infrastructure (#39)
---
 include/paimon/io/buffered_input_stream.h          |  91 +++++++
 include/paimon/io/byte_array_input_stream.h        |  67 +++++
 include/paimon/io/data_input_stream.h              |  97 ++++++++
 src/paimon/common/io/buffered_input_stream.cpp     | 167 +++++++++++++
 .../common/io/buffered_input_stream_test.cpp       | 270 +++++++++++++++++++++
 src/paimon/common/io/byte_array_input_stream.cpp   | 107 ++++++++
 .../common/io/byte_array_input_stream_test.cpp     |  84 +++++++
 .../common/io/data_input_output_stream_test.cpp    | 208 ++++++++++++++++
 src/paimon/common/io/data_input_stream.cpp         | 129 ++++++++++
 src/paimon/common/io/data_output_stream.cpp        |  70 ++++++
 src/paimon/common/io/data_output_stream.h          |  79 ++++++
 .../common/io/memory_segment_output_stream.cpp     |  99 ++++++++
 .../common/io/memory_segment_output_stream.h       | 118 +++++++++
 .../io/memory_segment_output_stream_test.cpp       |  85 +++++++
 src/paimon/common/io/offset_input_stream.cpp       | 137 +++++++++++
 src/paimon/common/io/offset_input_stream.h         |  67 +++++
 src/paimon/common/io/offset_input_stream_test.cpp  | 216 +++++++++++++++++
 17 files changed, 2091 insertions(+)

diff --git a/include/paimon/io/buffered_input_stream.h 
b/include/paimon/io/buffered_input_stream.h
new file mode 100644
index 0000000..2af6b73
--- /dev/null
+++ b/include/paimon/io/buffered_input_stream.h
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Bytes;
+class MemoryPool;
+
+/// A buffered input stream that wraps another `InputStream` to provide 
buffering capabilities.
+///
+/// `BufferedInputStream` improves I/O performance by reducing the number of 
system calls
+/// through internal buffering. It reads data from the underlying stream in 
larger chunks
+/// and serves subsequent read requests from the internal buffer when possible.
+class PAIMON_EXPORT BufferedInputStream : public InputStream {
+ public:
+    /// Creates a new buffered input stream that wraps the provided input 
stream.
+    /// The buffer is allocated from the specified memory pool.
+    ///
+    /// @param in The underlying input stream to wrap.
+    /// @param buffer_size Size of the internal buffer in bytes.
+    /// @param pool Memory pool for buffer allocation.
+    BufferedInputStream(const std::shared_ptr<InputStream>& in, int32_t 
buffer_size,
+                        MemoryPool* pool);
+
+    ~BufferedInputStream() noexcept override;
+
+    Status Seek(int64_t offset, SeekOrigin origin) override;
+
+    Result<int64_t> GetPos() const override;
+
+    Result<int32_t> Read(char* buffer, uint32_t size) override;
+
+    Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) 
override;
+
+    void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                   std::function<void(Status)>&& callback) override;
+
+    Result<uint64_t> Length() const override;
+
+    Status Close() override;
+
+    Result<std::string> GetUri() const override;
+
+    static constexpr int32_t DEFAULT_BUFFER_SIZE = 8192;
+
+ private:
+    /// Fill the internal buffer from the underlying stream.
+    Status Fill();
+
+    /// Internal read implementation.
+    /// @pre size > 0
+    Result<int32_t> InnerRead(char* buffer, int32_t size);
+
+    /// Validate that the expected number of bytes were read.
+    Status AssertReadLength(int32_t read_length, int32_t actual_read_length) 
const;
+
+ private:
+    int32_t buffer_size_;
+    int32_t pos_ = 0;
+    int32_t count_ = 0;
+    std::unique_ptr<Bytes> buffer_;
+    std::shared_ptr<InputStream> in_;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/io/byte_array_input_stream.h 
b/include/paimon/io/byte_array_input_stream.h
new file mode 100644
index 0000000..e66c5e8
--- /dev/null
+++ b/include/paimon/io/byte_array_input_stream.h
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <string>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// Input stream for memory buffer, inherits from `InputStream`.
+class PAIMON_EXPORT ByteArrayInputStream : public InputStream {
+ public:
+    ByteArrayInputStream(const char* buffer, uint64_t length);
+    ~ByteArrayInputStream() override = default;
+
+    /// @return The raw data pointer of current pos.
+    const char* GetRawData() const;
+
+    Status Seek(int64_t offset, SeekOrigin origin) override;
+
+    Result<int64_t> GetPos() const override {
+        return position_;
+    }
+
+    Result<int32_t> Read(char* buffer, uint32_t size) override;
+
+    Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) 
override;
+
+    void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                   std::function<void(Status)>&& callback) override;
+
+    Result<uint64_t> Length() const override {
+        return length_;
+    }
+
+    Status Close() override;
+
+    Result<std::string> GetUri() const override;
+
+ private:
+    const char* buffer_;
+    const uint64_t length_;
+    int64_t position_;
+};
+}  // namespace paimon
diff --git a/include/paimon/io/data_input_stream.h 
b/include/paimon/io/data_input_stream.h
new file mode 100644
index 0000000..8dc15a2
--- /dev/null
+++ b/include/paimon/io/data_input_stream.h
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "paimon/io/byte_order.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Bytes;
+class InputStream;
+
+/// `DataInputStream` provides a convenient wrapper around `InputStream` for 
reading typed data.
+/// @note The default byte order is big-endian to maintain compatibility with 
the Java
+/// implementation.
+class PAIMON_EXPORT DataInputStream {
+ public:
+    /// Constructs a `DataInputStream` wrapping the given `InputStream`.
+    /// @param input_stream The underlying input stream to read from.
+    explicit DataInputStream(const std::shared_ptr<InputStream>& input_stream);
+
+    /// Seek to a specific position in the underlying input stream.
+    /// @param offset The absolute byte offset to seek to.
+    Status Seek(int64_t offset) const;
+
+    /// Read a typed value from the stream.
+    /// @return Result containing the read value or an error status.
+    template <typename T>
+    Result<T> ReadValue() const;
+
+    /// Read some bytes to a `Bytes` object from the stream. The length of 
bytes is the number of
+    /// bytes read from the stream.
+    /// @param bytes Buffer to store the read bytes.
+    Status ReadBytes(Bytes* bytes) const;
+
+    /// Read raw data of specified size from the stream.
+    /// @param data Buffer to store the read data.
+    /// @param size Number of bytes to read.
+    Status Read(char* data, uint32_t size) const;
+
+    /// Read string from the stream.
+    /// @note First read length (int16), then read string bytes.
+    Result<std::string> ReadString() const;
+
+    /// Get the current position in the underlying input stream.
+    Result<int64_t> GetPos() const;
+
+    /// Get the total length of the underlying input stream.
+    Result<uint64_t> Length() const;
+
+    /// Set the byte order for endianness conversion.
+    /// @param order The byte order to use `PAIMON_BIG_ENDIAN` or 
`PAIMON_LITTLE_ENDIAN`.
+    void SetOrder(ByteOrder order) {
+        byte_order_ = order;
+    }
+
+ private:
+    /// Validate that the expected number of bytes were read.
+    /// @param read_length Expected number of bytes to read.
+    /// @param actual_read_length Actual number of bytes read.
+    Status AssertReadLength(int32_t read_length, int32_t actual_read_length) 
const;
+
+    /// Check if there are enough bytes available to read.
+    /// @param need_length Number of bytes needed.
+    Status AssertBoundary(int32_t need_length) const;
+
+    /// Determine if byte swapping is needed based on current byte order and 
system endianness.
+    /// @return `true` if byte swapping is required, `false` otherwise.
+    bool NeedSwap() const;
+
+ private:
+    std::shared_ptr<InputStream> input_stream_;
+    ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/io/buffered_input_stream.cpp 
b/src/paimon/common/io/buffered_input_stream.cpp
new file mode 100644
index 0000000..d2b50b7
--- /dev/null
+++ b/src/paimon/common/io/buffered_input_stream.cpp
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/io/buffered_input_stream.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cstring>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+class MemoryPool;
+
+BufferedInputStream::BufferedInputStream(const std::shared_ptr<InputStream>& 
in,
+                                         int32_t buffer_size, MemoryPool* pool)
+    : buffer_size_(buffer_size), in_(in) {
+    assert(buffer_size > 0);
+    buffer_ = std::make_unique<Bytes>(buffer_size, pool);
+}
+
+BufferedInputStream::~BufferedInputStream() noexcept = default;
+
+/// Moves the logical read position of this buffered stream.
+///
+/// The (offset, origin) pair is first resolved to an absolute offset. If that
+/// offset lies inside the window of bytes currently cached in buffer_, only
+/// pos_ is adjusted; otherwise the underlying stream is seeked and the cache
+/// is dropped.
+///
+/// @param offset Offset relative to `origin`.
+/// @param origin FS_SEEK_CUR and FS_SEEK_END are resolved here; any other
+///               value is treated as an absolute (FS_SEEK_SET) offset.
+/// @return OK on success, or the error reported by GetPos(), in_->Length(),
+///         in_->GetPos() or in_->Seek().
+Status BufferedInputStream::Seek(int64_t offset, SeekOrigin origin) {
+    // Convert all seek origins to an absolute offset so the buffer-hit fast
+    // path below can work uniformly on absolute positions.
+    int64_t target_abs_offset = offset;
+    if (origin == SeekOrigin::FS_SEEK_CUR) {
+        PAIMON_ASSIGN_OR_RAISE(int64_t cur_pos, GetPos());
+        target_abs_offset = cur_pos + offset;
+    } else if (origin == SeekOrigin::FS_SEEK_END) {
+        PAIMON_ASSIGN_OR_RAISE(int64_t length, in_->Length());
+        target_abs_offset = length + offset;
+    }
+    // else: FS_SEEK_SET — target_abs_offset is already absolute.
+
+    // Fast path: if the new absolute offset still falls into the bytes already
+    // cached in buffer_ (i.e. the window from buf_start_abs to buf_end_abs), just
+    // adjust pos_ without touching the underlying stream.
+    // The upper bound is inclusive: a target equal to buf_end_abs leaves
+    // pos_ == count_, so no cached byte is served from that position.
+    // NOTE(review): the window is derived from in_->GetPos(), which assumes the
+    // cached bytes end exactly at the underlying stream's current position —
+    // confirm every code path that reads from in_ keeps count_ in sync.
+    if (count_ > 0) {
+        PAIMON_ASSIGN_OR_RAISE(int64_t in_pos, in_->GetPos());
+        const int64_t buf_start_abs = in_pos - count_;
+        const int64_t buf_end_abs = in_pos;
+        if (target_abs_offset >= buf_start_abs && target_abs_offset <= 
buf_end_abs) {
+            pos_ = static_cast<int32_t>(target_abs_offset - buf_start_abs);
+            return Status::OK();
+        }
+    }
+
+    // Slow path: the target is outside the current buffer window, fall back to
+    // a real seek on the underlying stream and invalidate the buffer.
+    PAIMON_RETURN_NOT_OK(in_->Seek(target_abs_offset, FS_SEEK_SET));
+    pos_ = 0;
+    count_ = 0;
+    return Status::OK();
+}
+
+Result<int64_t> BufferedInputStream::GetPos() const {
+    PAIMON_ASSIGN_OR_RAISE(int64_t in_pos, in_->GetPos());
+    return in_pos - count_ + pos_;
+}
+
+/// Reads exactly `size` bytes into `buffer`.
+///
+/// InnerRead() may deliver fewer bytes than requested (it serves at most what
+/// is left in the internal buffer), so it is called repeatedly until the
+/// request is satisfied.
+///
+/// @param buffer Destination with room for at least `size` bytes.
+/// @param size Number of bytes to read.
+/// @return `size` on success; an error if any InnerRead() call fails or makes
+///         no progress.
+Result<int32_t> BufferedInputStream::Read(char* buffer, uint32_t size) {
+    uint32_t actual_read_len = 0;
+    while (actual_read_len < size) {
+        PAIMON_ASSIGN_OR_RAISE(int32_t nread,
+                               InnerRead(buffer + actual_read_len, size - actual_read_len));
+        // A non-positive count means no progress was made. assert() is compiled
+        // out under NDEBUG, so fail explicitly instead of looping forever.
+        if (nread <= 0) {
+            return Status::Invalid(fmt::format(
+                "Read failed, no bytes read (may read eof), expect read {} bytes, actual read {} "
+                "bytes",
+                size, actual_read_len));
+        }
+        actual_read_len += nread;
+    }
+    PAIMON_RETURN_NOT_OK(AssertReadLength(size, actual_read_len));
+    return actual_read_len;
+}
+
+Result<int32_t> BufferedInputStream::Read(char* buffer, uint32_t size, 
uint64_t offset) {
+    return Status::Invalid("BufferedInputStream does not support Read from 
offset");
+}
+
+void BufferedInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t 
offset,
+                                    std::function<void(Status)>&& callback) {
+    callback(Status::NotImplemented("BufferedInputStream do not support 
ReadAsync"));
+}
+
+Result<uint64_t> BufferedInputStream::Length() const {
+    return in_->Length();
+}
+
+Status BufferedInputStream::Close() {
+    pos_ = 0;
+    count_ = 0;
+    buffer_.reset();
+    return Status::OK();
+}
+
+Result<std::string> BufferedInputStream::GetUri() const {
+    return in_->GetUri();
+}
+
+Status BufferedInputStream::Fill() {
+    pos_ = 0;
+    count_ = 0;
+    PAIMON_ASSIGN_OR_RAISE(int64_t in_pos, in_->GetPos());
+    PAIMON_ASSIGN_OR_RAISE(int64_t length, in_->Length());
+    int64_t left_to_read = std::min((length - in_pos), 
static_cast<int64_t>(buffer_size_));
+    PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len, in_->Read(buffer_->data(), 
left_to_read));
+    PAIMON_RETURN_NOT_OK(AssertReadLength(left_to_read, actual_read_len));
+    count_ = actual_read_len;
+    return Status::OK();
+}
+
+/// Serves one chunk of a read request.
+///
+/// Bytes still cached in buffer_ are returned first. When the cache is
+/// exhausted, a request of at least buffer_size_ bytes is forwarded straight
+/// to the underlying stream; a smaller request refills the cache via Fill()
+/// and is served from it.
+///
+/// @pre size > 0
+/// @param buffer Destination with room for at least `size` bytes.
+/// @param size Maximum number of bytes to deliver in this call.
+/// @return Number of bytes delivered (may be less than `size`), or an error if
+///         the underlying read fails or Fill() yields no bytes.
+Result<int32_t> BufferedInputStream::InnerRead(char* buffer, int32_t size) {
+    assert(size > 0);
+    int32_t avail = count_ - pos_;
+    if (avail <= 0) {
+        assert(avail == 0);
+        /* If the requested length is at least as large as the buffer, do not
+           bother to copy the bytes into the local buffer.  In this way
+           buffered streams will cascade harmlessly. */
+        if (size >= buffer_size_) {
+            // The direct read moves in_ past the cached window, so the cached
+            // bytes would no longer end at in_->GetPos(). Drop them first;
+            // otherwise Seek()'s buffer-hit fast path could map a later target
+            // onto stale bytes.
+            pos_ = 0;
+            count_ = 0;
+            return in_->Read(buffer, size);
+        }
+        PAIMON_RETURN_NOT_OK(Fill());
+        avail = count_ - pos_;
+        if (avail <= 0) {
+            return Status::Invalid(fmt::format(
+                "InnerRead failed, after Fill(), still no bytes available (may read eof), but "
+                "expect read {} bytes",
+                size));
+        }
+    }
+    int32_t copy_length = std::min(avail, size);
+    memcpy(buffer, buffer_->data() + pos_, copy_length);
+    pos_ += copy_length;
+    return copy_length;
+}
+
+Status BufferedInputStream::AssertReadLength(int32_t read_length,
+                                             int32_t actual_read_length) const 
{
+    if (read_length != actual_read_length) {
+        return Status::Invalid(
+            fmt::format("assert read length failed: read length not match, 
read length {}, actual "
+                        "read length {}",
+                        read_length, actual_read_length));
+    }
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/buffered_input_stream_test.cpp 
b/src/paimon/common/io/buffered_input_stream_test.cpp
new file mode 100644
index 0000000..0f6c9ef
--- /dev/null
+++ b/src/paimon/common/io/buffered_input_stream_test.cpp
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/io/buffered_input_stream.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(BufferedInputStreamTest, TestSimple) {
+    auto pool = GetDefaultPool();
+    auto output_stream = 
std::make_unique<MemorySegmentOutputStream>(/*segment_size=*/8, pool);
+    std::string str = "abcdef";
+    auto bytes = std::make_shared<Bytes>(str, pool.get());
+    output_stream->WriteBytes(bytes);
+    auto out_bytes = 
MemorySegmentUtils::CopyToBytes(output_stream->Segments(), 0,
+                                                     
output_stream->CurrentSize(), pool.get());
+    auto in = std::make_shared<ByteArrayInputStream>(out_bytes->data(), 
out_bytes->size());
+    auto input_stream = std::make_shared<BufferedInputStream>(in, 
/*buffer_size=*/4, pool.get());
+    ASSERT_EQ(6, input_stream->Length().value());
+    ASSERT_TRUE(input_stream->GetUri().value().empty());
+
+    // read from pos 0
+    std::string value(3, '\0');
+    ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value());
+    ASSERT_EQ("abc", value);
+    ASSERT_EQ(3, input_stream->GetPos().value());
+    ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value());
+    ASSERT_EQ("def", value);
+    ASSERT_EQ(6, input_stream->GetPos().value());
+
+    // read from pos 1
+    ASSERT_NOK_WITH_MSG(input_stream->Read(value.data(), value.size(), 
/*offset=*/1),
+                        "BufferedInputStream does not support Read from 
offset");
+    ASSERT_EQ(6, input_stream->GetPos().value());
+
+    // seek to pos 3
+    ASSERT_OK(input_stream->Seek(3, SeekOrigin::FS_SEEK_SET));
+    ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value());
+    ASSERT_EQ("def", value);
+    ASSERT_EQ(6, input_stream->GetPos().value());
+
+    // seek to pos 2
+    ASSERT_OK(input_stream->Seek(-4, SeekOrigin::FS_SEEK_END));
+    ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value());
+    ASSERT_EQ("cde", value);
+    ASSERT_EQ(5, input_stream->GetPos().value());
+
+    // seek to pos 1
+    ASSERT_OK(input_stream->Seek(-4, SeekOrigin::FS_SEEK_CUR));
+    ASSERT_EQ(3, input_stream->Read(value.data(), value.size()).value());
+    ASSERT_EQ("bcd", value);
+    ASSERT_EQ(4, input_stream->GetPos().value());
+
+    // test exceed eof: seek to pos 4, want to read 3 bytes but only 2 remain
+    ASSERT_OK(input_stream->Seek(-2, SeekOrigin::FS_SEEK_END));
+    ASSERT_NOK_WITH_MSG(input_stream->Read(value.data(), value.size()),
+                        "InnerRead failed, after Fill(), still no bytes 
available (may read eof), "
+                        "but expect read 1 bytes");
+
+    // test invalid seek
+    ASSERT_NOK_WITH_MSG(input_stream->Seek(100, SeekOrigin::FS_SEEK_CUR),
+                        "invalid seek, after seek, current pos 106, length 6");
+
+    // test ReadAsync not implemented
+    bool read_finished = false;
+    auto callback = [&](Status status) {
+        ASSERT_TRUE(status.IsNotImplemented());
+        read_finished = true;
+    };
+    input_stream->ReadAsync(value.data(), value.size(), /*offset=*/0, 
callback);
+    ASSERT_TRUE(read_finished);
+
+    ASSERT_OK(input_stream->Close());
+}
+
+TEST(BufferedInputStreamTest, TestSeek) {
+    // Data: "0123456789abcdef" (16 bytes), buffer_size = 8
+    auto pool = GetDefaultPool();
+    std::string data = "0123456789abcdef";
+    auto in = std::make_shared<ByteArrayInputStream>(reinterpret_cast<const 
char*>(data.data()),
+                                                     data.size());
+    auto stream = std::make_shared<BufferedInputStream>(in, /*buffer_size=*/8, 
pool.get());
+
+    // Helper: verify buffer_ content matches expected substring of data
+    auto check_buffer = [&](const BufferedInputStream& s, const std::string& 
expected_content,
+                            int32_t expected_pos, int32_t expected_count) {
+        ASSERT_EQ(s.pos_, expected_pos);
+        ASSERT_EQ(s.count_, expected_count);
+        std::string actual(s.buffer_->data(), s.buffer_->data() + 
expected_count);
+        ASSERT_EQ(actual, expected_content) << "buffer content mismatch: 
actual=\"" << actual
+                                            << "\", expected=\"" << 
expected_content << "\"";
+    };
+
+    std::string buf(4, '\0');
+
+    // Initial state: buffer empty
+    ASSERT_EQ(stream->pos_, 0);
+    ASSERT_EQ(stream->count_, 0);
+
+    // FS_SEEK_SET slow path (buffer empty, count_==0): seek to pos 4, read 
"4567"
+    // First Read calls Fill: reads 8 bytes from pos 4 -> buffer = "456789ab", 
count_=8.
+    // Then consumes 4 bytes -> pos_=4.
+    {
+        ASSERT_OK(stream->Seek(4, SeekOrigin::FS_SEEK_SET));
+        // After seek: slow path, buffer invalidated
+        ASSERT_EQ(stream->pos_, 0);
+        ASSERT_EQ(stream->count_, 0);
+        ASSERT_EQ(4, stream->GetPos().value());
+
+        ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+        ASSERT_EQ("4567", buf);
+        check_buffer(*stream, "456789ab", 4, 8);
+    }
+
+    // FS_SEEK_CUR buffer hit: pos=8, seek -4 -> target=4, inside buffer 
[4..12)
+    // Fast path: only adjusts pos_ to 0, count_ stays 8, buffer unchanged.
+    {
+        ASSERT_OK(stream->Seek(-4, SeekOrigin::FS_SEEK_CUR));
+        check_buffer(*stream, "456789ab", 0, 8);
+        ASSERT_EQ(4, stream->GetPos().value());
+
+        ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+        ASSERT_EQ("4567", buf);
+        check_buffer(*stream, "456789ab", 4, 8);
+    }
+
+    // FS_SEEK_SET buffer hit: pos=8, seek to 5, inside buffer [4..12)
+    // Fast path: pos_ = 5 - 4 = 1, count_ stays 8, buffer unchanged.
+    {
+        ASSERT_OK(stream->Seek(5, SeekOrigin::FS_SEEK_SET));
+        check_buffer(*stream, "456789ab", 1, 8);
+        ASSERT_EQ(5, stream->GetPos().value());
+
+        ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+        ASSERT_EQ("5678", buf);
+        check_buffer(*stream, "456789ab", 5, 8);
+    }
+
+    // FS_SEEK_END buffer hit: target = 16 + (-6) = 10.
+    // Current buffer covers [4..12) and pos 10 is inside it, so this is a buffer hit.
+    // Fast path: pos_ = 10 - 4 = 6, count_ stays 8.
+    {
+        ASSERT_OK(stream->Seek(-6, SeekOrigin::FS_SEEK_END));
+        check_buffer(*stream, "456789ab", 6, 8);
+        ASSERT_EQ(10, stream->GetPos().value());
+
+        // Read 4 bytes from pos 10: 2 left in buffer ("ab"), then refill.
+        ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+        ASSERT_EQ("abcd", buf);
+        // After consuming "ab" (pos_==count_==8), InnerRead calls Fill from 
in_ pos 12,
+        // reads [12..16) -> buffer = "cdef", count_=4, then consumes 2 -> 
pos_=2.
+        check_buffer(*stream, "cdef", 2, 4);
+    }
+
+    // Buffer miss slow path: seek to pos 0, current buffer covers [12..16).
+    // Target 0 is outside [12..16) -> slow path, buffer invalidated.
+    {
+        ASSERT_OK(stream->Seek(0, SeekOrigin::FS_SEEK_SET));
+        ASSERT_EQ(stream->pos_, 0);
+        ASSERT_EQ(stream->count_, 0);
+        ASSERT_EQ(0, stream->GetPos().value());
+
+        ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+        ASSERT_EQ("0123", buf);
+        // Fill reads [0..8) -> buffer = "01234567", count_=8, pos_=4
+        check_buffer(*stream, "01234567", 4, 8);
+    }
+
+    // Buffer hit at exact boundary start: seek to pos 0 (= buf_start_abs=0)
+    // Fast path: pos_ = 0, count_ stays 8, buffer unchanged.
+    {
+        ASSERT_OK(stream->Seek(0, SeekOrigin::FS_SEEK_SET));
+        check_buffer(*stream, "01234567", 0, 8);
+
+        ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+        ASSERT_EQ("0123", buf);
+        check_buffer(*stream, "01234567", 4, 8);
+    }
+
+    // Buffer hit at exact boundary end: seek to pos 8 (= buf_end_abs=8)
+    // Fast path: pos_ = 8 == count_, buffer unchanged but next Read triggers 
refill.
+    {
+        ASSERT_OK(stream->Seek(8, SeekOrigin::FS_SEEK_SET));
+        check_buffer(*stream, "01234567", 8, 8);
+        ASSERT_EQ(8, stream->GetPos().value());
+
+        ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+        ASSERT_EQ("89ab", buf);
+        // pos_==count_ triggered Fill: buffer = "89abcdef", count_=8, pos_=4
+        check_buffer(*stream, "89abcdef", 4, 8);
+    }
+
+    // FS_SEEK_CUR buffer miss: pos=12, seek -12 -> target=0, outside [8..16)
+    {
+        ASSERT_OK(stream->Seek(-12, SeekOrigin::FS_SEEK_CUR));
+        ASSERT_EQ(stream->pos_, 0);
+        ASSERT_EQ(stream->count_, 0);
+
+        ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+        ASSERT_EQ("0123", buf);
+        check_buffer(*stream, "01234567", 4, 8);
+    }
+
+    // FS_SEEK_END buffer hit: fill buffer near file end, then seek within via 
FS_SEEK_END.
+    {
+        ASSERT_OK(stream->Seek(12, SeekOrigin::FS_SEEK_SET));
+        // pos 12 is outside current buffer [0..8) -> slow path
+        ASSERT_EQ(stream->pos_, 0);
+        ASSERT_EQ(stream->count_, 0);
+
+        ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+        ASSERT_EQ("cdef", buf);
+        // Fill from 12: buffer = "cdef", count_=4, pos_=4
+        check_buffer(*stream, "cdef", 4, 4);
+
+        // Seek -4 from end -> target = 12, buffer covers [12..16), 12 is 
inside.
+        // Fast path: pos_ = 12 - 12 = 0
+        ASSERT_OK(stream->Seek(-4, SeekOrigin::FS_SEEK_END));
+        check_buffer(*stream, "cdef", 0, 4);
+        ASSERT_EQ(12, stream->GetPos().value());
+
+        ASSERT_EQ(4, stream->Read(buf.data(), 4).value());
+        ASSERT_EQ("cdef", buf);
+        check_buffer(*stream, "cdef", 4, 4);
+    }
+
+    // Empty buffer (count_==0): fresh stream, seek always takes slow path.
+    {
+        auto in2 = std::make_shared<ByteArrayInputStream>(
+            reinterpret_cast<const char*>(data.data()), data.size());
+        auto fresh = std::make_shared<BufferedInputStream>(in2, 
/*buffer_size=*/8, pool.get());
+        ASSERT_EQ(fresh->pos_, 0);
+        ASSERT_EQ(fresh->count_, 0);
+
+        ASSERT_OK(fresh->Seek(10, SeekOrigin::FS_SEEK_SET));
+        ASSERT_EQ(fresh->pos_, 0);
+        ASSERT_EQ(fresh->count_, 0);
+        ASSERT_EQ(10, fresh->GetPos().value());
+
+        ASSERT_EQ(4, fresh->Read(buf.data(), 4).value());
+        ASSERT_EQ("abcd", buf);
+        // Fill from 10: buffer = "abcdef", count_=6, pos_=4
+        check_buffer(*fresh, "abcdef", 4, 6);
+    }
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/io/byte_array_input_stream.cpp 
b/src/paimon/common/io/byte_array_input_stream.cpp
new file mode 100644
index 0000000..a88d6b8
--- /dev/null
+++ b/src/paimon/common/io/byte_array_input_stream.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/io/byte_array_input_stream.h"
+
+#include <cassert>
+#include <cstring>
+#include <utility>
+
+#include "fmt/format.h"
+
+namespace paimon {
+/// Creates a stream over the caller-supplied memory range [buffer, buffer + length).
+/// Only the pointer is stored here (the bytes are not copied by this constructor) and
+/// the read position starts at 0. A null `buffer` is caught by assert() only.
+ByteArrayInputStream::ByteArrayInputStream(const char* buffer, uint64_t length)
+    : buffer_(buffer), length_(length), position_(0) {
+    assert(buffer_);
+}
+
+/// Returns a pointer to the byte at the current read position (buffer_ + position_).
+/// No bounds check is performed and the position is not advanced.
+const char* ByteArrayInputStream::GetRawData() const {
+    return buffer_ + position_;
+}
+
+/// Moves the read position to `offset` interpreted relative to `origin`.
+///
+/// The target is computed and validated in a local before `position_` is
+/// touched, so a rejected seek leaves the stream at its previous position
+/// rather than at an out-of-range (possibly negative) one that a later Read()
+/// would use to index the buffer.
+///
+/// @param offset Signed displacement from `origin`.
+/// @param origin FS_SEEK_SET, FS_SEEK_CUR or FS_SEEK_END.
+/// @return Status::Invalid for an unknown origin or a target outside [0, length_];
+///         Status::OK otherwise.
+Status ByteArrayInputStream::Seek(int64_t offset, SeekOrigin origin) {
+    int64_t target = 0;
+    switch (origin) {
+        case SeekOrigin::FS_SEEK_SET: {
+            target = offset;
+            break;
+        }
+        case SeekOrigin::FS_SEEK_CUR: {
+            target = position_ + offset;
+            break;
+        }
+        case SeekOrigin::FS_SEEK_END: {
+            PAIMON_ASSIGN_OR_RAISE(uint64_t length, Length());
+            target = static_cast<int64_t>(length) + offset;
+            break;
+        }
+        default:
+            return Status::Invalid(
+                "invalid SeekOrigin, only support FS_SEEK_SET, FS_SEEK_CUR, and FS_SEEK_END");
+    }
+    if (target < 0 || target > static_cast<int64_t>(length_)) {
+        return Status::Invalid(
+            fmt::format("invalid seek, after seek, current pos {}, length {}", target, length_));
+    }
+    // Commit only after validation so a failed seek has no side effect.
+    position_ = target;
+    return Status::OK();
+}
+
+/// Copies `size` bytes starting at the current position into `buffer` and
+/// advances the position by `size`.
+/// Returns Status::Invalid (without copying) when fewer than `size` bytes remain.
+Result<int32_t> ByteArrayInputStream::Read(char* buffer, uint32_t size) {
+    const int64_t end_position = position_ + static_cast<int64_t>(size);
+    const bool past_end = end_position > static_cast<int64_t>(length_);
+    if (past_end) {
+        return Status::Invalid(
+            fmt::format("ByteArrayInputStream assert boundary failed: need length {}, current "
+                        "position {}, exceed length {}",
+                        size, position_, length_));
+    }
+    const char* source = buffer_ + position_;
+    memcpy(buffer, source, size);
+    position_ += size;
+    return size;
+}
+
+/// Positional read: copies `size` bytes starting at absolute `offset` into
+/// `buffer`. The stream's own read position is neither used nor modified.
+///
+/// The bounds check is written without computing `offset + size`, because that
+/// sum can wrap around in uint64_t for a very large `offset` and would then
+/// wrongly pass the check.
+///
+/// @return `size` on success; Status::Invalid when [offset, offset + size)
+///         is not fully inside the buffer.
+Result<int32_t> ByteArrayInputStream::Read(char* buffer, uint32_t size, uint64_t offset) {
+    if (offset > length_ || static_cast<uint64_t>(size) > length_ - offset) {
+        return Status::Invalid(
+            fmt::format("ByteArrayInputStream assert boundary failed: need length {}, read offset "
+                        "{}, exceed length {}",
+                        size, offset, length_));
+    }
+    memcpy(buffer, buffer_ + offset, size);
+    return size;
+}
+
+/// Performs the positional read immediately and reports the outcome through
+/// `callback`, which is invoked exactly once before this function returns.
+void ByteArrayInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                                     std::function<void(Status)>&& callback) {
+    Result<int32_t> outcome = Read(buffer, size, offset);
+    if (!outcome.ok()) {
+        // Propagate the read failure unchanged.
+        callback(outcome.status());
+        return;
+    }
+    if (static_cast<uint32_t>(outcome.value()) != size) {
+        callback(Status::Invalid(fmt::format(
+            "ByteArrayInputStream async read size {} != expected {}", outcome.value(), size)));
+        return;
+    }
+    callback(Status::OK());
+}
+
+/// No-op: nothing is released here; always returns Status::OK().
+Status ByteArrayInputStream::Close() {
+    return Status::OK();
+}
+
+/// An in-memory stream has no backing location, so the URI is always the empty string.
+Result<std::string> ByteArrayInputStream::GetUri() const {
+    return std::string();
+}
+}  // namespace paimon
diff --git a/src/paimon/common/io/byte_array_input_stream_test.cpp 
b/src/paimon/common/io/byte_array_input_stream_test.cpp
new file mode 100644
index 0000000..adac48f
--- /dev/null
+++ b/src/paimon/common/io/byte_array_input_stream_test.cpp
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/io/byte_array_input_stream.h"
+
+#include <memory>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// End-to-end check of ByteArrayInputStream over the 6 bytes "abcdef":
+// positional read, sequential read after seek, ReadAsync, read past the end,
+// and an out-of-range seek.
+TEST(ByteArrayInputStreamTest, TestSimple) {
+    auto pool = GetDefaultPool();
+    auto output_stream = 
std::make_unique<MemorySegmentOutputStream>(/*segment_size=*/8, pool);
+    std::string str = "abcdef";
+    auto bytes = std::make_shared<Bytes>(str, pool.get());
+    output_stream->WriteBytes(bytes);
+    auto out_bytes = 
MemorySegmentUtils::CopyToBytes(output_stream->Segments(), 0,
+                                                     
output_stream->CurrentSize(), pool.get());
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(out_bytes->data(), 
out_bytes->size());
+    ASSERT_EQ(6, input_stream->Length().value());
+    ASSERT_TRUE(input_stream->GetUri().value().empty());
+
+    // positional read from offset 1 must not move the stream position
+    std::string value(4, '\0');
+    ASSERT_EQ(4, input_stream->Read(value.data(), value.size(), 
/*offset=*/1).value());
+    ASSERT_EQ("bcde", value);
+    ASSERT_EQ(0, input_stream->GetPos().value());
+
+    // seek to pos 2, then a sequential read advances the position to 6
+    ASSERT_OK(input_stream->Seek(2, SeekOrigin::FS_SEEK_SET));
+    ASSERT_EQ(4, input_stream->Read(value.data(), value.size()).value());
+    ASSERT_EQ("cdef", value);
+    ASSERT_EQ(6, input_stream->GetPos().value());
+
+    // although the stream is at pos 2, ReadAsync reads from the explicit offset 0
+    ASSERT_OK(input_stream->Seek(2, SeekOrigin::FS_SEEK_SET));
+    bool read_finished = false;
+    auto callback = [&](Status status) {
+        ASSERT_OK(status);
+        if (status.ok()) {
+            read_finished = true;
+        }
+    };
+    input_stream->ReadAsync(value.data(), value.size(), /*offset=*/0, 
callback);
+    ASSERT_TRUE(read_finished);
+    ASSERT_EQ("abcd", value);
+    ASSERT_EQ(2, input_stream->GetPos().value());
+
+    // test exceed eof, seek to pos 3, want to read 4 bytes
+    ASSERT_OK(input_stream->Seek(-3, SeekOrigin::FS_SEEK_END));
+    ASSERT_NOK_WITH_MSG(
+        input_stream->Read(value.data(), value.size()),
+        "assert boundary failed: need length 4, current position 3, exceed 
length 6");
+
+    // test invalid seek
+    ASSERT_NOK_WITH_MSG(input_stream->Seek(100, SeekOrigin::FS_SEEK_CUR),
+                        "invalid seek, after seek, current pos 103, length 6");
+    ASSERT_OK(input_stream->Close());
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/io/data_input_output_stream_test.cpp 
b/src/paimon/common/io/data_input_output_stream_test.cpp
new file mode 100644
index 0000000..4e60637
--- /dev/null
+++ b/src/paimon/common/io/data_input_output_stream_test.cpp
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/io/data_output_stream.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/io/buffered_input_stream.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Fixture shared by the DataInputStream / DataOutputStream round-trip tests.
+// It is parameterized on ByteOrder and holds the expected serialized form of
+// the values written by WriteValues() for both byte orders.
+class DataInputOutputStreamTest : public ::testing::Test,
+                                  public ::testing::WithParamInterface<ByteOrder> {
+ protected:
+    // Builds the expected byte images. The two images differ only in the
+    // multi-byte integers (int16, int32, int64 and the 2-byte string length).
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+        std::vector<char> bytes_big_endian = {
+            127, 125,  67,   127,  -2,  -57,  127,  127, -1,  -1,   -1,  -1,  -1,  -1,
+            -3,  1,    0,    39,   84,  104,  105,  115, 32,  105,  115, 32,  97,  32,
+            118, 101,  114,  121,  32,  118,  101,  114, 121, 32,   118, 101, 114, 121,
+            32,  108,  111,  110,  103, 32,   115,  101, 110, 116,  101, 110, 99,  101,
+            46,  -26,  -120, -111, -26, -104, -81,  -28, -72, -128, -28, -72, -86, -25,
+            -78, -119, -27,  -120, -73, -27,  -116, -96, -17, -67,  -98};
+        serialized_bytes_big_endian_ =
+            std::make_shared<Bytes>(bytes_big_endian.size(), pool_.get());
+        memcpy(serialized_bytes_big_endian_->data(), bytes_big_endian.data(),
+               bytes_big_endian.size());
+
+        std::vector<char> bytes_little_endian = {
+            127, 67,   125,  127,  -57, -2,   127,  -3,  -1,  -1,   -1,  -1,  -1,  -1,
+            127, 1,    39,   0,    84,  104,  105,  115, 32,  105,  115, 32,  97,  32,
+            118, 101,  114,  121,  32,  118,  101,  114, 121, 32,   118, 101, 114, 121,
+            32,  108,  111,  110,  103, 32,   115,  101, 110, 116,  101, 110, 99,  101,
+            46,  -26,  -120, -111, -26, -104, -81,  -28, -72, -128, -28, -72, -86, -25,
+            -78, -119, -27,  -120, -73, -27,  -116, -96, -17, -67,  -98};
+        serialized_bytes_little_endian_ =
+            std::make_shared<Bytes>(bytes_little_endian.size(), pool_.get());
+        memcpy(serialized_bytes_little_endian_->data(), bytes_little_endian.data(),
+               bytes_little_endian.size());
+    }
+
+    // Writes the fixed sequence of test values to any stream type exposing
+    // WriteValue / WriteString / WriteBytes. Return values are discarded on
+    // purpose: the callers verify the produced bytes instead.
+    template <typename T>
+    void WriteValues(T* data_output_stream) const {
+        (void)data_output_stream->WriteValue(static_cast<char>(127));                     // 1 byte
+        (void)data_output_stream->WriteValue(static_cast<int16_t>(32067));                // 2 bytes
+        (void)data_output_stream->WriteValue(static_cast<int32_t>(2147403647));           // 4 bytes
+        (void)data_output_stream->WriteValue(static_cast<int64_t>(9223372036854775805));  // 8 bytes
+        (void)data_output_stream->WriteValue(true);                                       // 1 byte
+        std::string str1 = "This is a very very very long sentence.";
+        (void)data_output_stream->WriteString(str1);  // 39 bytes + 2 bytes len
+        std::string str2 = "我是一个粉刷匠~";  // 24 bytes
+        auto bytes = std::make_shared<Bytes>(str2, pool_.get());
+        (void)data_output_stream->WriteBytes(bytes);
+    }
+
+    // Reads back everything WriteValues() wrote and verifies the values, the
+    // final position, the failure at end of stream, and an absolute seek.
+    void CheckResult(const InputStream* input_stream,
+                     const DataInputStream* data_input_stream) const {
+        ASSERT_EQ(127, data_input_stream->ReadValue<char>().value());
+        ASSERT_EQ(32067, data_input_stream->ReadValue<int16_t>().value());
+        ASSERT_EQ(static_cast<int32_t>(2147403647),
+                  data_input_stream->ReadValue<int32_t>().value());
+        ASSERT_EQ(static_cast<int64_t>(9223372036854775805),
+                  data_input_stream->ReadValue<int64_t>().value());
+        ASSERT_EQ(true, data_input_stream->ReadValue<bool>().value());
+        std::string str1 = "This is a very very very long sentence.";
+        ASSERT_EQ(str1, data_input_stream->ReadString().value());
+        std::string str2 = "我是一个粉刷匠~";  // 24 bytes
+        auto bytes = std::make_shared<Bytes>(str2, pool_.get());
+        auto read_bytes = std::make_shared<Bytes>(str2.length(), pool_.get());
+        ASSERT_OK(data_input_stream->ReadBytes(read_bytes.get()));
+        ASSERT_EQ(*bytes, *read_bytes);
+        // test GetPos
+        ASSERT_OK_AND_ASSIGN(int64_t in_pos, input_stream->GetPos());
+        ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, in_pos);
+        // read eof, return bad status
+        ASSERT_NOK(data_input_stream->ReadString());
+        // test seek: offset 3 is where the int32 value starts (after char + int16)
+        ASSERT_OK(data_input_stream->Seek(3));
+        ASSERT_EQ(static_cast<int32_t>(2147403647),
+                  data_input_stream->ReadValue<int32_t>().value());
+    }
+    void TearDown() override {}
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<Bytes> serialized_bytes_big_endian_;
+    std::shared_ptr<Bytes> serialized_bytes_little_endian_;
+};
+// Run every TEST_P of the fixture once per byte order.
+INSTANTIATE_TEST_SUITE_P(ByteOrder, DataInputOutputStreamTest,
+                         ::testing::Values(ByteOrder::PAIMON_BIG_ENDIAN,
+                                           ByteOrder::PAIMON_LITTLE_ENDIAN));
+
+// Round trip through a real file on the local file system: write with
+// DataOutputStream, compare the raw file bytes with the expected image, then
+// read back with DataInputStream.
+TEST_P(DataInputOutputStreamTest, TestFileStream) {
+    ByteOrder byte_order = GetParam();
+    auto file_system = std::make_unique<LocalFileSystem>();
+    auto dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(dir);
+    std::string test_file = dir->Str() + "/test";
+    // prepare output stream
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> output_stream,
+                         file_system->Create(test_file, /*overwrite=*/true));
+    auto data_output_stream = 
std::make_unique<DataOutputStream>(output_stream);
+    data_output_stream->SetOrder(byte_order);
+    WriteValues(data_output_stream.get());
+    ASSERT_OK_AND_ASSIGN(int64_t out_pos, output_stream->GetPos());
+    ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, out_pos);
+    ASSERT_OK(output_stream->Close());
+    // check file content
+    ASSERT_OK_AND_ASSIGN(bool exist, file_system->Exists(test_file));
+    ASSERT_TRUE(exist);
+    std::string out_str;
+    ASSERT_OK(file_system->ReadFile(test_file, &out_str));
+    auto out_bytes = std::make_shared<Bytes>(out_str, pool_.get());
+    // print_hex(out_str);
+    if (byte_order == ByteOrder::PAIMON_BIG_ENDIAN) {
+        ASSERT_EQ(*serialized_bytes_big_endian_, *out_bytes);
+    } else {
+        ASSERT_EQ(*serialized_bytes_little_endian_, *out_bytes);
+    }
+
+    // prepare input stream
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, 
file_system->Open(test_file));
+    auto data_input_stream = std::make_unique<DataInputStream>(input_stream);
+    data_input_stream->SetOrder(byte_order);
+    CheckResult(input_stream.get(), data_input_stream.get());
+}
+
+// Round trip fully in memory: write with MemorySegmentOutputStream (8-byte
+// segments, so values span segment boundaries), flatten to Bytes, then read
+// back through ByteArrayInputStream + DataInputStream.
+TEST_P(DataInputOutputStreamTest, TestInMemoryByteArrayStream) {
+    ByteOrder byte_order = GetParam();
+    // prepare output stream
+    auto data_output_stream =
+        std::make_unique<MemorySegmentOutputStream>(/*segment_size=*/8, pool_);
+    data_output_stream->SetOrder(byte_order);
+    WriteValues(data_output_stream.get());
+    ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, data_output_stream->CurrentSize());
+    auto out_bytes = MemorySegmentUtils::CopyToBytes(
+        data_output_stream->Segments(), 0, data_output_stream->CurrentSize(), 
pool_.get());
+    if (byte_order == ByteOrder::PAIMON_BIG_ENDIAN) {
+        ASSERT_EQ(*serialized_bytes_big_endian_, *out_bytes);
+    } else {
+        ASSERT_EQ(*serialized_bytes_little_endian_, *out_bytes);
+    }
+    // prepare input stream
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(out_bytes->data(), 
out_bytes->size());
+    auto data_input_stream = std::make_unique<DataInputStream>(input_stream);
+    data_input_stream->SetOrder(byte_order);
+    CheckResult(input_stream.get(), data_input_stream.get());
+}
+
+// Same in-memory round trip, but the reader goes through a BufferedInputStream
+// with a 4-byte buffer, so reads wider than the buffer (int64, strings) are
+// exercised as well.
+TEST_P(DataInputOutputStreamTest, TestBufferedStream) {
+    ByteOrder byte_order = GetParam();
+    // prepare output stream
+    auto data_output_stream =
+        std::make_unique<MemorySegmentOutputStream>(/*segment_size=*/8, pool_);
+    data_output_stream->SetOrder(byte_order);
+    WriteValues(data_output_stream.get());
+    ASSERT_EQ(1 + 2 + 4 + 8 + 1 + 41 + 24, data_output_stream->CurrentSize());
+    auto out_bytes = MemorySegmentUtils::CopyToBytes(
+        data_output_stream->Segments(), 0, data_output_stream->CurrentSize(), 
pool_.get());
+    if (byte_order == ByteOrder::PAIMON_BIG_ENDIAN) {
+        ASSERT_EQ(*serialized_bytes_big_endian_, *out_bytes);
+    } else {
+        ASSERT_EQ(*serialized_bytes_little_endian_, *out_bytes);
+    }
+    // prepare input stream
+    auto input_stream =
+        std::make_unique<ByteArrayInputStream>(out_bytes->data(), 
out_bytes->size());
+    auto buffered_input_stream = std::make_shared<BufferedInputStream>(
+        std::move(input_stream), /*buffer_size=*/4, pool_.get());
+    auto data_input_stream = 
std::make_unique<DataInputStream>(buffered_input_stream);
+    data_input_stream->SetOrder(byte_order);
+    CheckResult(buffered_input_stream.get(), data_input_stream.get());
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/io/data_input_stream.cpp 
b/src/paimon/common/io/data_input_stream.cpp
new file mode 100644
index 0000000..63f8338
--- /dev/null
+++ b/src/paimon/common/io/data_input_stream.cpp
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/io/data_input_stream.h"
+
+#include <cassert>
+#include <type_traits>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/math.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+
+/// Wraps `input_stream` (shared ownership). A null stream is caught by assert() only.
+DataInputStream::DataInputStream(const std::shared_ptr<InputStream>& 
input_stream)
+    : input_stream_(input_stream) {
+    assert(input_stream_);
+}
+
+/// Seeks the underlying stream to the absolute position `offset` (FS_SEEK_SET).
+Status DataInputStream::Seek(int64_t offset) const {
+    return input_stream_->Seek(offset, SeekOrigin::FS_SEEK_SET);
+}
+
+/// Reads sizeof(T) bytes from the underlying stream into a T and returns it,
+/// byte-swapped when NeedSwap() reports that the configured order differs
+/// from the system order. Fails if the boundary check or the read length
+/// check does not pass.
+template <typename T>
+Result<T> DataInputStream::ReadValue() const {
+    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+    constexpr int32_t kValueSize = sizeof(T);
+    PAIMON_RETURN_NOT_OK(AssertBoundary(kValueSize));
+    T decoded;
+    char* raw = reinterpret_cast<char*>(&decoded);
+    PAIMON_ASSIGN_OR_RAISE(int32_t bytes_read, input_stream_->Read(raw, kValueSize));
+    PAIMON_RETURN_NOT_OK(AssertReadLength(kValueSize, bytes_read));
+    if (!NeedSwap()) {
+        return decoded;
+    }
+    decoded = EndianSwapValue(decoded);
+    return decoded;
+}
+
+/// Fills `bytes` completely (bytes->size() bytes) from the current position.
+/// Fails when the boundary check rejects the request or the read is short.
+Status DataInputStream::ReadBytes(Bytes* bytes) const {
+    const int32_t wanted = bytes->size();
+    PAIMON_RETURN_NOT_OK(AssertBoundary(wanted));
+    PAIMON_ASSIGN_OR_RAISE(int32_t received, input_stream_->Read(bytes->data(), wanted));
+    return AssertReadLength(wanted, received);
+}
+
+/// Reads exactly `size` raw bytes into `data`; no byte-order conversion is applied.
+/// Fails when the boundary check rejects the request or the read is short.
+Status DataInputStream::Read(char* data, uint32_t size) const {
+    PAIMON_RETURN_NOT_OK(AssertBoundary(size));
+    PAIMON_ASSIGN_OR_RAISE(int32_t received, input_stream_->Read(data, size));
+    return AssertReadLength(size, received);
+}
+
+/// Reads a length-prefixed string: a uint16 byte count (decoded through
+/// ReadValue, so it honours the configured byte order) followed by that many
+/// raw bytes.
+Result<std::string> DataInputStream::ReadString() const {
+    PAIMON_ASSIGN_OR_RAISE(uint16_t payload_size, ReadValue<uint16_t>());
+    PAIMON_RETURN_NOT_OK(AssertBoundary(payload_size));
+    std::string text(payload_size, '\0');
+    PAIMON_ASSIGN_OR_RAISE(int32_t received, input_stream_->Read(text.data(), payload_size));
+    PAIMON_RETURN_NOT_OK(AssertReadLength(payload_size, received));
+    return text;
+}
+
+/// Forwards to the underlying stream's GetPos().
+Result<int64_t> DataInputStream::GetPos() const {
+    return input_stream_->GetPos();
+}
+
+/// Forwards to the underlying stream's Length().
+Result<uint64_t> DataInputStream::Length() const {
+    return input_stream_->Length();
+}
+
+/// Returns OK when the stream delivered exactly the requested number of bytes,
+/// Status::Invalid describing both lengths otherwise.
+Status DataInputStream::AssertReadLength(int32_t read_length, int32_t actual_read_length) const {
+    if (read_length == actual_read_length) {
+        return Status::OK();
+    }
+    return Status::Invalid(
+        fmt::format("assert read length failed: read length not match, read length {}, actual "
+                    "read length {}",
+                    read_length, actual_read_length));
+}
+
+/// Verifies that `need_length` more bytes can be read from the current position
+/// without running past the end of the underlying stream.
+///
+/// A negative `need_length` is rejected explicitly. Callers pass unsigned
+/// sizes (e.g. the uint32_t `size` of Read()) that are narrowed to int32_t
+/// here; a value above INT32_MAX would otherwise arrive as a negative number,
+/// shrink `pos + need_length`, and slip through the check.
+///
+/// @return Status::Invalid if `need_length` is negative or the read would
+///         exceed the stream length; errors from GetPos()/Length() are
+///         propagated.
+Status DataInputStream::AssertBoundary(int32_t need_length) const {
+    if (need_length < 0) {
+        return Status::Invalid(fmt::format(
+            "DataInputStream assert boundary failed: invalid negative need length {}",
+            need_length));
+    }
+    // TODO(jinli.zjw): Store current_pos and file_length as member variables to reduce the
+    // overhead of I/O calls.
+    PAIMON_ASSIGN_OR_RAISE(int64_t pos, input_stream_->GetPos());
+    PAIMON_ASSIGN_OR_RAISE(uint64_t length, input_stream_->Length());
+    if (pos + need_length > static_cast<int64_t>(length)) {
+        return Status::Invalid(
+            fmt::format("DataInputStream assert boundary failed: need length {}, current position "
+                        "{}, exceed length {}",
+                        need_length, pos, length));
+    }
+    return Status::OK();
+}
+
+/// True when the configured byte order differs from SystemByteOrder(), i.e.
+/// multi-byte values must be byte-swapped after reading.
+bool DataInputStream::NeedSwap() const {
+    return SystemByteOrder() != byte_order_;
+}
+
+// Explicit instantiations: ReadValue<T>() is defined in this translation unit,
+// so only the types listed here are available to other translation units.
+template Result<bool> DataInputStream::ReadValue() const;
+template Result<char> DataInputStream::ReadValue() const;
+template Result<int8_t> DataInputStream::ReadValue() const;
+template Result<int16_t> DataInputStream::ReadValue() const;
+template Result<uint16_t> DataInputStream::ReadValue() const;
+template Result<int32_t> DataInputStream::ReadValue() const;
+template Result<int64_t> DataInputStream::ReadValue() const;
+template Result<float> DataInputStream::ReadValue() const;
+template Result<double> DataInputStream::ReadValue() const;
+}  // namespace paimon
diff --git a/src/paimon/common/io/data_output_stream.cpp 
b/src/paimon/common/io/data_output_stream.cpp
new file mode 100644
index 0000000..14d7463
--- /dev/null
+++ b/src/paimon/common/io/data_output_stream.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/io/data_output_stream.h"
+
+#include "fmt/format.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/result.h"
+
+namespace paimon {
+/// Wraps `output_stream` (shared ownership). A null stream is caught by assert() only.
+DataOutputStream::DataOutputStream(const std::shared_ptr<OutputStream>& 
output_stream)
+    : output_stream_(output_stream) {
+    assert(output_stream_);
+}
+
+/// Writes the raw content of `bytes` (no length prefix, no byte-order
+/// conversion) and fails if the stream accepted fewer bytes than requested.
+Status DataOutputStream::WriteBytes(const std::shared_ptr<Bytes>& bytes) {
+    const int32_t expected = bytes->size();
+    PAIMON_ASSIGN_OR_RAISE(int32_t written, output_stream_->Write(bytes->data(), expected));
+    return AssertWriteLength(expected, written);
+}
+
+/// Writes `value` as a uint16 byte count (through WriteValue, so it honours
+/// the configured byte order) followed by the raw string bytes.
+///
+/// The length prefix is only 16 bits wide, so strings longer than 65535 bytes
+/// cannot be represented. They are rejected up front; previously the length
+/// was silently truncated modulo 65536 and only that many bytes were written,
+/// producing a stream that decodes to a different string.
+///
+/// @return Status::Invalid if `value` is longer than 65535 bytes or the stream
+///         accepted fewer bytes than requested; write errors are propagated.
+Status DataOutputStream::WriteString(const std::string& value) {
+    if (value.size() > UINT16_MAX) {
+        return Status::Invalid(fmt::format(
+            "string length {} exceeds the maximum {} encodable in the uint16 length prefix",
+            value.size(), UINT16_MAX));
+    }
+    const uint16_t write_length = static_cast<uint16_t>(value.size());
+    PAIMON_RETURN_NOT_OK(WriteValue<uint16_t>(write_length));
+    PAIMON_ASSIGN_OR_RAISE(int32_t actual_write_length,
+                           output_stream_->Write(value.data(), write_length));
+    PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, actual_write_length));
+    return Status::OK();
+}
+
+/// Returns OK when the stream accepted exactly the requested number of bytes,
+/// Status::Invalid describing both lengths otherwise.
+Status DataOutputStream::AssertWriteLength(int32_t write_length,
+                                           int32_t actual_write_length) const {
+    if (write_length == actual_write_length) {
+        return Status::OK();
+    }
+    return Status::Invalid(
+        fmt::format("assert write length failed: write length not match, write length {}, actual "
+                    "write length {}",
+                    write_length, actual_write_length));
+}
+
+/// True when the configured byte order differs from SystemByteOrder(), i.e.
+/// multi-byte values must be byte-swapped before writing.
+bool DataOutputStream::NeedSwap() const {
+    return SystemByteOrder() != byte_order_;
+}
+
+// Explicit instantiations of WriteValue<T>() for the value types listed below.
+template Status DataOutputStream::WriteValue(const bool&);
+template Status DataOutputStream::WriteValue(const char&);
+template Status DataOutputStream::WriteValue(const int16_t&);
+template Status DataOutputStream::WriteValue(const uint16_t&);
+template Status DataOutputStream::WriteValue(const int32_t&);
+template Status DataOutputStream::WriteValue(const int64_t&);
+}  // namespace paimon
diff --git a/src/paimon/common/io/data_output_stream.h 
b/src/paimon/common/io/data_output_stream.h
new file mode 100644
index 0000000..ace5976
--- /dev/null
+++ b/src/paimon/common/io/data_output_stream.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <type_traits>
+
+#include "paimon/common/utils/math.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class Bytes;
+class OutputStream;
+
+// Data output stream layered on an OutputStream. It provides WriteValue(),
+// WriteBytes() and WriteString(), and converts multi-byte values to the
+// configured byte order (big-endian by default) to ensure cross-language
+// compatibility.
+class PAIMON_EXPORT DataOutputStream {
+ public:
+    /// Wraps `output_stream`; the stream is held by shared_ptr.
+    explicit DataOutputStream(const std::shared_ptr<OutputStream>& 
output_stream);
+
+    /// Writes the sizeof(T) bytes of `value`, byte-swapped first when
+    /// NeedSwap() is true. Returns a non-OK status if the underlying write
+    /// fails or writes fewer bytes than requested.
+    template <typename T>
+    Status WriteValue(const T& value) {
+        static_assert(std::is_trivially_copyable_v<T>, "T must be trivially 
copyable");
+        T write_value = value;
+        if (NeedSwap()) {
+            write_value = EndianSwapValue(value);
+        }
+        int32_t write_length = sizeof(T);
+        PAIMON_ASSIGN_OR_RAISE(
+            int32_t actual_write_length,
+            output_stream_->Write(reinterpret_cast<char*>(&write_value), 
write_length));
+        PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, 
actual_write_length));
+        return Status::OK();
+    }
+
+    /// Writes the raw content of `bytes` without a length prefix.
+    Status WriteBytes(const std::shared_ptr<Bytes>& bytes);
+
+    /// First write length (uint16), then write string bytes.
+    Status WriteString(const std::string& value);
+
+    /// Selects the byte order used for subsequent WriteValue() calls.
+    void SetOrder(ByteOrder order) {
+        byte_order_ = order;
+    }
+
+ private:
+    /// Returns Status::Invalid when the two lengths differ.
+    Status AssertWriteLength(int32_t write_length, int32_t 
actual_write_length) const;
+
+    /// True when the configured byte order differs from the system byte order.
+    bool NeedSwap() const;
+
+ private:
+    std::shared_ptr<OutputStream> output_stream_;
+
+    // Byte order applied by WriteValue(); big-endian unless SetOrder() is called.
+    ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/memory_segment_output_stream.cpp 
b/src/paimon/common/io/memory_segment_output_stream.cpp
new file mode 100644
index 0000000..498b5ab
--- /dev/null
+++ b/src/paimon/common/io/memory_segment_output_stream.cpp
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/io/memory_segment_output_stream.h"
+
+#include <algorithm>
+
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Creates the stream and immediately allocates the first segment via Advance(),
+/// so there is always a current segment to write into.
+/// NOTE(review): `segment_size` is not validated here; presumably it must be > 0 — confirm.
+MemorySegmentOutputStream::MemorySegmentOutputStream(int32_t segment_size,
+                                                     const 
std::shared_ptr<MemoryPool>& pool)
+    : segment_size_(segment_size), pool_(pool) {
+    Advance();
+}
+
+/// Switches to a freshly allocated segment and resets the in-segment write position to 0.
+void MemorySegmentOutputStream::Advance() {
+    current_segment_ = NextSegment();
+    position_in_segment_ = 0;
+}
+
+/// Allocates a new segment of `segment_size_` bytes from the pool, appends it to
+/// `memory_segments_`, and returns it.
+MemorySegment MemorySegmentOutputStream::NextSegment() {
+    MemorySegment memory_segment = 
MemorySegment::AllocateHeapMemory(segment_size_, pool_.get());
+    memory_segments_.push_back(memory_segment);
+    return memory_segment;
+}
+
+/// Appends the whole content of `bytes` by wrapping it in a MemorySegment and
+/// delegating to the segment-based Write().
+void MemorySegmentOutputStream::WriteBytes(const std::shared_ptr<Bytes>& 
bytes) {
+    auto segment = MemorySegment::Wrap(bytes);
+    Write(segment, 0, segment.Size());
+}
+
+/// Writes a 2-byte length prefix (via WriteValue<int16_t>) followed by the string bytes.
+/// NOTE(review): str.length() is narrowed to int16_t for the prefix while the
+/// payload is written with the full str.size(); for strings longer than 65535
+/// bytes the prefix and the payload disagree. There is no error channel here
+/// (void return) — confirm callers never pass such strings.
+void MemorySegmentOutputStream::WriteString(const std::string& str) {
+    WriteValue<int16_t>(str.length());
+    Write(str.data(), str.size());
+}
+
+/// Appends `size` raw bytes from `data`.
+/// The bytes are first copied into a temporary pool-allocated Bytes object so
+/// they can be wrapped in a MemorySegment and handed to the segment-based Write().
+void MemorySegmentOutputStream::Write(const char* data, uint32_t size) {
+    auto bytes = std::make_shared<Bytes>(size, pool_.get());
+    memcpy(bytes->data(), data, size);
+    auto segment = MemorySegment::Wrap(bytes);
+    Write(segment, 0, segment.Size());
+}
+
+/// Appends `len` bytes of `segment`, starting at `offset`, to the stream,
+/// spilling into newly allocated segments whenever the current one is full.
+void MemorySegmentOutputStream::Write(const MemorySegment& segment, int32_t 
offset, int32_t len) {
+    int32_t remaining = segment_size_ - position_in_segment_;
+    if (remaining >= len) {
+        // Fast path: everything fits into the current segment.
+        segment.CopyTo(offset, &current_segment_, position_in_segment_, len);
+        position_in_segment_ += len;
+    } else {
+        if (remaining == 0) {
+            // Current segment is exactly full: start with a fresh one.
+            Advance();
+            remaining = segment_size_ - position_in_segment_;
+        }
+        while (true) {
+            // Copy as much as the current segment can still take.
+            int32_t to_put = std::min(remaining, len);
+            segment.CopyTo(offset, &current_segment_, position_in_segment_, 
to_put);
+            offset += to_put;
+            len -= to_put;
+
+            if (len > 0) {
+                // More data left: the current segment is now full, move on.
+                // Advance() resets position_in_segment_ to 0.
+                position_in_segment_ = segment_size_;
+                Advance();
+                remaining = segment_size_ - position_in_segment_;
+            } else {
+                // Last chunk: record how far the current segment is filled.
+                position_in_segment_ += to_put;
+                break;
+            }
+        }
+    }
+}
+
+/// Total number of bytes written so far: every segment before the current one
+/// counts as full (`segment_size_` bytes), plus the write position inside the
+/// current segment.
+int64_t MemorySegmentOutputStream::CurrentSize() const {
+    return segment_size_ * (memory_segments_.size() - 1) + 
CurrentPositionInSegment();
+}
+
+/// True when the configured byte order differs from SystemByteOrder().
+bool MemorySegmentOutputStream::NeedSwap() const {
+    return SystemByteOrder() != byte_order_;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/memory_segment_output_stream.h 
b/src/paimon/common/io/memory_segment_output_stream.h
new file mode 100644
index 0000000..dbe3816
--- /dev/null
+++ b/src/paimon/common/io/memory_segment_output_stream.h
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/common/utils/math.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Bytes;
+class MemoryPool;
+
+// TODO(xinyu.lxy): use DataOutputStream to do big-endian conversion
+/// Output stream that appends data to a sequence of `MemorySegment`s, each
+/// treated as `segment_size_` bytes long. Multi-byte values written through
+/// WriteValue() are stored in the configured byte order (big-endian unless
+/// SetOrder() is called).
+class PAIMON_EXPORT MemorySegmentOutputStream {
+ public:
+    /// @param segment_size Size in bytes used for every segment of the stream.
+    /// @param pool Memory pool; used by the raw-byte Write() overload to
+    ///             allocate its temporary buffer.
+    MemorySegmentOutputStream(int32_t segment_size, const 
std::shared_ptr<MemoryPool>& pool);
+
+    /// Suggested segment size: 64 KiB.
+    static constexpr int32_t DEFAULT_SEGMENT_SIZE = 64 * 1024;
+
+    /// Appends `len` bytes of `segment`, starting at `offset`.
+    void Write(const MemorySegment& segment, int32_t offset, int32_t len);
+
+    /// Appends a trivially copyable value, byte-swapping it first when the
+    /// configured byte order differs from the system byte order.
+    template <typename T>
+    void WriteValue(T v);
+
+    /// First write length (int16), then write string bytes.
+    void WriteString(const std::string& str);
+
+    /// Appends the content of `bytes`.
+    void WriteBytes(const std::shared_ptr<Bytes>& bytes);
+
+    /// Appends `size` raw bytes starting at `data`.
+    void Write(const char* data, uint32_t size);
+
+    /// Write cursor inside the current segment.
+    int32_t CurrentPositionInSegment() const {
+        return position_in_segment_;
+    }
+
+    /// Stream size in bytes, derived from the segment count and the cursor.
+    int64_t CurrentSize() const;
+
+    /// Read-only view of the segments held by the stream.
+    const std::vector<MemorySegment>& Segments() const {
+        return memory_segments_;
+    }
+
+    /// Selects the byte order applied by subsequent WriteValue() calls.
+    void SetOrder(ByteOrder order) {
+        byte_order_ = order;
+    }
+
+ private:
+    /// Stores the bytes of `v` as-is (no endian conversion).
+    template <typename T>
+    void WriteValueImpl(T v);
+
+    // NOTE(review): Advance() and NextSegment() are defined outside this view;
+    // presumably they switch the stream to a fresh segment - confirm.
+    void Advance();
+    MemorySegment NextSegment();
+
+    /// True when the configured byte order differs from the system's.
+    bool NeedSwap() const;
+
+ private:
+    int32_t segment_size_;
+    int32_t position_in_segment_;
+    std::shared_ptr<MemoryPool> pool_;
+    MemorySegment current_segment_;
+    std::vector<MemorySegment> memory_segments_;
+
+    // Byte order used for multi-byte values; big-endian by default.
+    ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN;
+};
+
+/// Stores the bytes of `v` at the write cursor without any endian conversion.
+/// A value that does not fit into the rest of the current segment is emitted
+/// one byte at a time so that it can continue after the segment is advanced.
+template <typename T>
+void MemorySegmentOutputStream::WriteValueImpl(T v) {
+    constexpr auto kValueSize = static_cast<int32_t>(sizeof(T));
+    const int32_t last_fitting_position = segment_size_ - kValueSize;
+    if (position_in_segment_ <= last_fitting_position) {
+        // The whole value fits into the current segment.
+        current_segment_.PutValue<T>(position_in_segment_, v);
+        position_in_segment_ += sizeof(T);
+        return;
+    }
+    if (position_in_segment_ == segment_size_) {
+        // The segment is exactly full: advance, then retry.
+        Advance();
+        WriteValueImpl<T>(v);
+        return;
+    }
+    // The value straddles the segment end. Any endian swap has already been
+    // applied by the caller, so the bytes are forwarded unchanged, lowest
+    // address first.
+    const auto* raw = reinterpret_cast<const uint8_t*>(&v);
+    for (size_t index = 0; index < sizeof(T); ++index) {
+        const uint8_t one_byte = raw[index];
+        WriteValueImpl<uint8_t>(one_byte);
+    }
+}
+
+/// Writes `v` in the stream's configured byte order: the value is byte-swapped
+/// first when NeedSwap() reports a mismatch with the system byte order.
+template <typename T>
+void MemorySegmentOutputStream::WriteValue(T v) {
+    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+    const bool swap_bytes = NeedSwap();
+    if (!swap_bytes) {
+        WriteValueImpl(v);
+        return;
+    }
+    v = EndianSwapValue(v);
+    WriteValueImpl(v);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/memory_segment_output_stream_test.cpp 
b/src/paimon/common/io/memory_segment_output_stream_test.cpp
new file mode 100644
index 0000000..61fbfe3
--- /dev/null
+++ b/src/paimon/common/io/memory_segment_output_stream_test.cpp
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/io/memory_segment_output_stream.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+
+// Value-parameterized fixture: each TEST_P below runs once per ByteOrder value
+// listed in the instantiation.
+class MemorySegmentOutputStreamTest : public ::testing::Test,
+                                      public 
::testing::WithParamInterface<ByteOrder> {};
+
+// Runs the suite for both big-endian and little-endian output.
+INSTANTIATE_TEST_SUITE_P(ByteOrder, MemorySegmentOutputStreamTest,
+                         ::testing::Values(ByteOrder::PAIMON_BIG_ENDIAN,
+                                           ByteOrder::PAIMON_LITTLE_ENDIAN));
+
+// Round-trip test: writes scalars, length-prefixed strings and raw bytes with
+// a tiny 8-byte segment size (so values cross segment boundaries), then reads
+// everything back through a DataInputStream configured with the same byte
+// order and checks values and total size.
+TEST_P(MemorySegmentOutputStreamTest, TestSimple) {
+    ByteOrder byte_order = GetParam();
+    auto pool = GetDefaultPool();
+    MemorySegmentOutputStream out(/*segment_size=*/8, pool);
+    out.SetOrder(byte_order);
+    out.WriteValue(static_cast<char>(127));            // 1 byte
+    out.WriteValue(static_cast<int16_t>(32067));       // 2 bytes
+    out.WriteValue(static_cast<int32_t>(2147403647));  // 4 bytes
+    // move to next segment
+    out.WriteValue(static_cast<int64_t>(9223372036854775805));  // 8 bytes
+    out.WriteValue(true);                                       // 1 byte
+    std::string str1 = "This is a very very very long sentence.";
+    out.WriteString(str1);  // 39 bytes + 2 bytes len
+    ASSERT_EQ(out.CurrentSize(), 1 + 1 + 2 + 4 + 8 + 41);
+    std::string str2 = "yes";
+    out.WriteString(str2);  // 3 bytes + 2 bytes len
+    std::string str3 = "I have a dream.";
+    out.WriteString(str3);  // 15 bytes + 2 bytes len
+    std::string str4 = "hello";
+    out.WriteValue<int32_t>(str4.size());  // 4 bytes
+    out.Write(str4.data(), str4.size());   // 5 bytes
+
+    ASSERT_EQ(out.CurrentSize(), 1 + 1 + 2 + 4 + 8 + 41 + 5 + 17 + 4 + 5);
+
+    // Flatten the segments into one contiguous buffer for reading back.
+    auto bytes = MemorySegmentUtils::CopyToBytes(out.Segments(), 0, 
out.CurrentSize(), pool.get());
+
+    auto input_stream = std::make_shared<ByteArrayInputStream>(bytes->data(), 
bytes->size());
+    DataInputStream in(input_stream);
+    in.SetOrder(byte_order);
+    ASSERT_EQ(127, in.ReadValue<char>().value());
+    ASSERT_EQ(32067, in.ReadValue<int16_t>().value());
+    ASSERT_EQ(2147403647, in.ReadValue<int32_t>().value());
+    ASSERT_EQ(9223372036854775805l, in.ReadValue<int64_t>().value());
+    ASSERT_EQ(true, in.ReadValue<bool>().value());
+    ASSERT_EQ(str1, in.ReadString().value());
+    ASSERT_EQ(str2, in.ReadString().value());
+    ASSERT_EQ(str3, in.ReadString().value());
+    ASSERT_EQ(str4.size(), in.ReadValue<int32_t>().value());
+    std::string read_str4(str4.size(), '\0');
+    ASSERT_OK(in.Read(read_str4.data(), str4.size()));
+    ASSERT_EQ(read_str4, str4);
+    // Everything that was written must have been consumed.
+    ASSERT_EQ(out.CurrentSize(), input_stream->GetPos().value());
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/io/offset_input_stream.cpp 
b/src/paimon/common/io/offset_input_stream.cpp
new file mode 100644
index 0000000..69593cd
--- /dev/null
+++ b/src/paimon/common/io/offset_input_stream.cpp
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/io/offset_input_stream.h"
+
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/macros.h"
+
+namespace paimon {
+/// Builds a stream that exposes the window [offset, offset + length) of
+/// `wrapped`.
+///
+/// @param wrapped Stream to wrap; must not be null. On success its cursor is
+///                moved to `offset`.
+/// @param length  Window size in bytes, or -1 to let the window extend to the
+///                end of `wrapped`.
+/// @param offset  Start of the window inside `wrapped`; must be >= 0.
+/// @return The new stream; an Invalid status when an argument is out of range
+///         or the window does not fit into `wrapped`; or the error reported by
+///         `wrapped->Length()` / `wrapped->Seek()`.
+Result<std::unique_ptr<OffsetInputStream>> OffsetInputStream::Create(
+    const std::shared_ptr<InputStream>& wrapped, int64_t length, int64_t offset) {
+    if (PAIMON_UNLIKELY(wrapped == nullptr)) {
+        return Status::Invalid("input stream is null pointer");
+    }
+    if (PAIMON_UNLIKELY(offset < 0)) {
+        return Status::Invalid(fmt::format("offset {} is less than 0", offset));
+    }
+    if (PAIMON_UNLIKELY(length < -1)) {
+        return Status::Invalid(fmt::format("length {} is less than -1", length));
+    }
+    PAIMON_ASSIGN_OR_RAISE(uint64_t total_length, wrapped->Length());
+    const auto window_start = static_cast<uint64_t>(offset);
+    if (PAIMON_UNLIKELY(window_start > total_length)) {
+        return Status::Invalid(
+            fmt::format("offset {} exceed total length {}", offset, total_length));
+    }
+    // Bytes of the wrapped stream that lie at or after `offset`.
+    const uint64_t available = total_length - window_start;
+    if (length == -1) {
+        // length == -1 means it's dynamic length, should read to the end
+        length = static_cast<int64_t>(available);
+    }
+    if (PAIMON_UNLIKELY(static_cast<uint64_t>(length) > available)) {
+        return Status::Invalid(fmt::format("offset {} + length {} exceed total length {}",
+                                           offset, length, total_length));
+    }
+    PAIMON_RETURN_NOT_OK(wrapped->Seek(offset, SeekOrigin::FS_SEEK_SET));
+    // `wrapped` is a const reference, so it can only be copied; the previous
+    // std::move here was a no-op that merely suggested a transfer of ownership.
+    return std::unique_ptr<OffsetInputStream>(new OffsetInputStream(wrapped, length, offset));
+}
+
+OffsetInputStream::OffsetInputStream(const std::shared_ptr<InputStream>& 
wrapped, int64_t length,
+                                     int64_t offset)
+    : wrapped_(wrapped), length_(length), offset_(offset) {}
+
+/// Moves the sequential read position inside the window.
+///
+/// The target position is computed and validated first; the stream state is
+/// only updated once both the boundary check and the seek on the wrapped
+/// stream have succeeded, so a rejected or failed seek leaves the position
+/// unchanged.
+///
+/// @param offset Displacement relative to `origin`.
+/// @param origin FS_SEEK_SET (window start), FS_SEEK_CUR (current position)
+///               or FS_SEEK_END (window end).
+/// @return OK on success; Invalid for an unknown origin or a target outside
+///         [0, length]; otherwise the error reported by the wrapped stream.
+Status OffsetInputStream::Seek(int64_t offset, SeekOrigin origin) {
+    int64_t target = 0;
+    switch (origin) {
+        case SeekOrigin::FS_SEEK_SET:
+            target = offset;
+            break;
+        case SeekOrigin::FS_SEEK_CUR:
+            target = inner_position_ + offset;
+            break;
+        case SeekOrigin::FS_SEEK_END:
+            target = length_ + offset;
+            break;
+        default:
+            return Status::Invalid(
+                "invalid SeekOrigin, only support FS_SEEK_SET, FS_SEEK_CUR, and FS_SEEK_END");
+    }
+    // NOTE(review): AssertBoundary takes an int32_t, so targets beyond the
+    // 32-bit range are narrowed before being checked.
+    PAIMON_RETURN_NOT_OK(AssertBoundary(target));
+    // Always seek the wrapped stream to an absolute position: this keeps it in
+    // step with `inner_position_` even if its own cursor has drifted.
+    PAIMON_RETURN_NOT_OK(wrapped_->Seek(offset_ + target, SeekOrigin::FS_SEEK_SET));
+    inner_position_ = target;
+    return Status::OK();
+}
+
+/// Sequential read of up to `size` bytes from the current position.
+///
+/// The request is rejected when it would run past the end of the window.
+/// The position advances by the number of bytes the wrapped stream actually
+/// returned, and only after the read succeeded, so a failed or short read
+/// does not leave the position ahead of the data delivered.
+///
+/// @param buffer Destination buffer with room for at least `size` bytes.
+/// @param size   Number of bytes requested.
+/// @return Number of bytes read, or the boundary / wrapped-stream error.
+Result<int32_t> OffsetInputStream::Read(char* buffer, uint32_t size) {
+    PAIMON_RETURN_NOT_OK(AssertBoundary(inner_position_ + size));
+    PAIMON_ASSIGN_OR_RAISE(int32_t bytes_read, wrapped_->Read(buffer, size));
+    if (bytes_read > 0) {
+        inner_position_ += bytes_read;
+    }
+    return bytes_read;
+}
+
+/// Positional read: fetches `size` bytes located at `offset` (relative to the
+/// window start) from the wrapped stream. The sequential position of this
+/// stream is not modified. Both ends of the requested range are checked
+/// against the window first.
+Result<int32_t> OffsetInputStream::Read(char* buffer, uint32_t size, uint64_t offset) {
+    const uint64_t range_end = offset + size;
+    PAIMON_RETURN_NOT_OK(AssertBoundary(offset));
+    PAIMON_RETURN_NOT_OK(AssertBoundary(range_end));
+    const uint64_t wrapped_offset = offset_ + offset;
+    return wrapped_->Read(buffer, size, wrapped_offset);
+}
+
+/// Asynchronous positional read. Both ends of the requested range are checked
+/// against the window; on a violation `callback` is invoked right away with
+/// the failing status. Otherwise the request is forwarded to the wrapped
+/// stream with the offset translated into its coordinates.
+void OffsetInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                                  std::function<void(Status)>&& callback) {
+    // Reports a failed boundary check through `callback`; true means rejected.
+    const auto rejected = [this, &callback](uint64_t inner_pos) {
+        Status check = AssertBoundary(inner_pos);
+        if (check.ok()) {
+            return false;
+        }
+        callback(check);
+        return true;
+    };
+    if (rejected(offset) || rejected(offset + size)) {
+        return;
+    }
+    wrapped_->ReadAsync(buffer, size, offset_ + offset, std::move(callback));
+}
+
+/// Closes the wrapped stream and hands back whatever status it reports.
+Status OffsetInputStream::Close() {
+    Status close_status = wrapped_->Close();
+    return close_status;
+}
+
+/// Forwards the URI reported by the wrapped stream.
+Result<std::string> OffsetInputStream::GetUri() const {
+    Result<std::string> wrapped_uri = wrapped_->GetUri();
+    return wrapped_uri;
+}
+
+/// Current sequential position, counted from the start of the window (not
+/// from the start of the wrapped stream).
+Result<int64_t> OffsetInputStream::GetPos() const {
+    const int64_t window_position = inner_position_;
+    return window_position;
+}
+
+/// Size of the window in bytes, as fixed at construction time.
+Result<uint64_t> OffsetInputStream::Length() const {
+    const int64_t window_length = length_;
+    return window_length;
+}
+
+/// Checks that `inner_pos` lies within [0, length_] and returns an Invalid
+/// status describing the violation otherwise.
+/// NOTE(review): the parameter is int32_t while positions and `length_` are
+/// 64-bit, so values beyond the 32-bit range are narrowed at the call site
+/// before this check runs. Widening it to int64_t needs the declaration in
+/// the header changed together with this definition.
+Status OffsetInputStream::AssertBoundary(int32_t inner_pos) const {
+    const bool within_window = inner_pos >= 0 && inner_pos <= length_;
+    if (within_window) {
+        return Status::OK();
+    }
+    return Status::Invalid(fmt::format(
+        "OffsetInputStream assert boundary failed: inner pos {} exceed length {}", inner_pos,
+        length_));
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/offset_input_stream.h 
b/src/paimon/common/io/offset_input_stream.h
new file mode 100644
index 0000000..38f97ef
--- /dev/null
+++ b/src/paimon/common/io/offset_input_stream.h
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// An `InputStream` wrapping another `InputStream` with offset and length.
+///
+/// Positions passed to and returned by this class are relative to the start
+/// of the window; they are translated by `offset_` before being forwarded to
+/// the wrapped stream.
+class PAIMON_EXPORT OffsetInputStream : public InputStream {
+ public:
+    /// Validates the arguments and creates the stream. A `length` of -1 makes
+    /// the window extend to the end of `wrapped`.
+    static Result<std::unique_ptr<OffsetInputStream>> Create(
+        const std::shared_ptr<InputStream>& wrapped, int64_t length, int64_t 
offset);
+    ~OffsetInputStream() override = default;
+
+    /// Moves the sequential position; the target must stay within the window.
+    Status Seek(int64_t offset, SeekOrigin origin) override;
+
+    /// Sequential position relative to the window start.
+    Result<int64_t> GetPos() const override;
+
+    /// Sequential read from the current position.
+    Result<int32_t> Read(char* buffer, uint32_t size) override;
+
+    /// Positional read at `offset` (relative to the window start).
+    Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) 
override;
+
+    /// Asynchronous positional read; boundary violations are reported through
+    /// `callback`.
+    void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                   std::function<void(Status)>&& callback) override;
+
+    /// Window size in bytes.
+    Result<uint64_t> Length() const override;
+
+    /// Closes the wrapped stream.
+    Status Close() override;
+
+    /// URI of the wrapped stream.
+    Result<std::string> GetUri() const override;
+
+ private:
+    OffsetInputStream(const std::shared_ptr<InputStream>& wrapped, int64_t 
length, int64_t offset);
+    // Fails unless `inner_pos` is within [0, length_].
+    // NOTE(review): the parameter is int32_t while positions are 64-bit, so
+    // larger values are narrowed at the call site - consider int64_t.
+    Status AssertBoundary(int32_t inner_pos) const;
+
+ private:
+    std::shared_ptr<InputStream> wrapped_;
+    // Window size in bytes.
+    const int64_t length_;
+    // Start of the window inside the wrapped stream.
+    const int64_t offset_;
+    // Sequential position relative to the window start.
+    int64_t inner_position_ = 0;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/io/offset_input_stream_test.cpp 
b/src/paimon/common/io/offset_input_stream_test.cpp
new file mode 100644
index 0000000..3734650
--- /dev/null
+++ b/src/paimon/common/io/offset_input_stream_test.cpp
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/io/offset_input_stream.h"
+
+#include <memory>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// A freshly created stream over a 6-byte window at offset 2 reports the window
+// length, position 0 and the URI of the wrapped stream (empty here).
+TEST(OffsetInputStreamTest, TestBasicConstruction) {
+    auto inner_stream = std::make_unique<ByteArrayInputStream>("abcdefghij", 
/*length=*/10);
+    ASSERT_OK_AND_ASSIGN(auto offset_stream, 
OffsetInputStream::Create(std::move(inner_stream),
+                                                                       
/*length=*/6, /*offset=*/2));
+
+    ASSERT_OK_AND_ASSIGN(auto length, offset_stream->Length());
+    ASSERT_EQ(6, length);
+
+    ASSERT_OK_AND_ASSIGN(auto pos, offset_stream->GetPos());
+    ASSERT_EQ(0, pos);
+
+    ASSERT_OK_AND_ASSIGN(auto uri, offset_stream->GetUri());
+    ASSERT_EQ("", uri);
+}
+
+// Exercises all three seek origins on a 6-byte window and checks that seeks
+// landing outside the window are rejected. The steps are order-dependent:
+// each seek starts from the position left by the previous one.
+TEST(OffsetInputStreamTest, TestSeekOperations) {
+    auto inner_stream = std::make_unique<ByteArrayInputStream>("abcdefghij", 
/*length=*/10);
+    ASSERT_OK_AND_ASSIGN(auto offset_stream, 
OffsetInputStream::Create(std::move(inner_stream),
+                                                                       
/*length=*/6, /*offset=*/2));
+
+    // Test FS_SEEK_SET
+    ASSERT_OK(offset_stream->Seek(3, SeekOrigin::FS_SEEK_SET));
+    ASSERT_OK_AND_ASSIGN(auto pos, offset_stream->GetPos());
+    ASSERT_EQ(3, pos);
+
+    // Test FS_SEEK_CUR
+    ASSERT_OK(offset_stream->Seek(1, SeekOrigin::FS_SEEK_CUR));
+    ASSERT_OK_AND_ASSIGN(pos, offset_stream->GetPos());
+    ASSERT_EQ(4, pos);
+
+    // Test FS_SEEK_END
+    ASSERT_OK(offset_stream->Seek(-2, SeekOrigin::FS_SEEK_END));
+    ASSERT_OK_AND_ASSIGN(pos, offset_stream->GetPos());
+    ASSERT_EQ(4, pos);
+
+    // Test boundary conditions
+    ASSERT_NOK(offset_stream->Seek(-10, SeekOrigin::FS_SEEK_SET));
+    ASSERT_NOK(offset_stream->Seek(10, SeekOrigin::FS_SEEK_SET));
+    ASSERT_NOK(offset_stream->Seek(10, SeekOrigin::FS_SEEK_CUR));
+    ASSERT_NOK(offset_stream->Seek(10, SeekOrigin::FS_SEEK_END));
+}
+
+// Sequential reads start at the window offset and advance the position;
+// positional reads use window-relative offsets and leave the position alone.
+TEST(OffsetInputStreamTest, TestReadOperations) {
+    auto inner_stream = std::make_unique<ByteArrayInputStream>("abcdefghij", 
/*length=*/10);
+    ASSERT_OK_AND_ASSIGN(auto offset_stream, 
OffsetInputStream::Create(std::move(inner_stream),
+                                                                       
/*length=*/6, /*offset=*/2));
+
+    // Test sequential read
+    std::string buffer(4, '\0');
+    ASSERT_OK_AND_ASSIGN(auto bytes_read, offset_stream->Read(buffer.data(), 
/*size=*/4));
+    ASSERT_EQ(4, bytes_read);
+    ASSERT_EQ("cdef", buffer);
+
+    ASSERT_OK_AND_ASSIGN(auto pos, offset_stream->GetPos());
+    ASSERT_EQ(4, pos);
+
+    // Test read with offset
+    std::string buffer2(2, '\0');
+    ASSERT_OK_AND_ASSIGN(bytes_read, offset_stream->Read(buffer2.data(), 
/*size=*/2, /*offset=*/1));
+    ASSERT_EQ(2, bytes_read);
+    ASSERT_EQ("de", buffer2);
+
+    // Position should not change after offset read
+    ASSERT_OK_AND_ASSIGN(pos, offset_stream->GetPos());
+    ASSERT_EQ(4, pos);
+
+    // Test close
+    ASSERT_OK(offset_stream->Close());
+}
+
+// Issues two asynchronous positional reads and checks their results. The
+// assertions run right after the calls, so the test expects both callbacks to
+// have fired by the time ReadAsync returns for this in-memory stream.
+TEST(OffsetInputStreamTest, TestReadAsync) {
+    auto inner_stream = std::make_unique<ByteArrayInputStream>("abcdefghij", 
/*length=*/10);
+    ASSERT_OK_AND_ASSIGN(auto offset_stream, 
OffsetInputStream::Create(std::move(inner_stream),
+                                                                       
/*length=*/6, /*offset=*/2));
+
+    std::string buffer1(3, '\0'), buffer2(2, '\0');
+    bool callback_called1 = false, callback_called2 = false;
+    Status callback_status1, callback_status2;
+
+    // Each callback records that it ran and the status it received.
+    auto callback1 = [&](Status status) {
+        callback_called1 = true;
+        callback_status1 = status;
+    };
+    auto callback2 = [&](Status status) {
+        callback_called2 = true;
+        callback_status2 = status;
+    };
+
+    offset_stream->ReadAsync(buffer1.data(), 3, 1, std::move(callback1));
+    offset_stream->ReadAsync(buffer2.data(), 2, 3, std::move(callback2));
+
+    ASSERT_TRUE(callback_called1);
+    ASSERT_OK(callback_status1);
+    ASSERT_EQ("def", buffer1);
+    ASSERT_TRUE(callback_called2);
+    ASSERT_OK(callback_status2);
+    ASSERT_EQ("fg", buffer2);
+
+    // Position should not change after offset read
+    ASSERT_OK_AND_ASSIGN(auto pos, offset_stream->GetPos());
+    ASSERT_EQ(0, pos);
+}
+
+// Reads that would run past the end of the 6-byte window must fail, and the
+// error message must name the offending position and the window length.
+TEST(OffsetInputStreamTest, TestBoundaryValidation) {
+    auto inner_stream = std::make_unique<ByteArrayInputStream>("abcdefghij", 
/*length=*/10);
+    ASSERT_OK_AND_ASSIGN(auto offset_stream, 
OffsetInputStream::Create(std::move(inner_stream),
+                                                                       
/*length=*/6, /*offset=*/2));
+
+    // Test read beyond boundary
+    std::string buffer(10, '\0');
+    ASSERT_NOK_WITH_MSG(offset_stream->Read(buffer.data(), /*size=*/10),
+                        "assert boundary failed: inner pos 10 exceed length 
6");
+
+    // Test offset read beyond boundary
+    ASSERT_NOK_WITH_MSG(offset_stream->Read(buffer.data(), /*size=*/4, 
/*offset=*/5),
+                        "assert boundary failed: inner pos 9 exceed length 6");
+}
+
+// With length == -1 the window extends from the offset to the end of the
+// wrapped stream; reads and boundary checks then use that derived length.
+TEST(OffsetInputStreamTest, TestReadWithUnspecifiedLength) {
+    auto inner_stream = std::make_unique<ByteArrayInputStream>("abcdefghij", 
/*length=*/10);
+    // Use -1 for length to test dynamic length calculation
+    ASSERT_OK_AND_ASSIGN(
+        auto offset_stream,
+        OffsetInputStream::Create(std::move(inner_stream), /*length=*/-1, 
/*offset=*/2));
+
+    // Test that length is calculated correctly
+    ASSERT_OK_AND_ASSIGN(auto length, offset_stream->Length());
+    // Should be total length (10) minus offset (2) = 8
+    ASSERT_EQ(8, length);
+
+    // Test sequential read within the calculated bounds
+    std::string buffer(4, '\0');
+    ASSERT_OK_AND_ASSIGN(auto bytes_read, offset_stream->Read(buffer.data(), 
/*size=*/4));
+    ASSERT_EQ(4, bytes_read);
+    ASSERT_EQ("cdef", buffer);
+
+    ASSERT_OK_AND_ASSIGN(auto pos, offset_stream->GetPos());
+    ASSERT_EQ(4, pos);
+
+    // Test read with offset within the calculated bounds
+    std::string buffer2(3, '\0');
+    ASSERT_OK_AND_ASSIGN(bytes_read, offset_stream->Read(buffer2.data(), 
/*size=*/3, /*offset=*/5));
+    ASSERT_EQ(3, bytes_read);
+    ASSERT_EQ("hij", buffer2);
+
+    // Position should not change after offset read
+    ASSERT_OK_AND_ASSIGN(pos, offset_stream->GetPos());
+    ASSERT_EQ(4, pos);
+
+    // Test boundary validation with dynamic length
+    std::string buffer3(10, '\0');
+    ASSERT_NOK_WITH_MSG(offset_stream->Read(buffer3.data(), /*size=*/10),
+                        "assert boundary failed: inner pos 14 exceed length 
8");
+}
+
+// Create() must reject a null stream, a negative offset, a length below -1
+// and windows that do not fit into the wrapped stream, each with its own
+// error message.
+TEST(OffsetInputStreamTest, TestInvalidParameters) {
+    // Test null wrapped stream
+    ASSERT_NOK_WITH_MSG(OffsetInputStream::Create(nullptr, /*length=*/6, 
/*offset=*/2),
+                        "input stream is null pointer");
+
+    // Test negative offset
+    auto inner_stream = std::make_unique<ByteArrayInputStream>("abcdefghij", 
/*length=*/10);
+    ASSERT_NOK_WITH_MSG(
+        OffsetInputStream::Create(std::move(inner_stream), /*length=*/6, 
/*offset=*/-1),
+        "offset -1 is less than 0");
+
+    // Test length less than -1
+    inner_stream = std::make_unique<ByteArrayInputStream>("abcdefghij", 
/*length=*/10);
+    ASSERT_NOK_WITH_MSG(
+        OffsetInputStream::Create(std::move(inner_stream), /*length=*/-2, 
/*offset=*/2),
+        "length -2 is less than -1");
+
+    // Test length + offset beyond wrapped stream length
+    inner_stream = std::make_unique<ByteArrayInputStream>("abcdefghij", 
/*length=*/10);
+    ASSERT_NOK_WITH_MSG(
+        OffsetInputStream::Create(std::move(inner_stream), /*length=*/8, 
/*offset=*/7),
+        "offset 7 + length 8 exceed total length 10");
+
+    // Test dynamic length with offset beyond wrapped stream length
+    inner_stream = std::make_unique<ByteArrayInputStream>("abcdefghij", 
/*length=*/10);
+    ASSERT_NOK_WITH_MSG(
+        OffsetInputStream::Create(std::move(inner_stream), /*length=*/-1, 
/*offset=*/15),
+        "offset 15 exceed total length 10");
+}
+
+}  // namespace paimon::test


Reply via email to