This is an automated email from the ASF dual-hosted git repository.

wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new fc80e4bd feat(io): add streaming FileIO support (#641)
fc80e4bd is described below

commit fc80e4bdbafcd659e4b44fb9fb8ae7960a08c2d1
Author: Gang Wu <[email protected]>
AuthorDate: Fri May 8 22:18:40 2026 +0800

    feat(io): add streaming FileIO support (#641)
    
    Introduce InputFile/OutputFile stream APIs and Arrow IO adapters so
    bundled Avro/Parquet readers and writers can work with generic FileIO
    implementations.
---
 example/demo_example.cc                            |   2 +-
 src/iceberg/CMakeLists.txt                         |   5 +-
 src/iceberg/arrow/arrow_fs_file_io.cc              | 108 ----
 src/iceberg/arrow/arrow_io.cc                      | 590 +++++++++++++++++++++
 ...w_fs_file_io_internal.h => arrow_io_internal.h} |  44 +-
 .../{file_io_register.cc => arrow_io_register.cc}  |   4 +-
 .../{file_io_register.h => arrow_io_register.h}    |   2 +-
 .../arrow/{arrow_file_io.h => arrow_io_util.h}     |   0
 src/iceberg/arrow/s3/arrow_s3_file_io.cc           |   4 +-
 src/iceberg/avro/avro_reader.cc                    |  12 +-
 src/iceberg/avro/avro_writer.cc                    |   7 +-
 src/iceberg/file_io.cc                             | 103 ++++
 src/iceberg/file_io.h                              | 105 +++-
 src/iceberg/file_reader.h                          |   4 +-
 src/iceberg/file_writer.h                          |   4 +-
 src/iceberg/meson.build                            |   1 +
 src/iceberg/parquet/parquet_reader.cc              |  12 +-
 src/iceberg/parquet/parquet_writer.cc              |   7 +-
 src/iceberg/test/CMakeLists.txt                    |   2 +-
 src/iceberg/test/arrow_fs_file_io_test.cc          |  67 ---
 src/iceberg/test/arrow_io_test.cc                  | 486 +++++++++++++++++
 src/iceberg/test/arrow_s3_file_io_test.cc          |   2 +-
 src/iceberg/test/avro_test.cc                      |  18 +-
 src/iceberg/test/data_writer_test.cc               |   2 +-
 src/iceberg/test/delete_file_index_test.cc         |   2 +-
 src/iceberg/test/delete_loader_test.cc             |   2 +-
 src/iceberg/test/file_scan_task_test.cc            |  43 +-
 src/iceberg/test/gzip_decompress_test.cc           |   8 +-
 src/iceberg/test/in_memory_catalog_test.cc         |   2 +-
 src/iceberg/test/manifest_group_test.cc            |   2 +-
 src/iceberg/test/manifest_list_versions_test.cc    |   2 +-
 src/iceberg/test/manifest_reader_stats_test.cc     |   2 +-
 src/iceberg/test/manifest_reader_test.cc           |   2 +-
 src/iceberg/test/manifest_writer_versions_test.cc  |   2 +-
 src/iceberg/test/metadata_io_test.cc               |   2 +-
 src/iceberg/test/parquet_test.cc                   |  44 +-
 src/iceberg/test/rolling_manifest_writer_test.cc   |   2 +-
 src/iceberg/test/scan_test_base.h                  |   2 +-
 src/iceberg/test/std_io.h                          | 299 +++++++++--
 src/iceberg/test/update_location_test.cc           |   2 +-
 src/iceberg/test/update_partition_spec_test.cc     |   2 +-
 src/iceberg/test/update_test_base.h                |   2 +-
 42 files changed, 1700 insertions(+), 313 deletions(-)

diff --git a/example/demo_example.cc b/example/demo_example.cc
index 6869aa37..3c8745be 100644
--- a/example/demo_example.cc
+++ b/example/demo_example.cc
@@ -19,7 +19,7 @@
 
 #include <iostream>
 
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/catalog/memory/in_memory_catalog.h"
 #include "iceberg/manifest/manifest_entry.h"
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 02099f6b..c4e193b8 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -36,6 +36,7 @@ set(ICEBERG_SOURCES
     expression/rewrite_not.cc
     expression/strict_metrics_evaluator.cc
     expression/term.cc
+    file_io.cc
     file_io_registry.cc
     file_reader.cc
     file_writer.cc
@@ -218,9 +219,9 @@ add_subdirectory(util)
 
 if(ICEBERG_BUILD_BUNDLE)
   set(ICEBERG_BUNDLE_SOURCES
-      arrow/arrow_fs_file_io.cc
+      arrow/arrow_io.cc
       arrow/s3/arrow_s3_file_io.cc
-      arrow/file_io_register.cc
+      arrow/arrow_io_register.cc
       arrow/metadata_column_util.cc
       avro/avro_data_util.cc
       avro/avro_direct_decoder.cc
diff --git a/src/iceberg/arrow/arrow_fs_file_io.cc b/src/iceberg/arrow/arrow_fs_file_io.cc
deleted file mode 100644
index 769fcfb1..00000000
--- a/src/iceberg/arrow/arrow_fs_file_io.cc
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <chrono>
-
-#include <arrow/filesystem/localfs.h>
-#include <arrow/filesystem/mockfs.h>
-
-#include "iceberg/arrow/arrow_file_io.h"
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/arrow/arrow_status_internal.h"
-#include "iceberg/util/macros.h"
-
-namespace iceberg::arrow {
-
-Result<std::string> ArrowFileSystemFileIO::ResolvePath(const std::string& 
file_location) {
-  if (file_location.find("://") != std::string::npos) {
-    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto path, 
arrow_fs_->PathFromUri(file_location));
-    return path;
-  }
-  return file_location;
-}
-
-/// \brief Read the content of the file at the given location.
-Result<std::string> ArrowFileSystemFileIO::ReadFile(const std::string& 
file_location,
-                                                    std::optional<size_t> 
length) {
-  ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
-  ::arrow::fs::FileInfo file_info(path);
-  if (length.has_value()) {
-    file_info.set_size(length.value());
-  }
-  std::string content;
-  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, 
arrow_fs_->OpenInputFile(file_info));
-  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
-
-  content.resize(file_size);
-  size_t remain = file_size;
-  size_t offset = 0;
-  while (remain > 0) {
-    size_t read_length = std::min(remain, static_cast<size_t>(1024 * 1024));
-    ICEBERG_ARROW_ASSIGN_OR_RETURN(
-        auto read_bytes,
-        file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
-    if (read_bytes == 0) {
-      return IOError("Unexpected EOF reading {}: got {} of {} bytes", 
file_location,
-                     offset, file_size);
-    }
-    remain -= read_bytes;
-    offset += read_bytes;
-  }
-
-  return content;
-}
-
-/// \brief Write the given content to the file at the given location.
-Status ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
-                                        std::string_view content) {
-  ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
-  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(path));
-  ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
-  ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
-  ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
-  return {};
-}
-
-/// \brief Delete a file at the given location.
-Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) {
-  ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
-  ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(path));
-  return {};
-}
-
-std::unique_ptr<FileIO> ArrowFileSystemFileIO::MakeMockFileIO() {
-  return std::make_unique<ArrowFileSystemFileIO>(
-      std::make_shared<::arrow::fs::internal::MockFileSystem>(
-          std::chrono::system_clock::now()));
-}
-
-std::unique_ptr<FileIO> ArrowFileSystemFileIO::MakeLocalFileIO() {
-  return std::make_unique<ArrowFileSystemFileIO>(
-      std::make_shared<::arrow::fs::LocalFileSystem>());
-}
-
-std::unique_ptr<FileIO> MakeMockFileIO() {
-  return ArrowFileSystemFileIO::MakeMockFileIO();
-}
-
-std::unique_ptr<FileIO> MakeLocalFileIO() {
-  return ArrowFileSystemFileIO::MakeLocalFileIO();
-}
-
-}  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_io.cc b/src/iceberg/arrow/arrow_io.cc
new file mode 100644
index 00000000..a515f338
--- /dev/null
+++ b/src/iceberg/arrow/arrow_io.cc
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <algorithm>
+#include <chrono>
+#include <limits>
+#include <mutex>
+#include <optional>
+
+#include <arrow/buffer.h>
+#include <arrow/filesystem/localfs.h>
+#include <arrow/filesystem/mockfs.h>
+#include <arrow/io/interfaces.h>
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/arrow/arrow_io_util.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::arrow {
+
+namespace {
+
+Result<int64_t> ToInt64Length(size_t length) {  // Converts a size_t length to int64_t, failing instead of wrapping.
+  if (length > static_cast<size_t>(std::numeric_limits<int64_t>::max())) {  // Not representable as int64_t.
+    return InvalidArgument("File length {} exceeds int64_t max", length);
+  }
+  return static_cast<int64_t>(length);
+}
+
+::arrow::Status ToArrowStatus(const Error& error) {  // Maps an Iceberg Error to an Arrow Status carrying the same message.
+  switch (error.kind) {
+    case ErrorKind::kInvalid:
+    case ErrorKind::kInvalidArgument:
+      return ::arrow::Status::Invalid(error.message);
+    case ErrorKind::kNotImplemented:
+    case ErrorKind::kNotSupported:
+      return ::arrow::Status::NotImplemented(error.message);
+    default:  // Every other error kind is reported as an Arrow IOError.
+      return ::arrow::Status::IOError(error.message);
+  }
+}
+
+::arrow::Result<int64_t> BytesToReadAt(int64_t position, int64_t nbytes, 
int64_t size) {  // Returns how many of the nbytes requested at position lie inside a file of size bytes.
+  if (position < 0 || nbytes < 0) {
+    return ::arrow::Status::Invalid("ReadAt position and length must be 
non-negative");
+  }
+  if (position > size) {  // position == size is accepted and yields a zero-byte read below.
+    return ::arrow::Status::IOError("Read out of bounds (offset = ", position,
+                                    ", size = ", nbytes, ") in file of size ", 
size);
+  }
+  return std::min(nbytes, size - position);  // Clamp the request to the bytes remaining after position.
+}
+
+/// Adapts the generic Iceberg input stream API to Arrow's RandomAccessFile 
API.
+///
+/// Avro and Parquet readers in the bundle layer consume Arrow IO streams. This
+/// fallback keeps those readers usable with non-Arrow FileIO implementations 
without
+/// exposing Arrow filesystem details through the generic FileIO interface.
+class InputStreamAdapter : public ::arrow::io::RandomAccessFile {  // Every call into input_ below is made while holding mutex_.
+ public:
+  InputStreamAdapter(std::unique_ptr<SeekableInputStream> input, int64_t size)  // size is the value later returned by GetSize().
+      : input_(std::move(input)), size_(size) {
+    RandomAccessFile::set_mode(::arrow::io::FileMode::READ);
+  }
+
+  ::arrow::Status Close() override {  // Closes the wrapped stream; calling it again after a successful close returns OK.
+    std::lock_guard lock(mutex_);
+    if (closed_) {
+      return ::arrow::Status::OK();
+    }
+    auto status = input_->Close();
+    if (!status.has_value()) {
+      return ToArrowStatus(status.error());  // closed_ stays false when the wrapped Close() fails.
+    }
+    closed_ = true;
+    return ::arrow::Status::OK();
+  }
+
+  ::arrow::Result<int64_t> Tell() const override {  // Reports the wrapped stream's current position.
+    std::lock_guard lock(mutex_);
+    ARROW_RETURN_NOT_OK(CheckOpenLocked());
+    auto position = input_->Position();
+    if (!position.has_value()) {
+      return ToArrowStatus(position.error());
+    }
+    if (position.value() < 0) {  // Reject a negative position reported by the wrapped stream.
+      return ::arrow::Status::IOError("FileIO input stream returned negative 
position");
+    }
+    return position.value();
+  }
+
+  bool closed() const override {  // True once Close() has succeeded.
+    std::lock_guard lock(mutex_);
+    return closed_;
+  }
+
+  ::arrow::Status Seek(int64_t position) override {  // Forwards the seek to the wrapped stream; position is not validated here.
+    std::lock_guard lock(mutex_);
+    ARROW_RETURN_NOT_OK(CheckOpenLocked());
+    auto status = input_->Seek(position);
+    if (!status.has_value()) {
+      return ToArrowStatus(status.error());
+    }
+    return ::arrow::Status::OK();
+  }
+
+  ::arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {  // Reads up to nbytes into out; returns the count reported by the wrapped stream.
+    if (nbytes < 0) {
+      return ::arrow::Status::Invalid("Cannot read a negative number of 
bytes");
+    }
+    std::lock_guard lock(mutex_);  // Taken after the argument check, which touches no shared state.
+    ARROW_RETURN_NOT_OK(CheckOpenLocked());
+    if (nbytes == 0) {  // Zero-length read returns without calling the wrapped stream.
+      return 0;
+    }
+    auto data = reinterpret_cast<std::byte*>(out);
+    auto result = input_->Read(std::span(data, static_cast<size_t>(nbytes)));
+    if (!result.has_value()) {
+      return ToArrowStatus(result.error());
+    }
+    if (result.value() < 0 || result.value() > nbytes) {  // Sanity-check the count reported by the wrapped stream.
+      return ::arrow::Status::IOError("FileIO input stream returned invalid 
byte count");
+    }
+    return result.value();
+  }
+
+  ::arrow::Result<std::shared_ptr<::arrow::Buffer>> Read(int64_t nbytes) 
override {  // Buffer-returning overload built on Read(nbytes, out).
+    if (nbytes < 0) {
+      return ::arrow::Status::Invalid("Cannot read a negative number of 
bytes");
+    }
+    ARROW_ASSIGN_OR_RAISE(auto buffer, 
::arrow::AllocateResizableBuffer(nbytes));
+    ARROW_ASSIGN_OR_RAISE(auto bytes_read, Read(nbytes, 
buffer->mutable_data()));
+    ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, /*shrink_to_fit=*/false));  // Resize to the bytes actually read.
+    return std::shared_ptr<::arrow::Buffer>(std::move(buffer));
+  }
+
+  ::arrow::Result<int64_t> GetSize() override { return size_; }  // Returns the size given to the constructor; the wrapped stream is not queried.
+
+  ::arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) 
override {  // Positional read clamped to size_, delegated to input_->ReadFully.
+    std::lock_guard lock(mutex_);
+    ARROW_RETURN_NOT_OK(CheckOpenLocked());
+    ARROW_ASSIGN_OR_RAISE(auto bytes_to_read, BytesToReadAt(position, nbytes, 
size_));
+    if (bytes_to_read == 0) {  // nbytes == 0 or position == size_: nothing to read.
+      return 0;
+    }
+    auto data = reinterpret_cast<std::byte*>(out);
+    auto status =
+        input_->ReadFully(position, std::span(data, 
static_cast<size_t>(bytes_to_read)));
+    if (!status.has_value()) {
+      return ToArrowStatus(status.error());
+    }
+    return bytes_to_read;  // Assumes a successful ReadFully filled the whole span - TODO confirm against SeekableInputStream's contract.
+  }
+
+  ::arrow::Result<std::shared_ptr<::arrow::Buffer>> ReadAt(int64_t position,
+                                                           int64_t nbytes) 
override {  // Buffer-returning positional read.
+    {
+      std::lock_guard lock(mutex_);
+      ARROW_RETURN_NOT_OK(CheckOpenLocked());
+    }  // Lock released here, so the buffer allocation below runs without holding mutex_.
+    ARROW_ASSIGN_OR_RAISE(auto bytes_to_read, BytesToReadAt(position, nbytes, 
size_));
+    ARROW_ASSIGN_OR_RAISE(auto buffer, 
::arrow::AllocateResizableBuffer(bytes_to_read));
+    if (bytes_to_read == 0) {  // Return the empty buffer without calling the wrapped stream.
+      return std::shared_ptr<::arrow::Buffer>(std::move(buffer));
+    }
+    std::lock_guard lock(mutex_);
+    ARROW_RETURN_NOT_OK(CheckOpenLocked());  // Checked again because the lock was released after the first check.
+    auto status = input_->ReadFully(
+        position, 
std::span(reinterpret_cast<std::byte*>(buffer->mutable_data()),
+                            static_cast<size_t>(bytes_to_read)));
+    if (!status.has_value()) {
+      return ToArrowStatus(status.error());
+    }
+    return std::shared_ptr<::arrow::Buffer>(std::move(buffer));
+  }
+
+ private:
+  ::arrow::Status CheckOpenLocked() const {  // Reads closed_ without locking; callers in this class hold mutex_ when calling it.
+    if (closed_) {
+      return ::arrow::Status::IOError("Operation on closed FileIO input 
stream");
+    }
+    return ::arrow::Status::OK();
+  }
+
+  std::unique_ptr<SeekableInputStream> input_;  // Wrapped Iceberg stream, owned by the adapter.
+  int64_t size_;  // File size supplied at construction; used by GetSize() and to clamp ReadAt().
+  bool closed_ = false;  // Set to true only after input_->Close() succeeds.
+  mutable std::mutex mutex_;  // mutable so the const methods Tell() and closed() can lock it.
+};
+
+/// Adapts the generic Iceberg output stream API to Arrow's OutputStream API.
+///
+/// Avro and Parquet writers in the bundle layer consume Arrow IO streams. This
+/// fallback keeps those writers usable with non-Arrow FileIO implementations 
without
+/// requiring them to downcast to ArrowFileSystemFileIO.
+class OutputStreamAdapter : public ::arrow::io::OutputStream {  // Every call into output_ below is made while holding mutex_.
+ public:
+  explicit OutputStreamAdapter(std::unique_ptr<PositionOutputStream> output)  // Takes ownership of the wrapped stream.
+      : output_(std::move(output)) {
+    OutputStream::set_mode(::arrow::io::FileMode::WRITE);
+  }
+
+  ::arrow::Status Close() override {  // Closes the wrapped stream; calling it again after a successful close returns OK.
+    std::lock_guard lock(mutex_);
+    if (closed_) {
+      return ::arrow::Status::OK();
+    }
+    auto status = output_->Close();
+    if (!status.has_value()) {
+      return ToArrowStatus(status.error());  // closed_ stays false when the wrapped Close() fails.
+    }
+    closed_ = true;
+    return ::arrow::Status::OK();
+  }
+
+  ::arrow::Result<int64_t> Tell() const override {  // Reports the wrapped stream's current position.
+    std::lock_guard lock(mutex_);
+    ARROW_RETURN_NOT_OK(CheckOpenLocked());
+    auto position = output_->Position();
+    if (!position.has_value()) {
+      return ToArrowStatus(position.error());
+    }
+    if (position.value() < 0) {  // Reject a negative position reported by the wrapped stream.
+      return ::arrow::Status::IOError("FileIO output stream returned negative 
position");
+    }
+    return position.value();
+  }
+
+  bool closed() const override {  // True once Close() has succeeded.
+    std::lock_guard lock(mutex_);
+    return closed_;
+  }
+
+  ::arrow::Status Write(const void* data, int64_t nbytes) override {  // Forwards nbytes bytes starting at data to the wrapped stream.
+    if (nbytes < 0) {
+      return ::arrow::Status::Invalid("Cannot write a negative number of 
bytes");
+    }
+    std::lock_guard lock(mutex_);  // Taken after the argument check, which touches no shared state.
+    ARROW_RETURN_NOT_OK(CheckOpenLocked());
+    if (nbytes == 0) {  // Zero-length write returns without calling the wrapped stream.
+      return ::arrow::Status::OK();
+    }
+    auto status = output_->Write(
+        std::span(reinterpret_cast<const std::byte*>(data), 
static_cast<size_t>(nbytes)));
+    if (!status.has_value()) {
+      return ToArrowStatus(status.error());
+    }
+    return ::arrow::Status::OK();
+  }
+
+  ::arrow::Status Flush() override {  // Forwards the flush to the wrapped stream.
+    std::lock_guard lock(mutex_);
+    ARROW_RETURN_NOT_OK(CheckOpenLocked());
+    auto status = output_->Flush();
+    if (!status.has_value()) {
+      return ToArrowStatus(status.error());
+    }
+    return ::arrow::Status::OK();
+  }
+
+ private:
+  ::arrow::Status CheckOpenLocked() const {  // Reads closed_ without locking; callers in this class hold mutex_ when calling it.
+    if (closed_) {
+      return ::arrow::Status::IOError("Operation on closed FileIO output 
stream");
+    }
+    return ::arrow::Status::OK();
+  }
+
+  std::unique_ptr<PositionOutputStream> output_;  // Wrapped Iceberg stream, owned by the adapter.
+  bool closed_ = false;  // Set to true only after output_->Close() succeeds.
+  mutable std::mutex mutex_;  // mutable so the const methods Tell() and closed() can lock it.
+};
+
+class ArrowSeekableInputStream : public SeekableInputStream {  // Implements SeekableInputStream on top of an Arrow RandomAccessFile.
+ public:
+  explicit 
ArrowSeekableInputStream(std::shared_ptr<::arrow::io::RandomAccessFile> input)
+      : input_(std::move(input)) {}
+
+  Result<int64_t> Position() const override {  // Current position as reported by the Arrow file's Tell().
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto position, input_->Tell());
+    return position;
+  }
+
+  Status Seek(int64_t position) override {  // Forwards the seek to the Arrow file.
+    ICEBERG_ARROW_RETURN_NOT_OK(input_->Seek(position));
+    return {};
+  }
+
+  Result<int64_t> Read(std::span<std::byte> out) override {  // Reads up to out.size() bytes via the Arrow file's Read(); returns the count read.
+    ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(out.size()));
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto bytes_read, input_->Read(size, 
out.data()));
+    if (bytes_read < 0 || bytes_read > size) {  // Sanity-check the count reported by Arrow.
+      return IOError("Arrow input stream returned invalid byte count");
+    }
+    return bytes_read;
+  }
+
+  Status ReadFully(int64_t position, std::span<std::byte> out) override {  // Fills out completely from position using ReadAt, or returns an error.
+    if (position < 0) {
+      return InvalidArgument("Cannot read from negative position {}", 
position);
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(out.size()));
+    if (size == 0) {  // Empty span: succeed without touching the Arrow file.
+      return {};
+    }
+    if (position > std::numeric_limits<int64_t>::max() - size) {  // Keeps position + bytes_read below from overflowing int64_t.
+      return InvalidArgument(
+          "Read range starting at {} with length {} exceeds int64_t max", 
position, size);
+    }
+
+    Status read_status = {};  // Stays success unless one of the loop branches below records an error.
+    int64_t bytes_read = 0;
+    while (bytes_read < size) {  // Repeat ReadAt until the span is full or an error/EOF ends the loop.
+      auto* data = out.data() + bytes_read;
+      auto remaining = size - bytes_read;
+      auto read_result = input_->ReadAt(position + bytes_read, remaining, 
data);
+      if (!read_result.ok()) {  // Convert the Arrow failure into an Iceberg Error carrying the status text.
+        read_status =
+            std::unexpected<Error>{{.kind = ToErrorKind(read_result.status()),
+                                    .message = 
read_result.status().ToString()}};
+        break;
+      }
+      auto read = read_result.ValueOrDie();  // ok() was checked above, so a value is present.
+      if (read < 0 || read > remaining) {
+        read_status = IOError("Arrow input stream returned invalid byte 
count");
+        break;
+      }
+      if (read == 0) {  // No progress before the span was filled is reported as unexpected EOF.
+        read_status =
+            IOError("Unexpected EOF reading at offset {}", position + 
bytes_read);
+        break;
+      }
+      bytes_read += read;
+    }
+    return read_status;
+  }
+
+  Status Close() override {  // Closes the Arrow file unless it already reports closed.
+    if (input_->closed()) {
+      return {};
+    }
+    ICEBERG_ARROW_RETURN_NOT_OK(input_->Close());
+    return {};
+  }
+
+ private:
+  std::shared_ptr<::arrow::io::RandomAccessFile> input_;  // Underlying Arrow file that all methods delegate to.
+};
+
+class ArrowPositionOutputStream : public PositionOutputStream {  // Implements PositionOutputStream on top of an Arrow OutputStream.
+ public:
+  explicit 
ArrowPositionOutputStream(std::shared_ptr<::arrow::io::OutputStream> output)
+      : output_(std::move(output)) {}
+
+  Result<int64_t> Position() const override {  // Current position as reported by the Arrow stream's Tell().
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto position, output_->Tell());
+    return position;
+  }
+
+  Status Write(std::span<const std::byte> data) override {  // Writes the whole span through the Arrow stream.
+    ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(data.size()));  // Arrow's Write takes an int64_t length.
+    ICEBERG_ARROW_RETURN_NOT_OK(output_->Write(data.data(), size));
+    return {};
+  }
+
+  Status Flush() override {  // Forwards the flush to the Arrow stream.
+    ICEBERG_ARROW_RETURN_NOT_OK(output_->Flush());
+    return {};
+  }
+
+  Status Close() override {  // Closes the Arrow stream unless it already reports closed.
+    if (output_->closed()) {
+      return {};
+    }
+    ICEBERG_ARROW_RETURN_NOT_OK(output_->Close());
+    return {};
+  }
+
+ private:
+  std::shared_ptr<::arrow::io::OutputStream> output_;  // Underlying Arrow stream that all methods delegate to.
+};
+
+class ArrowInputFile : public InputFile {  // InputFile handle backed by an Arrow FileSystem.
+ public:
+  ArrowInputFile(std::shared_ptr<::arrow::fs::FileSystem> fs, std::string 
location,
+                 std::string path, std::optional<int64_t> file_size)  // file_size, when set, is used by Size() and Open().
+      : fs_(std::move(fs)),
+        location_(std::move(location)),
+        path_(std::move(path)),
+        file_size_(file_size) {}
+
+  std::string_view location() const override { return location_; }  // View into location_; valid while this object is alive.
+
+  Result<int64_t> Size() const override {  // Returns the known size, or asks Arrow for it.
+    if (file_size_.has_value()) {  // Size supplied at construction: return it without touching the filesystem.
+      return *file_size_;
+    }
+    ::arrow::fs::FileInfo file_info(path_, ::arrow::fs::FileType::File);
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, fs_->OpenInputFile(file_info));  // Otherwise open the file and query its size.
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto size, input->GetSize());
+    return size;  // The result is not stored in file_size_, so each call without a known size opens the file again.
+  }
+
+  Result<std::unique_ptr<SeekableInputStream>> Open() override {  // Opens path_ on fs_ and wraps the Arrow file in ArrowSeekableInputStream.
+    ::arrow::fs::FileInfo file_info(path_, ::arrow::fs::FileType::File);
+    if (file_size_.has_value()) {
+      file_info.set_size(*file_size_);  // Pass the known size to Arrow through the FileInfo.
+    }
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, fs_->OpenInputFile(file_info));
+    return std::make_unique<ArrowSeekableInputStream>(std::move(input));
+  }
+
+ private:
+  std::shared_ptr<::arrow::fs::FileSystem> fs_;  // Filesystem used to open path_.
+  std::string location_;  // Location string passed by the caller; returned from location().
+  std::string path_;  // Path handed to fs_ when opening the file.
+  std::optional<int64_t> file_size_;  // Known file size, if one was supplied.
+};
+
+class ArrowOutputFile : public OutputFile {  // OutputFile handle backed by an Arrow FileSystem.
+ public:
+  ArrowOutputFile(std::shared_ptr<::arrow::fs::FileSystem> fs, std::string 
location,
+                  std::string path)
+      : fs_(std::move(fs)), location_(std::move(location)), 
path_(std::move(path)) {}
+
+  std::string_view location() const override { return location_; }  // View into location_; valid while this object is alive.
+
+  Result<std::unique_ptr<PositionOutputStream>> Create() override {  // Fails with AlreadyExists when GetFileInfo reports anything other than NotFound.
+    return Create(/*overwrite=*/false);
+  }
+
+  Result<std::unique_ptr<PositionOutputStream>> CreateOrOverwrite() override {  // Opens the stream without the existence check.
+    return Create(/*overwrite=*/true);
+  }
+
+ private:
+  Result<std::unique_ptr<PositionOutputStream>> Create(bool overwrite) {  // Shared implementation of Create() and CreateOrOverwrite().
+    if (!overwrite) {
+      ICEBERG_ARROW_ASSIGN_OR_RETURN(auto info, fs_->GetFileInfo(path_));
+      if (info.type() != ::arrow::fs::FileType::NotFound) {  // Any existing entry at path_ blocks creation.
+        return AlreadyExists("File already exists: {}", location_);
+      }
+    }  // NOTE(review): the check above and the open below are separate calls; confirm whether callers need atomic exclusive creation.
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, fs_->OpenOutputStream(path_));
+    return std::make_unique<ArrowPositionOutputStream>(std::move(output));
+  }
+
+  std::shared_ptr<::arrow::fs::FileSystem> fs_;  // Filesystem used to open path_.
+  std::string location_;  // Location string passed by the caller; used in location() and error messages.
+  std::string path_;  // Path handed to fs_.
+};
+
+}  // namespace
+
+Result<std::string> ArrowFileSystemFileIO::ResolvePath(const std::string& 
file_location) {  // Maps a file location to a path usable with arrow_fs_.
+  if (file_location.find("://") != std::string::npos) {  // Contains "://": let the Arrow filesystem translate it.
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto path, 
arrow_fs_->PathFromUri(file_location));
+    return path;
+  }
+  return file_location;  // No "://": the location is returned unchanged.
+}
+
+Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenArrowInputStream(
+    const std::shared_ptr<FileIO>& io, const std::string& path,
+    std::optional<size_t> length) {  // Opens path through io as an Arrow RandomAccessFile; length, when set, is used as the file size.
+  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+
+  if (auto arrow_io = std::dynamic_pointer_cast<ArrowFileSystemFileIO>(io)) {  // Arrow-backed FileIO: open directly on its filesystem, no adapter.
+    ICEBERG_ASSIGN_OR_RAISE(auto resolved_path, arrow_io->ResolvePath(path));
+    ::arrow::fs::FileInfo file_info(resolved_path, 
::arrow::fs::FileType::File);
+    if (length.has_value()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(*length));
+      file_info.set_size(size);  // Pass the known length to Arrow through the FileInfo.
+    }
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input,
+                                   
arrow_io->arrow_fs_->OpenInputFile(file_info));
+    return input;
+  }
+
+  int64_t size;  // Assigned below from input_file->Size().
+  std::unique_ptr<InputFile> input_file;
+  if (length.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path, *length));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path));
+  }
+  ICEBERG_ASSIGN_OR_RAISE(size, input_file->Size());
+  if (size < 0) {  // Reject a negative size before handing it to the adapter.
+    return Invalid("Invalid negative file size {} for {}", size, path);
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto input, input_file->Open());
+  return std::make_shared<InputStreamAdapter>(std::move(input), size);  // Fallback: wrap the generic stream in the Arrow adapter.
+}
+
+Result<std::shared_ptr<::arrow::io::OutputStream>> OpenArrowOutputStream(
+    const std::shared_ptr<FileIO>& io, const std::string& path, bool 
overwrite) {  // Opens path through io as an Arrow OutputStream; overwrite=false fails if the file exists.
+  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+
+  if (auto arrow_io = std::dynamic_pointer_cast<ArrowFileSystemFileIO>(io)) {  // Arrow-backed FileIO: open directly on its filesystem, no adapter.
+    ICEBERG_ASSIGN_OR_RAISE(auto resolved_path, arrow_io->ResolvePath(path));
+    if (!overwrite) {
+      ICEBERG_ARROW_ASSIGN_OR_RETURN(auto info,
+                                     
arrow_io->arrow_fs_->GetFileInfo(resolved_path));
+      if (info.type() != ::arrow::fs::FileType::NotFound) {  // Any existing entry at the path blocks creation.
+        return AlreadyExists("File already exists: {}", path);
+      }
+    }  // NOTE(review): the check above and the open below are separate calls; confirm whether callers need atomic exclusive creation.
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output,
+                                   
arrow_io->arrow_fs_->OpenOutputStream(resolved_path));
+    return output;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto output_file, io->NewOutputFile(path));
+  std::unique_ptr<PositionOutputStream> output;
+  if (overwrite) {
+    ICEBERG_ASSIGN_OR_RAISE(output, output_file->CreateOrOverwrite());
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(output, output_file->Create());
+  }
+  return std::make_shared<OutputStreamAdapter>(std::move(output));  // Fallback: wrap the generic stream in the Arrow adapter.
+}
+
+Result<std::unique_ptr<InputFile>> ArrowFileSystemFileIO::NewInputFile(
+    std::string file_location) {  // Creates an ArrowInputFile for file_location with no known size.
+  ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));  // Resolved before file_location is moved from below.
+  return std::make_unique<ArrowInputFile>(arrow_fs_, std::move(file_location),
+                                          std::move(path), std::nullopt);
+}
+
+Result<std::unique_ptr<InputFile>> ArrowFileSystemFileIO::NewInputFile(
+    std::string file_location, size_t length) {  // Creates an ArrowInputFile for file_location with a caller-supplied size.
+  ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(length));  // Fails with InvalidArgument when length does not fit in int64_t.
+  ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));  // Resolved before file_location is moved from below.
+  return std::make_unique<ArrowInputFile>(arrow_fs_, std::move(file_location),
+                                          std::move(path), size);
+}
+
+Result<std::unique_ptr<OutputFile>> ArrowFileSystemFileIO::NewOutputFile(
+    std::string file_location) {  // Creates an ArrowOutputFile for file_location; nothing is opened here.
+  ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));  // Resolved before file_location is moved from below.
+  return std::make_unique<ArrowOutputFile>(arrow_fs_, std::move(file_location),
+                                           std::move(path));
+}
+
+/// \brief Delete a file at the given location.
+Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) {
+  ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));  // Translate the location into a path for arrow_fs_.
+  ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(path));  // An Arrow failure is returned to the caller as an Iceberg error.
+  return {};
+}
+
+std::unique_ptr<FileIO> ArrowFileSystemFileIO::MakeMockFileIO() {  // Builds a FileIO backed by Arrow's internal MockFileSystem.
+  return std::make_unique<ArrowFileSystemFileIO>(
+      std::make_shared<::arrow::fs::internal::MockFileSystem>(
+          std::chrono::system_clock::now()));  // The current time is passed to the MockFileSystem constructor.
+}
+
+std::unique_ptr<FileIO> ArrowFileSystemFileIO::MakeLocalFileIO() {  // Builds a FileIO backed by Arrow's LocalFileSystem.
+  return std::make_unique<ArrowFileSystemFileIO>(
+      std::make_shared<::arrow::fs::LocalFileSystem>());
+}
+
+std::unique_ptr<FileIO> MakeMockFileIO() {  // Free-function entry point; forwards to ArrowFileSystemFileIO::MakeMockFileIO().
+  return ArrowFileSystemFileIO::MakeMockFileIO();
+}
+
+std::unique_ptr<FileIO> MakeLocalFileIO() {  // Free-function entry point; forwards to ArrowFileSystemFileIO::MakeLocalFileIO().
+  return ArrowFileSystemFileIO::MakeLocalFileIO();
+}
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_fs_file_io_internal.h b/src/iceberg/arrow/arrow_io_internal.h
similarity index 50%
rename from src/iceberg/arrow/arrow_fs_file_io_internal.h
rename to src/iceberg/arrow/arrow_io_internal.h
index 92a99150..4f170a8a 100644
--- a/src/iceberg/arrow/arrow_fs_file_io_internal.h
+++ b/src/iceberg/arrow/arrow_io_internal.h
@@ -19,15 +19,37 @@
 
 #pragma once
 
+#include <cstddef>
 #include <memory>
+#include <optional>
+#include <string>
 
-#include <arrow/filesystem/filesystem.h>
+#include <arrow/filesystem/type_fwd.h>
+#include <arrow/io/type_fwd.h>
 
 #include "iceberg/file_io.h"
 #include "iceberg/iceberg_bundle_export.h"
 
 namespace iceberg::arrow {
 
+/// \brief Open a FileIO input as an Arrow input stream.
+///
+/// Uses ArrowFileSystemFileIO's native Arrow stream directly when possible 
and falls
+/// back to a FileIO stream adapter otherwise. The fallback requires FileIO to
+/// implement NewInputFile.
+ICEBERG_BUNDLE_EXPORT Result<std::shared_ptr<::arrow::io::RandomAccessFile>>
+OpenArrowInputStream(const std::shared_ptr<FileIO>& io, const std::string& 
path,
+                     std::optional<size_t> length = std::nullopt);
+
+/// \brief Open a FileIO output as an Arrow output stream.
+///
+/// Uses ArrowFileSystemFileIO's native Arrow stream directly when possible 
and falls
+/// back to a FileIO stream adapter otherwise. The fallback requires FileIO to
+/// implement NewOutputFile.
+ICEBERG_BUNDLE_EXPORT Result<std::shared_ptr<::arrow::io::OutputStream>>
+OpenArrowOutputStream(const std::shared_ptr<FileIO>& io, const std::string& 
path,
+                      bool overwrite = true);
+
 /// \brief A concrete implementation of FileIO for Arrow file system.
 class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
  public:
@@ -42,12 +64,15 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public 
FileIO {
 
   ~ArrowFileSystemFileIO() override = default;
 
-  /// \brief Read the content of the file at the given location.
-  Result<std::string> ReadFile(const std::string& file_location,
-                               std::optional<size_t> length) override;
+  /// \brief Create an input file handle for the given location.
+  Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location) 
override;
+
+  /// \brief Create an input file handle for the given location with a known 
length.
+  Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location,
+                                                  size_t length) override;
 
-  /// \brief Write the given content to the file at the given location.
-  Status WriteFile(const std::string& file_location, std::string_view content) 
override;
+  /// \brief Create an output file handle for the given location.
+  Result<std::unique_ptr<OutputFile>> NewOutputFile(std::string file_location) 
override;
 
   /// \brief Delete a file at the given location.
   Status DeleteFile(const std::string& file_location) override;
@@ -56,6 +81,13 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public 
FileIO {
   const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return 
arrow_fs_; }
 
  private:
+  friend Result<std::shared_ptr<::arrow::io::RandomAccessFile>> 
OpenArrowInputStream(
+      const std::shared_ptr<FileIO>& io, const std::string& path,
+      std::optional<size_t> length);
+
+  friend Result<std::shared_ptr<::arrow::io::OutputStream>> 
OpenArrowOutputStream(
+      const std::shared_ptr<FileIO>& io, const std::string& path, bool 
overwrite);
+
   /// \brief Resolve a file location to a filesystem path.
   Result<std::string> ResolvePath(const std::string& file_location);
 
diff --git a/src/iceberg/arrow/file_io_register.cc 
b/src/iceberg/arrow/arrow_io_register.cc
similarity index 95%
rename from src/iceberg/arrow/file_io_register.cc
rename to src/iceberg/arrow/arrow_io_register.cc
index 1140e49b..43273c0a 100644
--- a/src/iceberg/arrow/file_io_register.cc
+++ b/src/iceberg/arrow/arrow_io_register.cc
@@ -15,12 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "iceberg/arrow/file_io_register.h"
+#include "iceberg/arrow/arrow_io_register.h"
 
 #include <mutex>
 #include <string>
 
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/file_io_registry.h"
 
 namespace iceberg::arrow {
diff --git a/src/iceberg/arrow/file_io_register.h 
b/src/iceberg/arrow/arrow_io_register.h
similarity index 96%
rename from src/iceberg/arrow/file_io_register.h
rename to src/iceberg/arrow/arrow_io_register.h
index 1b4622bd..f28b7a56 100644
--- a/src/iceberg/arrow/file_io_register.h
+++ b/src/iceberg/arrow/arrow_io_register.h
@@ -19,7 +19,7 @@
 
 #pragma once
 
-/// \file iceberg/arrow/file_io_register.h
+/// \file iceberg/arrow/arrow_io_register.h
 /// \brief Provide functions to register Arrow FileIO implementations.
 
 #include "iceberg/iceberg_bundle_export.h"
diff --git a/src/iceberg/arrow/arrow_file_io.h 
b/src/iceberg/arrow/arrow_io_util.h
similarity index 100%
rename from src/iceberg/arrow/arrow_file_io.h
rename to src/iceberg/arrow/arrow_io_util.h
diff --git a/src/iceberg/arrow/s3/arrow_s3_file_io.cc 
b/src/iceberg/arrow/s3/arrow_s3_file_io.cc
index 808415d0..cffd9584 100644
--- a/src/iceberg/arrow/s3/arrow_s3_file_io.cc
+++ b/src/iceberg/arrow/s3/arrow_s3_file_io.cc
@@ -27,8 +27,8 @@
 #  include <arrow/filesystem/s3fs.h>
 #endif
 
-#include "iceberg/arrow/arrow_file_io.h"
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/arrow/arrow_status_internal.h"
 #include "iceberg/arrow/s3/s3_properties.h"
 #include "iceberg/util/macros.h"
diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index f4985d9a..1d431c46 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -31,7 +31,7 @@
 #include <avro/Generic.hh>
 #include <avro/GenericDatum.hh>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/arrow/arrow_status_internal.h"
 #include "iceberg/arrow/metadata_column_util_internal.h"
 #include "iceberg/avro/avro_data_util_internal.h"
@@ -42,7 +42,6 @@
 #include "iceberg/metadata_columns.h"
 #include "iceberg/name_mapping.h"
 #include "iceberg/schema_internal.h"
-#include "iceberg/util/checked_cast.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg::avro {
@@ -51,13 +50,8 @@ namespace {
 
 Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const 
ReaderOptions& options,
                                                            int64_t 
buffer_size) {
-  ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
-  if (options.length) {
-    file_info.set_size(options.length.value());
-  }
-
-  auto io = 
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
-  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, 
io->fs()->OpenInputFile(file_info));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto file, arrow::OpenArrowInputStream(options.io, options.path, 
options.length));
   return std::make_unique<AvroInputStream>(file, buffer_size);
 }
 
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index 32ce3f63..63fc3146 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -29,7 +29,7 @@
 #include <avro/Generic.hh>
 #include <avro/GenericDatum.hh>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/arrow/arrow_status_internal.h"
 #include "iceberg/avro/avro_data_util_internal.h"
 #include "iceberg/avro/avro_direct_encoder_internal.h"
@@ -40,7 +40,6 @@
 #include "iceberg/metrics_config.h"
 #include "iceberg/schema.h"
 #include "iceberg/schema_internal.h"
-#include "iceberg/util/checked_cast.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg::avro {
@@ -49,8 +48,8 @@ namespace {
 
 Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const 
WriterOptions& options,
                                                              int64_t 
buffer_size) {
-  auto io = 
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
-  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, 
io->fs()->OpenOutputStream(options.path));
+  ICEBERG_ASSIGN_OR_RAISE(auto output,
+                          arrow::OpenArrowOutputStream(options.io, 
options.path));
   return std::make_unique<AvroOutputStream>(output, buffer_size);
 }
 
diff --git a/src/iceberg/file_io.cc b/src/iceberg/file_io.cc
new file mode 100644
index 00000000..d76ffeb6
--- /dev/null
+++ b/src/iceberg/file_io.cc
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/file_io.h"
+
+#include <limits>
+#include <utility>
+
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+/// \brief Merge the outcome of a stream operation with the outcome of closing it.
+///
+/// A failed operation wins: its error is returned, and if closing failed as well the
+/// close message is appended to that error. When the operation succeeded, the close
+/// status is returned as-is.
+Status FinishWithCloseStatus(Status operation_status, Status close_status) {
+  if (operation_status.has_value()) {
+    return close_status;
+  }
+
+  auto failure = operation_status.error();
+  if (!close_status.has_value()) {
+    failure.message += "; additionally failed to close stream: ";
+    failure.message += close_status.error().message;
+  }
+  return std::unexpected<Error>(std::move(failure));
+}
+
+}  // namespace
+
+// Base-class default: always returns a NotImplemented error that names the requested
+// location. Subclasses override this to hand out a real InputFile.
+Result<std::unique_ptr<InputFile>> FileIO::NewInputFile(std::string 
file_location) {
+  return NotImplemented("NewInputFile not implemented for {}", file_location);
+}
+
+// Base-class default for the length-hinted overload: the hint is ignored and the call
+// is forwarded to the single-argument NewInputFile. That call is virtual, so a
+// subclass that overrides only the single-argument overload is still used.
+Result<std::unique_ptr<InputFile>> FileIO::NewInputFile(std::string 
file_location,
+                                                        size_t /*length*/) {
+  return NewInputFile(std::move(file_location));
+}
+
+// Base-class default: always returns a NotImplemented error that names the requested
+// location. Subclasses override this to hand out a real OutputFile.
+Result<std::unique_ptr<OutputFile>> FileIO::NewOutputFile(std::string 
file_location) {
+  return NotImplemented("NewOutputFile not implemented for {}", file_location);
+}
+
+/// \brief Read a file, or its first `length` bytes, into a string.
+///
+/// Default implementation built on the streaming API: it obtains an InputFile via
+/// NewInputFile, opens a stream, reads the bytes with ReadFully starting at position 0
+/// and closes the stream.
+///
+/// \param file_location The location of the file to read.
+/// \param length Number of bytes to read. When absent, InputFile::Size() is used.
+/// \return The bytes read, or an error when the requested size cannot be represented,
+/// the file cannot be created/sized/opened, the read fails, or the close fails. A read
+/// error takes precedence over a close error (the close message is appended to it).
+Result<std::string> FileIO::ReadFile(const std::string& file_location,
+                                     std::optional<size_t> length) {
+  int64_t read_size = 0;
+  std::unique_ptr<InputFile> input_file;
+  if (length.has_value()) {
+    // Stream sizes and positions are int64_t, so reject hints that do not fit.
+    if (!std::in_range<int64_t>(*length)) {
+      return InvalidArgument("Requested read length {} exceeds int64_t max", *length);
+    }
+    ICEBERG_ASSIGN_OR_RAISE(input_file, NewInputFile(file_location, *length));
+    read_size = static_cast<int64_t>(*length);
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(input_file, NewInputFile(file_location));
+    ICEBERG_ASSIGN_OR_RAISE(read_size, input_file->Size());
+  }
+  if (read_size < 0) {
+    return Invalid("Invalid negative file size {} for {}", read_size, file_location);
+  }
+  // Reject sizes that cannot be held in a std::string instead of truncating in the
+  // cast below (narrow size_t) or letting the constructor throw std::length_error.
+  if (!std::in_range<size_t>(read_size) ||
+      static_cast<size_t>(read_size) > std::string().max_size()) {
+    return Invalid("Read size {} for {} exceeds the maximum in-memory buffer size",
+                   read_size, file_location);
+  }
+
+  // Open first so that a failed open never pays for allocating and zeroing the buffer.
+  ICEBERG_ASSIGN_OR_RAISE(auto stream, input_file->Open());
+  const auto size = static_cast<size_t>(read_size);
+  std::string content(size, '\0');
+  Status read_status = {};
+  if (size > 0) {
+    auto bytes = std::as_writable_bytes(std::span(content.data(), content.size()));
+    read_status = stream->ReadFully(/*position=*/0, bytes);
+  }
+  // Always close, even after a failed read; the helper merges both outcomes.
+  ICEBERG_RETURN_UNEXPECTED(
+      FinishWithCloseStatus(std::move(read_status), stream->Close()));
+  return content;
+}
+
+/// \brief Default WriteFile built on the streaming API.
+///
+/// Creates (or overwrites) the file through NewOutputFile, writes `content` when it is
+/// non-empty, flushes if the write succeeded, and always closes the stream. A write or
+/// flush error takes precedence over a close error.
+Status FileIO::WriteFile(const std::string& file_location, std::string_view content) {
+  ICEBERG_ASSIGN_OR_RAISE(auto output_file, NewOutputFile(file_location));
+  ICEBERG_ASSIGN_OR_RAISE(auto stream, output_file->CreateOrOverwrite());
+
+  auto write_then_flush = [&]() -> Status {
+    if (!content.empty()) {
+      const auto payload = std::as_bytes(std::span(content.data(), content.size()));
+      Status written = stream->Write(payload);
+      if (!written.has_value()) {
+        return written;
+      }
+    }
+    return stream->Flush();
+  };
+
+  // Evaluate the write before the call below: function arguments are unsequenced, so
+  // Close() must not be allowed to run ahead of the write/flush.
+  Status outcome = write_then_flush();
+  return FinishWithCloseStatus(std::move(outcome), stream->Close());
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h
index 259da755..e772b533 100644
--- a/src/iceberg/file_io.h
+++ b/src/iceberg/file_io.h
@@ -19,7 +19,11 @@
 
 #pragma once
 
+#include <cstddef>
+#include <cstdint>
+#include <memory>
 #include <optional>
+#include <span>
 #include <string>
 #include <string_view>
 
@@ -28,11 +32,82 @@
 
 namespace iceberg {
 
+/// \brief Seekable byte stream for reading file contents.
+///
+/// Instances are handed out by InputFile::Open().
+class ICEBERG_EXPORT SeekableInputStream {
+ public:
+  virtual ~SeekableInputStream() = default;
+
+  /// \brief Return the current read position.
+  virtual Result<int64_t> Position() const = 0;
+
+  /// \brief Seek to an absolute byte position.
+  ///
+  /// \param position Target offset in bytes.
+  virtual Status Seek(int64_t position) = 0;
+
+  /// \brief Read up to out.size() bytes from the current position.
+  ///
+  /// \param out Destination buffer.
+  /// \return Presumably the number of bytes actually read, which may be fewer than
+  /// out.size() -- TODO confirm the end-of-stream convention with implementers.
+  virtual Result<int64_t> Read(std::span<std::byte> out) = 0;
+
+  /// \brief Read exactly out.size() bytes from an absolute position.
+  ///
+  /// Fails if fewer than out.size() bytes are available. The current stream
+  /// position after this call is unspecified; callers should Seek before
+  /// subsequent position-dependent reads.
+  ///
+  /// \param position Absolute offset in bytes at which the read starts.
+  /// \param out Destination buffer; on success it is filled completely.
+  virtual Status ReadFully(int64_t position, std::span<std::byte> out) = 0;
+
+  /// \brief Close the stream.
+  ///
+  /// Implementations should allow repeated Close calls.
+  virtual Status Close() = 0;
+};
+
+/// \brief Positioned byte stream for writing file contents.
+///
+/// Instances are handed out by OutputFile::Create() and
+/// OutputFile::CreateOrOverwrite().
+class ICEBERG_EXPORT PositionOutputStream {
+ public:
+  virtual ~PositionOutputStream() = default;
+
+  /// \brief Return the current write position.
+  virtual Result<int64_t> Position() const = 0;
+
+  /// \brief Write all bytes in data at the current position.
+  ///
+  /// \param data Bytes to write.
+  virtual Status Write(std::span<const std::byte> data) = 0;
+
+  /// \brief Flush buffered data to the underlying store.
+  virtual Status Flush() = 0;
+
+  /// \brief Close the stream.
+  ///
+  /// Implementations should allow repeated Close calls.
+  virtual Status Close() = 0;
+};
+
+/// \brief Handle for opening a readable file.
+///
+/// Instances are created by FileIO::NewInputFile().
+class ICEBERG_EXPORT InputFile {
+ public:
+  virtual ~InputFile() = default;
+
+  /// \brief File location represented by this handle.
+  ///
+  /// NOTE(review): returns a non-owning view; presumably it must remain valid for
+  /// the lifetime of this handle -- confirm with implementers.
+  virtual std::string_view location() const = 0;
+
+  /// \brief Return the total file size in bytes.
+  ///
+  /// The default FileIO::ReadFile treats a negative value as an error.
+  virtual Result<int64_t> Size() const = 0;
+
+  /// \brief Open a new independent input stream.
+  ///
+  /// \return A stream owned by the caller.
+  virtual Result<std::unique_ptr<SeekableInputStream>> Open() = 0;
+};
+
+/// \brief Handle for creating a writable file.
+///
+/// Instances are created by FileIO::NewOutputFile().
+class ICEBERG_EXPORT OutputFile {
+ public:
+  virtual ~OutputFile() = default;
+
+  /// \brief File location represented by this handle.
+  ///
+  /// NOTE(review): returns a non-owning view; presumably it must remain valid for
+  /// the lifetime of this handle -- confirm with implementers.
+  virtual std::string_view location() const = 0;
+
+  /// \brief Create a new output stream and fail if the file already exists.
+  ///
+  /// \return A stream owned by the caller.
+  virtual Result<std::unique_ptr<PositionOutputStream>> Create() = 0;
+
+  /// \brief Create a new output stream, replacing any existing file.
+  ///
+  /// This is the variant used by the default FileIO::WriteFile.
+  ///
+  /// \return A stream owned by the caller.
+  virtual Result<std::unique_ptr<PositionOutputStream>> CreateOrOverwrite() = 
0;
+};
+
 /// \brief Pluggable module for reading, writing, and deleting files.
 ///
-/// This module only handle metadata files, not data files. The metadata files
-/// are typically small and are used to store schema, partition information,
-/// and other metadata about the table.
+/// This module handles metadata and data file bytes for table IO.
 ///
 /// Note that these functions are not atomic. For example, if a write fails,
 /// the file may be partially written. Implementations should be careful to
@@ -42,6 +117,19 @@ class ICEBERG_EXPORT FileIO {
   FileIO() = default;
   virtual ~FileIO() = default;
 
+  /// \brief Create an input file handle for the given location.
+  virtual Result<std::unique_ptr<InputFile>> NewInputFile(std::string 
file_location);
+
+  /// \brief Create an input file handle for the given location with a known 
length.
+  ///
+  /// The length is a caller-provided content length hint. Implementations may 
use it to
+  /// avoid an extra metadata lookup.
+  virtual Result<std::unique_ptr<InputFile>> NewInputFile(std::string 
file_location,
+                                                          size_t length);
+
+  /// \brief Create an output file handle for the given location.
+  virtual Result<std::unique_ptr<OutputFile>> NewOutputFile(std::string 
file_location);
+
   /// \brief Read the content of the file at the given location.
   ///
   /// \param file_location The location of the file to read.
@@ -50,21 +138,14 @@ class ICEBERG_EXPORT FileIO {
   /// \return The content of the file if the read succeeded, an error code if 
the read
   /// failed.
   virtual Result<std::string> ReadFile(const std::string& file_location,
-                                       std::optional<size_t> length) {
-    // We provide a default implementation to avoid Windows linker error 
LNK2019.
-    return NotImplemented("ReadFile not implemented");
-  }
+                                       std::optional<size_t> length);
 
   /// \brief Write the given content to the file at the given location.
   ///
   /// \param file_location The location of the file to write.
   /// \param content The content to write to the file.
-  /// \param overwrite If true, overwrite the file if it exists. If false, 
fail if the
-  /// file exists.
   /// \return void if the write succeeded, an error code if the write failed.
-  virtual Status WriteFile(const std::string& file_location, std::string_view 
content) {
-    return NotImplemented("WriteFile not implemented");
-  }
+  virtual Status WriteFile(const std::string& file_location, std::string_view 
content);
 
   /// \brief Delete a file at the given location.
   ///
diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h
index 923ac6bd..c31d9b29 100644
--- a/src/iceberg/file_reader.h
+++ b/src/iceberg/file_reader.h
@@ -95,9 +95,7 @@ struct ICEBERG_EXPORT ReaderOptions {
   std::optional<size_t> length;
   /// \brief The split to read.
   std::optional<Split> split;
-  /// \brief FileIO instance to open the file. Reader implementations should 
down cast it
-  /// to the specific FileIO implementation. By default, the `iceberg-bundle` 
library uses
-  /// `ArrowFileSystemFileIO` as the default implementation.
+  /// \brief FileIO instance to open the file.
   std::shared_ptr<class FileIO> io;
   /// \brief The projection schema to read from the file. This field is 
required.
   std::shared_ptr<class Schema> projection;
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index f3352d8f..a49b5228 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -73,9 +73,7 @@ struct ICEBERG_EXPORT WriterOptions {
   std::string path;
   /// \brief The schema of the data to write.
   std::shared_ptr<Schema> schema;
-  /// \brief FileIO instance to open the file. Writer implementations should 
down cast it
-  /// to the specific FileIO implementation. By default, the `iceberg-bundle` 
library uses
-  /// `ArrowFileSystemFileIO` as the default implementation.
+  /// \brief FileIO instance to create the file.
   std::shared_ptr<class FileIO> io;
   /// \brief Metadata to write to the file.
   std::unordered_map<std::string, std::string> metadata;
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 41a5c2dd..c2947f3f 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -58,6 +58,7 @@ iceberg_sources = files(
     'expression/rewrite_not.cc',
     'expression/strict_metrics_evaluator.cc',
     'expression/term.cc',
+    'file_io.cc',
     'file_io_registry.cc',
     'file_reader.cc',
     'file_writer.cc',
diff --git a/src/iceberg/parquet/parquet_reader.cc 
b/src/iceberg/parquet/parquet_reader.cc
index 0e2808f5..775644a9 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -32,7 +32,7 @@
 #include <parquet/file_reader.h>
 #include <parquet/properties.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/arrow/arrow_status_internal.h"
 #include "iceberg/arrow/metadata_column_util_internal.h"
 #include "iceberg/parquet/parquet_data_util_internal.h"
@@ -41,7 +41,6 @@
 #include "iceberg/result.h"
 #include "iceberg/schema_internal.h"
 #include "iceberg/schema_util.h"
-#include "iceberg/util/checked_cast.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg::parquet {
@@ -50,14 +49,7 @@ namespace {
 
 Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenInputStream(
     const ReaderOptions& options) {
-  ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
-  if (options.length) {
-    file_info.set_size(options.length.value());
-  }
-
-  auto io = 
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
-  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, 
io->fs()->OpenInputFile(file_info));
-  return input;
+  return arrow::OpenArrowInputStream(options.io, options.path, options.length);
 }
 
 Result<SchemaProjection> BuildProjection(::parquet::arrow::FileReader* reader,
diff --git a/src/iceberg/parquet/parquet_writer.cc 
b/src/iceberg/parquet/parquet_writer.cc
index a68e9e61..7e2d3d15 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -29,10 +29,9 @@
 #include <parquet/file_writer.h>
 #include <parquet/properties.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/arrow/arrow_status_internal.h"
 #include "iceberg/schema_internal.h"
-#include "iceberg/util/checked_cast.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg::parquet {
@@ -41,9 +40,7 @@ namespace {
 
 Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
     const WriterOptions& options) {
-  auto io = 
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
-  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, 
io->fs()->OpenOutputStream(options.path));
-  return output;
+  return arrow::OpenArrowOutputStream(options.io, options.path);
 }
 
 Result<::arrow::Compression::type> ParseCompression(const WriterProperties& 
properties) {
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 6b98951a..1d80b29a 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -156,7 +156,7 @@ if(ICEBERG_BUILD_BUNDLE)
   add_iceberg_test(arrow_test
                    USE_BUNDLE
                    SOURCES
-                   arrow_fs_file_io_test.cc
+                   arrow_io_test.cc
                    arrow_test.cc
                    gzip_decompress_test.cc
                    metadata_io_test.cc
diff --git a/src/iceberg/test/arrow_fs_file_io_test.cc 
b/src/iceberg/test/arrow_fs_file_io_test.cc
deleted file mode 100644
index eacda2f7..00000000
--- a/src/iceberg/test/arrow_fs_file_io_test.cc
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <arrow/filesystem/localfs.h>
-#include <gtest/gtest.h>
-
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/test/matchers.h"
-#include "iceberg/test/temp_file_test_base.h"
-
-namespace iceberg {
-
-class LocalFileIOTest : public TempFileTestBase {
- protected:
-  void SetUp() override {
-    TempFileTestBase::SetUp();
-    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
-        std::make_shared<::arrow::fs::LocalFileSystem>());
-    temp_filepath_ = CreateNewTempFilePath();
-  }
-
-  std::shared_ptr<iceberg::FileIO> file_io_;
-  std::string temp_filepath_;
-};
-
-TEST_F(LocalFileIOTest, ReadWriteFile) {
-  auto read_res = file_io_->ReadFile(temp_filepath_, std::nullopt);
-  EXPECT_THAT(read_res, IsError(ErrorKind::kIOError));
-  EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file"));
-
-  auto write_res = file_io_->WriteFile(temp_filepath_, "hello world");
-  EXPECT_THAT(write_res, IsOk());
-
-  read_res = file_io_->ReadFile(temp_filepath_, std::nullopt);
-  EXPECT_THAT(read_res, IsOk());
-  EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world")));
-}
-
-TEST_F(LocalFileIOTest, DeleteFile) {
-  auto write_res = file_io_->WriteFile(temp_filepath_, "hello world");
-  EXPECT_THAT(write_res, IsOk());
-
-  auto del_res = file_io_->DeleteFile(temp_filepath_);
-  EXPECT_THAT(del_res, IsOk());
-
-  del_res = file_io_->DeleteFile(temp_filepath_);
-  EXPECT_THAT(del_res, IsError(ErrorKind::kIOError));
-  EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
-}
-
-}  // namespace iceberg
diff --git a/src/iceberg/test/arrow_io_test.cc 
b/src/iceberg/test/arrow_io_test.cc
new file mode 100644
index 00000000..0c885d07
--- /dev/null
+++ b/src/iceberg/test/arrow_io_test.cc
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <algorithm>
+#include <array>
+#include <memory>
+#include <string>
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/std_io.h"
+#include "iceberg/test/temp_file_test_base.h"
+
+namespace iceberg {
+
+namespace {
+
+// Shared between a test and the failure stubs below so the test can observe whether
+// a stub stream's Close() was invoked.
+struct CloseState {
+  // Set to true by the stub streams' Close().
+  bool closed = false;
+};
+
+/// \brief Input stream stub whose ReadFully and Close always fail.
+///
+/// Close also records in the shared CloseState that it was called.
+class ReadFailureInputStream : public SeekableInputStream {
+ public:
+  explicit ReadFailureInputStream(std::shared_ptr<CloseState> state)
+      : close_state_{std::move(state)} {}
+
+  Result<int64_t> Position() const override {
+    return 0;
+  }
+
+  Status Seek(int64_t /*position*/) override {
+    return {};
+  }
+
+  Result<int64_t> Read(std::span<std::byte> /*out*/) override {
+    return 0;
+  }
+
+  Status ReadFully(int64_t /*position*/, std::span<std::byte> /*out*/) override {
+    return IOError("read failed");
+  }
+
+  Status Close() override {
+    close_state_->closed = true;
+    return IOError("close failed");
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+/// \brief InputFile stub that reports a size of 4 and opens ReadFailureInputStream.
+class ReadFailureInputFile : public InputFile {
+ public:
+  explicit ReadFailureInputFile(std::shared_ptr<CloseState> state)
+      : close_state_{std::move(state)} {}
+
+  std::string_view location() const override {
+    return "read-failure";
+  }
+
+  Result<int64_t> Size() const override {
+    return 4;
+  }
+
+  // Every opened stream shares this file's CloseState.
+  Result<std::unique_ptr<SeekableInputStream>> Open() override {
+    return std::make_unique<ReadFailureInputStream>(close_state_);
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+/// \brief FileIO stub that serves ReadFailureInputFile for every location.
+///
+/// Only the single-argument NewInputFile is overridden.
+class ReadFailureFileIO : public FileIO {
+ public:
+  explicit ReadFailureFileIO(std::shared_ptr<CloseState> state)
+      : close_state_{std::move(state)} {}
+
+  Result<std::unique_ptr<InputFile>> NewInputFile(
+      std::string /*file_location*/) override {
+    return std::make_unique<ReadFailureInputFile>(close_state_);
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+/// \brief Output stream stub whose Write and Close always fail.
+///
+/// Flush succeeds; Close also records in the shared CloseState that it was called.
+class WriteFailureOutputStream : public PositionOutputStream {
+ public:
+  explicit WriteFailureOutputStream(std::shared_ptr<CloseState> state)
+      : close_state_{std::move(state)} {}
+
+  Result<int64_t> Position() const override {
+    return 0;
+  }
+
+  Status Write(std::span<const std::byte> /*data*/) override {
+    return IOError("write failed");
+  }
+
+  Status Flush() override {
+    return {};
+  }
+
+  Status Close() override {
+    close_state_->closed = true;
+    return IOError("close failed");
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+/// \brief OutputFile stub whose streams are always WriteFailureOutputStream.
+///
+/// Create and CreateOrOverwrite behave identically.
+class WriteFailureOutputFile : public OutputFile {
+ public:
+  explicit WriteFailureOutputFile(std::shared_ptr<CloseState> state)
+      : close_state_{std::move(state)} {}
+
+  std::string_view location() const override {
+    return "write-failure";
+  }
+
+  Result<std::unique_ptr<PositionOutputStream>> Create() override {
+    return std::make_unique<WriteFailureOutputStream>(close_state_);
+  }
+
+  Result<std::unique_ptr<PositionOutputStream>> CreateOrOverwrite() override {
+    return std::make_unique<WriteFailureOutputStream>(close_state_);
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+/// \brief FileIO stub that serves WriteFailureOutputFile for every location.
+class WriteFailureFileIO : public FileIO {
+ public:
+  explicit WriteFailureFileIO(std::shared_ptr<CloseState> state)
+      : close_state_{std::move(state)} {}
+
+  Result<std::unique_ptr<OutputFile>> NewOutputFile(
+      std::string /*file_location*/) override {
+    return std::make_unique<WriteFailureOutputFile>(close_state_);
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+// In-memory backing state shared by the permissive input stubs below.
+struct PermissiveReadState {
+  // Bytes served by PermissiveInputStream.
+  std::string data;
+  // Set to true by PermissiveInputStream::Close().
+  bool closed = false;
+  // Current read offset; updated by PermissiveInputStream::Seek() and Read().
+  int64_t position = 0;
+};
+
+/// \brief In-memory input stream over PermissiveReadState.
+///
+/// The read position lives in the shared state, so it is visible to the test and to
+/// any other stream created from the same state.
+class PermissiveInputStream : public SeekableInputStream {
+ public:
+  explicit PermissiveInputStream(std::shared_ptr<PermissiveReadState> state)
+      : shared_{std::move(state)} {}
+
+  Result<int64_t> Position() const override {
+    return shared_->position;
+  }
+
+  /// Rejects negative targets; any non-negative target (even past the end) is stored.
+  Status Seek(int64_t position) override {
+    if (position >= 0) {
+      shared_->position = position;
+      return {};
+    }
+    return InvalidArgument("Cannot seek to negative position {}", position);
+  }
+
+  /// Copies as many bytes as are available from the current position, advances the
+  /// position by that amount, and returns it (0 once the position is at/after the end).
+  Result<int64_t> Read(std::span<std::byte> out) override {
+    const std::string& buffer = shared_->data;
+    const auto cursor = static_cast<size_t>(shared_->position);
+    if (cursor >= buffer.size()) {
+      return 0;
+    }
+    const size_t available = buffer.size() - cursor;
+    const size_t count = std::min(out.size(), available);
+    const auto* source = reinterpret_cast<const std::byte*>(buffer.data() + cursor);
+    std::copy(source, source + count, out.data());
+    shared_->position += static_cast<int64_t>(count);
+    return static_cast<int64_t>(count);
+  }
+
+  /// Copies exactly out.size() bytes starting at `position`; the shared position is
+  /// left untouched. Fails if the requested range is not fully inside the data.
+  Status ReadFully(int64_t position, std::span<std::byte> out) override {
+    if (position < 0) {
+      return InvalidArgument("Cannot read from negative position {}", position);
+    }
+    const std::string& buffer = shared_->data;
+    const auto start = static_cast<size_t>(position);
+    const bool in_bounds =
+        start <= buffer.size() && out.size() <= buffer.size() - start;
+    if (!in_bounds) {
+      return IOError("Unexpected EOF");
+    }
+    const auto* source = reinterpret_cast<const std::byte*>(buffer.data() + start);
+    std::copy(source, source + out.size(), out.data());
+    return {};
+  }
+
+  Status Close() override {
+    shared_->closed = true;
+    return {};
+  }
+
+ private:
+  std::shared_ptr<PermissiveReadState> shared_;
+};
+
+/// \brief InputFile stub backed by PermissiveReadState.
+///
+/// Size() reports the length of the in-memory data.
+class PermissiveInputFile : public InputFile {
+ public:
+  explicit PermissiveInputFile(std::shared_ptr<PermissiveReadState> state)
+      : shared_{std::move(state)} {}
+
+  std::string_view location() const override {
+    return "permissive-input";
+  }
+
+  Result<int64_t> Size() const override {
+    const auto byte_count = shared_->data.size();
+    return static_cast<int64_t>(byte_count);
+  }
+
+  Result<std::unique_ptr<SeekableInputStream>> Open() override {
+    return std::make_unique<PermissiveInputStream>(shared_);
+  }
+
+ private:
+  std::shared_ptr<PermissiveReadState> shared_;
+};
+
+/// \brief FileIO stub that serves PermissiveInputFile for every location.
+///
+/// Only the single-argument NewInputFile is overridden.
+class PermissiveInputFileIO : public FileIO {
+ public:
+  explicit PermissiveInputFileIO(std::shared_ptr<PermissiveReadState> state)
+      : shared_{std::move(state)} {}
+
+  Result<std::unique_ptr<InputFile>> NewInputFile(
+      std::string /*file_location*/) override {
+    return std::make_unique<PermissiveInputFile>(shared_);
+  }
+
+ private:
+  std::shared_ptr<PermissiveReadState> shared_;
+};
+
+// In-memory sink shared by the permissive output stubs below.
+struct PermissiveWriteState {
+  // Bytes appended by PermissiveOutputStream::Write().
+  std::string data;
+  // Set to true by PermissiveOutputStream::Close().
+  bool closed = false;
+};
+
+/// \brief In-memory output stream that appends to PermissiveWriteState.
+///
+/// The write position is the number of bytes accumulated so far.
+class PermissiveOutputStream : public PositionOutputStream {
+ public:
+  explicit PermissiveOutputStream(std::shared_ptr<PermissiveWriteState> state)
+      : sink_{std::move(state)} {}
+
+  Result<int64_t> Position() const override {
+    const auto written = sink_->data.size();
+    return static_cast<int64_t>(written);
+  }
+
+  Status Write(std::span<const std::byte> data) override {
+    const auto* chars = reinterpret_cast<const char*>(data.data());
+    sink_->data.append(chars, data.size());
+    return {};
+  }
+
+  Status Flush() override {
+    return {};
+  }
+
+  Status Close() override {
+    sink_->closed = true;
+    return {};
+  }
+
+ private:
+  std::shared_ptr<PermissiveWriteState> sink_;
+};
+
+/// \brief OutputFile test double whose streams all write into one shared
+/// PermissiveWriteState.
+class PermissiveOutputFile : public OutputFile {
+ public:
+  explicit PermissiveOutputFile(std::shared_ptr<PermissiveWriteState> shared_state)
+      : state_{std::move(shared_state)} {}
+
+  /// \brief Fixed placeholder location.
+  std::string_view location() const override {
+    return std::string_view{"permissive-output"};
+  }
+
+  /// \brief Hands out a new stream over the shared state.
+  Result<std::unique_ptr<PositionOutputStream>> Create() override {
+    return NewStream();
+  }
+
+  /// \brief Behaves exactly like Create().
+  Result<std::unique_ptr<PositionOutputStream>> CreateOrOverwrite() override {
+    return NewStream();
+  }
+
+ private:
+  // Single construction point used by both create paths.
+  Result<std::unique_ptr<PositionOutputStream>> NewStream() const {
+    return std::make_unique<PermissiveOutputStream>(state_);
+  }
+
+  std::shared_ptr<PermissiveWriteState> state_;
+};
+
+/// \brief FileIO test double that serves one shared write state for any location.
+class PermissiveOutputFileIO : public FileIO {
+ public:
+  explicit PermissiveOutputFileIO(std::shared_ptr<PermissiveWriteState> shared_state)
+      : state_{std::move(shared_state)} {}
+
+  /// \brief Returns a PermissiveOutputFile over the shared state; the requested
+  /// location is not consulted.
+  Result<std::unique_ptr<OutputFile>> NewOutputFile(
+      [[maybe_unused]] std::string file_location) override {
+    auto output_file = std::make_unique<PermissiveOutputFile>(state_);
+    return output_file;
+  }
+
+ private:
+  std::shared_ptr<PermissiveWriteState> state_;
+};
+
+}  // namespace
+
+/// \brief Fixture providing an ArrowFileSystemFileIO over the local filesystem
+/// together with a fresh temporary file path for each test.
+class LocalFileIOTest : public TempFileTestBase {
+ protected:
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+
+    auto local_fs = std::make_shared<::arrow::fs::LocalFileSystem>();
+    file_io_ =
+        std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(std::move(local_fs));
+    temp_filepath_ = CreateNewTempFilePath();
+  }
+
+  // FileIO under test.
+  std::shared_ptr<iceberg::FileIO> file_io_;
+  // Path obtained from CreateNewTempFilePath() during SetUp().
+  std::string temp_filepath_;
+};
+
+// Reading before anything is written fails; after WriteFile the same path
+// reads back the written content.
+TEST_F(LocalFileIOTest, ReadWriteFile) {
+  const std::string payload = "hello world";
+
+  auto missing_read = file_io_->ReadFile(temp_filepath_, std::nullopt);
+  EXPECT_THAT(missing_read, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(missing_read, HasErrorMessage("Failed to open local file"));
+
+  auto write_status = file_io_->WriteFile(temp_filepath_, payload);
+  EXPECT_THAT(write_status, IsOk());
+
+  auto round_trip = file_io_->ReadFile(temp_filepath_, std::nullopt);
+  EXPECT_THAT(round_trip, IsOk());
+  EXPECT_THAT(round_trip, HasValue(::testing::Eq(payload)));
+}
+
+TEST_F(LocalFileIOTest, DeleteFile) {
+  auto write_res = file_io_->WriteFile(temp_filepath_, "hello world");
+  EXPECT_THAT(write_res, IsOk());
+
+  auto del_res = file_io_->DeleteFile(temp_filepath_);
+  EXPECT_THAT(del_res, IsOk());
+
+  del_res = file_io_->DeleteFile(temp_filepath_);
+  EXPECT_THAT(del_res, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
+}
+
+// Shared check for any FileIO: ReadFully reads from the absolute offset it is
+// given rather than from the position set by an earlier Seek.
+void VerifyReadFullyReadsFromAbsolutePosition(const std::shared_ptr<FileIO>& file_io,
+                                              const std::string& path) {
+  ASSERT_THAT(file_io->WriteFile(path, "abcdef"), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto input_file, file_io->NewInputFile(path));
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, input_file->Open());
+
+  // Move the cursor to offset 5 first; a cursor-relative read could not return "bc".
+  ASSERT_THAT(stream->Seek(5), IsOk());
+  std::array<std::byte, 2> window;
+  ASSERT_THAT(stream->ReadFully(1, window), IsOk());
+
+  const std::string window_text(reinterpret_cast<const char*>(window.data()),
+                                window.size());
+  EXPECT_EQ(window_text, "bc");
+
+  // After seeking again, a sequential read returns the byte stored at offset 5.
+  ASSERT_THAT(stream->Seek(5), IsOk());
+  std::array<std::byte, 1> tail;
+  ICEBERG_UNWRAP_OR_FAIL(auto tail_count, stream->Read(tail));
+  ASSERT_EQ(tail_count, 1);
+  EXPECT_EQ(tail[0], std::byte{'f'});
+}
+
+// Runs the shared ReadFully check against the fixture's Arrow-backed FileIO.
+TEST_F(LocalFileIOTest, ReadFullyReadsFromAbsolutePosition) {
+  const auto& io_under_test = file_io_;
+  const auto& target_path = temp_filepath_;
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyReadFullyReadsFromAbsolutePosition(io_under_test, target_path));
+}
+
+// Runs the shared ReadFully check against the std::fstream-based StdFileIO.
+TEST_F(LocalFileIOTest, StdReadFullyReadsFromAbsolutePosition) {
+  const std::shared_ptr<FileIO> std_io = std::make_shared<test::StdFileIO>();
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyReadFullyReadsFromAbsolutePosition(std_io, temp_filepath_));
+}
+
+// Reading up to and at EOF must leave Position() usable on a StdFileIO stream.
+TEST_F(LocalFileIOTest, StdReadKeepsPositionAvailableAtEof) {
+  auto std_io = std::make_shared<test::StdFileIO>();
+  ASSERT_THAT(std_io->WriteFile(temp_filepath_, "abc"), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto input_file, std_io->NewInputFile(temp_filepath_));
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, input_file->Open());
+
+  // The buffer is larger than the file, so the first read is a short read.
+  std::array<std::byte, 8> scratch;
+  ICEBERG_UNWRAP_OR_FAIL(auto first_count, stream->Read(scratch));
+  EXPECT_EQ(first_count, 3);
+  EXPECT_THAT(stream->Position(), HasValue(::testing::Eq(3)));
+
+  // A further read at EOF yields nothing and does not move the position.
+  ICEBERG_UNWRAP_OR_FAIL(auto second_count, stream->Read(scratch));
+  EXPECT_EQ(second_count, 0);
+  EXPECT_THAT(stream->Position(), HasValue(::testing::Eq(3)));
+}
+
+// After Close(), the Arrow input adapter must fail Read/ReadAt with a "closed"
+// error. Closing twice is expected to succeed both times.
+TEST(FileIOAdapterTest, InputAdapterRejectsReadsAfterClose) {
+  auto read_state = std::make_shared<PermissiveReadState>();
+  read_state->data = "abc";
+  auto permissive_io = std::make_shared<PermissiveInputFileIO>(read_state);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto input,
+                         arrow::OpenArrowInputStream(permissive_io, "input"));
+  ASSERT_TRUE(input->Close().ok());
+  ASSERT_TRUE(input->Close().ok());
+  ASSERT_TRUE(read_state->closed);
+
+  std::array<std::byte, 1> sink;
+  const auto sink_size = static_cast<int64_t>(sink.size());
+  auto sequential_read = input->Read(sink_size, sink.data());
+  auto positioned_read = input->ReadAt(0, sink_size, sink.data());
+
+  EXPECT_FALSE(sequential_read.ok());
+  EXPECT_THAT(sequential_read.status().ToString(), ::testing::HasSubstr("closed"));
+  EXPECT_FALSE(positioned_read.ok());
+  EXPECT_THAT(positioned_read.status().ToString(), ::testing::HasSubstr("closed"));
+  // Presumably confirms the rejected reads never advanced the underlying stream.
+  EXPECT_EQ(read_state->position, 0);
+}
+
+// ReadAt exactly at the end of a 3-byte input yields 0 bytes; one byte further
+// is reported as out of bounds.
+TEST(FileIOAdapterTest, InputAdapterRejectsReadAtBeyondKnownSize) {
+  auto read_state = std::make_shared<PermissiveReadState>();
+  read_state->data = "abc";
+  auto permissive_io = std::make_shared<PermissiveInputFileIO>(read_state);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto input,
+                         arrow::OpenArrowInputStream(permissive_io, "input"));
+
+  std::array<std::byte, 1> sink;
+  const auto sink_size = static_cast<int64_t>(sink.size());
+  auto at_end = input->ReadAt(3, sink_size, sink.data());
+  auto past_end = input->ReadAt(4, sink_size, sink.data());
+
+  ASSERT_TRUE(at_end.ok());
+  EXPECT_EQ(at_end.ValueOrDie(), 0);
+  EXPECT_FALSE(past_end.ok());
+  EXPECT_THAT(past_end.status().ToString(), ::testing::HasSubstr("out of bounds"));
+}
+
+// GetSize() must report the InputFile's size (3), not the third argument (99)
+// passed when opening the adapter.
+TEST(FileIOAdapterTest, InputAdapterUsesInputFileSizeWithLengthHint) {
+  auto read_state = std::make_shared<PermissiveReadState>();
+  read_state->data = "abc";
+  auto permissive_io = std::make_shared<PermissiveInputFileIO>(read_state);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto input,
+                         arrow::OpenArrowInputStream(permissive_io, "input", 99));
+  auto reported_size = input->GetSize();
+
+  ASSERT_TRUE(reported_size.ok()) << reported_size.status().ToString();
+  EXPECT_EQ(reported_size.ValueOrDie(), 3);
+}
+
+// After Close(), the Arrow output adapter must fail Write/Flush with a "closed"
+// error. The permissive stream would accept the write, so an empty buffer shows
+// that the write never reached it.
+TEST(FileIOAdapterTest, OutputAdapterRejectsWritesAfterClose) {
+  auto write_state = std::make_shared<PermissiveWriteState>();
+  auto permissive_io = std::make_shared<PermissiveOutputFileIO>(write_state);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto output,
+                         arrow::OpenArrowOutputStream(permissive_io, "output"));
+  ASSERT_TRUE(output->Close().ok());
+  ASSERT_TRUE(output->Close().ok());
+  ASSERT_TRUE(write_state->closed);
+
+  auto write_status = output->Write("x", 1);
+  auto flush_status = output->Flush();
+
+  EXPECT_FALSE(write_status.ok());
+  EXPECT_THAT(write_status.ToString(), ::testing::HasSubstr("closed"));
+  EXPECT_FALSE(flush_status.ok());
+  EXPECT_THAT(flush_status.ToString(), ::testing::HasSubstr("closed"));
+  EXPECT_TRUE(write_state->data.empty());
+}
+
+TEST(FileIOTest, ReadFileReturnsReadErrorWithCloseContext) {
+  auto state = std::make_shared<CloseState>();
+  ReadFailureFileIO file_io(state);
+
+  auto result = file_io.ReadFile("read-failure", std::nullopt);
+
+  EXPECT_TRUE(state->closed);
+  EXPECT_THAT(result, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(result, HasErrorMessage("read failed"));
+  EXPECT_THAT(result, HasErrorMessage("close failed"));
+}
+
+TEST(FileIOTest, WriteFileReturnsWriteErrorWithCloseContext) {
+  auto state = std::make_shared<CloseState>();
+  WriteFailureFileIO file_io(state);
+
+  auto result = file_io.WriteFile("write-failure", "data");
+
+  EXPECT_TRUE(state->closed);
+  EXPECT_THAT(result, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(result, HasErrorMessage("write failed"));
+  EXPECT_THAT(result, HasErrorMessage("close failed"));
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/arrow_s3_file_io_test.cc b/src/iceberg/test/arrow_s3_file_io_test.cc
index d890ad10..b1caff1e 100644
--- a/src/iceberg/test/arrow_s3_file_io_test.cc
+++ b/src/iceberg/test/arrow_s3_file_io_test.cc
@@ -26,7 +26,7 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/arrow/s3/s3_properties.h"
 #include "iceberg/test/matchers.h"
 
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 82da97ea..b74fe829 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -23,12 +23,13 @@
 #include <arrow/array.h>
 #include <arrow/array/array_base.h>
 #include <arrow/c/bridge.h>
+#include <arrow/filesystem/filesystem.h>
 #include <arrow/json/from_string.h>
 #include <avro/DataFile.hh>
 #include <avro/Generic.hh>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/avro/avro_stream_internal.h"
 #include "iceberg/avro/avro_writer.h"
@@ -37,16 +38,19 @@
 #include "iceberg/schema.h"
 #include "iceberg/schema_internal.h"
 #include "iceberg/test/matchers.h"
+#include "iceberg/test/std_io.h"
+#include "iceberg/test/temp_file_test_base.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
 
 namespace iceberg::avro {
 
-class AvroReaderTest : public ::testing::Test {
+class AvroReaderTest : public TempFileTestBase {
  protected:
   static void SetUpTestSuite() { RegisterAll(); }
 
   void SetUp() override {
+    TempFileTestBase::SetUp();
     file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
     temp_avro_file_ = "avro_reader_test.avro";
   }
@@ -187,6 +191,16 @@ TEST_F(AvroReaderTest, ReadTwoFields) {
   ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
 }
 
+// Repeats the write-and-verify round trip with StdFileIO on a real temp file
+// instead of the FileIO installed by the fixture's SetUp().
+TEST_F(AvroReaderTest, RoundTripWithGenericFileIO) {
+  file_io_ = std::make_shared<iceberg::test::StdFileIO>();
+  temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro");
+
+  std::vector<SchemaField> fields{
+      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+      SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())};
+  auto schema = std::make_shared<Schema>(std::move(fields));
+
+  ASSERT_NO_FATAL_FAILURE(WriteAndVerify(schema, R"([[1, "Foo"], [2, "Bar"]])"));
+}
+
 TEST_F(AvroReaderTest, ReadReorderedFieldsWithNulls) {
   CreateSimpleAvroFile();
   auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc
index a3a8fc08..14b7bf62 100644
--- a/src/iceberg/test/data_writer_test.cc
+++ b/src/iceberg/test/data_writer_test.cc
@@ -25,7 +25,7 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/data/equality_delete_writer.h"
 #include "iceberg/data/position_delete_writer.h"
diff --git a/src/iceberg/test/delete_file_index_test.cc b/src/iceberg/test/delete_file_index_test.cc
index b99a2816..0c8c8821 100644
--- a/src/iceberg/test/delete_file_index_test.cc
+++ b/src/iceberg/test/delete_file_index_test.cc
@@ -29,7 +29,7 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/manifest/manifest_list.h"
diff --git a/src/iceberg/test/delete_loader_test.cc b/src/iceberg/test/delete_loader_test.cc
index 6dcd564b..c365b8ba 100644
--- a/src/iceberg/test/delete_loader_test.cc
+++ b/src/iceberg/test/delete_loader_test.cc
@@ -25,7 +25,7 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/data/equality_delete_writer.h"
 #include "iceberg/data/position_delete_writer.h"
 #include "iceberg/deletes/position_delete_index.h"
diff --git a/src/iceberg/test/file_scan_task_test.cc b/src/iceberg/test/file_scan_task_test.cc
index ba0c41b3..55bc6a11 100644
--- a/src/iceberg/test/file_scan_task_test.cc
+++ b/src/iceberg/test/file_scan_task_test.cc
@@ -27,7 +27,7 @@
 #include <parquet/arrow/writer.h>
 #include <parquet/metadata.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/file_format.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/parquet/parquet_register.h"
@@ -36,7 +36,6 @@
 #include "iceberg/test/matchers.h"
 #include "iceberg/test/temp_file_test_base.h"
 #include "iceberg/type.h"
-#include "iceberg/util/checked_cast.h"
 
 namespace iceberg {
 
@@ -68,12 +67,14 @@ class FileScanTaskTest : public TempFileTestBase {
                                         .ValueOrDie()})
                      .ValueOrDie();
 
-    auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
-    auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie();
+    ICEBERG_UNWRAP_OR_FAIL(auto outfile,
+                           arrow::OpenArrowOutputStream(file_io_, 
temp_parquet_file_));
 
     ASSERT_TRUE(::parquet::arrow::WriteTable(*table, 
::arrow::default_memory_pool(),
                                              outfile, chunk_size)
                     .ok());
+    ASSERT_TRUE(outfile->Close().ok());
+    RefreshParquetFileSize();
   }
 
   // Helper to create a valid but empty Parquet file.
@@ -84,11 +85,28 @@ class FileScanTaskTest : public TempFileTestBase {
                         ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, 
{"1"}))});
     auto empty_table = ::arrow::Table::FromRecordBatches(arrow_schema, 
{}).ValueOrDie();
 
-    auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
-    auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie();
+    ICEBERG_UNWRAP_OR_FAIL(auto outfile,
+                           arrow::OpenArrowOutputStream(file_io_, 
temp_parquet_file_));
     ASSERT_TRUE(::parquet::arrow::WriteTable(*empty_table, 
::arrow::default_memory_pool(),
                                              outfile, 1024)
                     .ok());
+    ASSERT_TRUE(outfile->Close().ok());
+    RefreshParquetFileSize();
+  }
+
+  // Re-reads the size of the temp Parquet file through file_io_ and caches it in
+  // parquet_file_size_. Fails the test if the reported size is not positive.
+  void RefreshParquetFileSize() {
+    ICEBERG_UNWRAP_OR_FAIL(auto parquet_input,
+                           file_io_->NewInputFile(temp_parquet_file_));
+    ICEBERG_UNWRAP_OR_FAIL(auto reported_size, parquet_input->Size());
+    ASSERT_GT(reported_size, 0);
+    parquet_file_size_ = reported_size;
+  }
+
+  std::shared_ptr<DataFile> MakeDataFile() const {  // Builds a DataFile describing the temp Parquet file.
+    auto data_file = std::make_shared<DataFile>();
+    data_file->file_path = temp_parquet_file_;
+    data_file->file_format = FileFormatType::kParquet;
+    data_file->file_size_in_bytes = parquet_file_size_;  // Value cached by RefreshParquetFileSize().
+    return data_file;
   }
 
   // Helper method to verify the content of the next batch from an 
ArrowArrayStream.
@@ -124,12 +142,11 @@ class FileScanTaskTest : public TempFileTestBase {
 
   std::shared_ptr<FileIO> file_io_;
   std::string temp_parquet_file_;
+  int64_t parquet_file_size_ = 0;
 };
 
 TEST_F(FileScanTaskTest, ReadFullSchema) {
-  auto data_file = std::make_shared<DataFile>();
-  data_file->file_path = temp_parquet_file_;
-  data_file->file_format = FileFormatType::kParquet;
+  auto data_file = MakeDataFile();
 
   auto projected_schema = std::make_shared<Schema>(
       std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
@@ -146,9 +163,7 @@ TEST_F(FileScanTaskTest, ReadFullSchema) {
 }
 
 TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) {
-  auto data_file = std::make_shared<DataFile>();
-  data_file->file_path = temp_parquet_file_;
-  data_file->file_format = FileFormatType::kParquet;
+  auto data_file = MakeDataFile();
 
   auto projected_schema = std::make_shared<Schema>(
       std::vector<SchemaField>{SchemaField::MakeOptional(2, "name", string()),
@@ -166,9 +181,7 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) {
 
 TEST_F(FileScanTaskTest, ReadEmptyFile) {
   CreateEmptyParquetFile();
-  auto data_file = std::make_shared<DataFile>();
-  data_file->file_path = temp_parquet_file_;
-  data_file->file_format = FileFormatType::kParquet;
+  auto data_file = MakeDataFile();
 
   auto projected_schema = std::make_shared<Schema>(
       std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
diff --git a/src/iceberg/test/gzip_decompress_test.cc b/src/iceberg/test/gzip_decompress_test.cc
index 3415c46b..b855e4a0 100644
--- a/src/iceberg/test/gzip_decompress_test.cc
+++ b/src/iceberg/test/gzip_decompress_test.cc
@@ -23,7 +23,7 @@
 #include <arrow/util/compression.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/file_io.h"
 #include "iceberg/test/matchers.h"
 #include "iceberg/test/temp_file_test_base.h"
@@ -69,7 +69,11 @@ TEST_F(GZipTest, GZipDecompressedString) {
   ASSERT_TRUE(compressed_stream->Flush().ok());
   ASSERT_TRUE(compressed_stream->Close().ok());
 
-  auto result = io_->ReadFile(temp_filepath_, test_string.size());
+  ICEBERG_UNWRAP_OR_FAIL(auto input_file, io_->NewInputFile(temp_filepath_));
+  ICEBERG_UNWRAP_OR_FAIL(auto compressed_size, input_file->Size());
+  ASSERT_GE(compressed_size, 0);
+
+  auto result = io_->ReadFile(temp_filepath_, 
static_cast<size_t>(compressed_size));
   EXPECT_THAT(result, IsOk());
 
   auto gzip_decompressor = std::make_unique<GZipDecompressor>();
diff --git a/src/iceberg/test/in_memory_catalog_test.cc b/src/iceberg/test/in_memory_catalog_test.cc
index 78f67dda..1a65098e 100644
--- a/src/iceberg/test/in_memory_catalog_test.cc
+++ b/src/iceberg/test/in_memory_catalog_test.cc
@@ -27,7 +27,7 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/partition_spec.h"
 #include "iceberg/schema.h"
 #include "iceberg/sort_order.h"
diff --git a/src/iceberg/test/manifest_group_test.cc b/src/iceberg/test/manifest_group_test.cc
index 34ff9993..017f9803 100644
--- a/src/iceberg/test/manifest_group_test.cc
+++ b/src/iceberg/test/manifest_group_test.cc
@@ -29,7 +29,7 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/expression/expressions.h"
 #include "iceberg/manifest/manifest_entry.h"
diff --git a/src/iceberg/test/manifest_list_versions_test.cc b/src/iceberg/test/manifest_list_versions_test.cc
index 9c16a02e..b173d56e 100644
--- a/src/iceberg/test/manifest_list_versions_test.cc
+++ b/src/iceberg/test/manifest_list_versions_test.cc
@@ -25,7 +25,7 @@
 #include <arrow/record_batch.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/file_reader.h"
 #include "iceberg/file_writer.h"
diff --git a/src/iceberg/test/manifest_reader_stats_test.cc b/src/iceberg/test/manifest_reader_stats_test.cc
index a94dca12..6c0e005b 100644
--- a/src/iceberg/test/manifest_reader_stats_test.cc
+++ b/src/iceberg/test/manifest_reader_stats_test.cc
@@ -25,7 +25,7 @@
 
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/expression/expressions.h"
 #include "iceberg/manifest/manifest_entry.h"
diff --git a/src/iceberg/test/manifest_reader_test.cc b/src/iceberg/test/manifest_reader_test.cc
index 3e93f6ff..3f85729a 100644
--- a/src/iceberg/test/manifest_reader_test.cc
+++ b/src/iceberg/test/manifest_reader_test.cc
@@ -28,7 +28,7 @@
 
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/expression/expressions.h"
 #include "iceberg/manifest/manifest_entry.h"
diff --git a/src/iceberg/test/manifest_writer_versions_test.cc b/src/iceberg/test/manifest_writer_versions_test.cc
index fc61980a..99022452 100644
--- a/src/iceberg/test/manifest_writer_versions_test.cc
+++ b/src/iceberg/test/manifest_writer_versions_test.cc
@@ -25,7 +25,7 @@
 
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/constants.h"
 #include "iceberg/file_format.h"
diff --git a/src/iceberg/test/metadata_io_test.cc b/src/iceberg/test/metadata_io_test.cc
index 72a23ecb..c780dc18 100644
--- a/src/iceberg/test/metadata_io_test.cc
+++ b/src/iceberg/test/metadata_io_test.cc
@@ -27,7 +27,7 @@
 #include <gtest/gtest.h>
 #include <nlohmann/json.hpp>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/file_io.h"
 #include "iceberg/json_serde_internal.h"
 #include "iceberg/partition_spec.h"
diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc
index 0d983db5..65a4602d 100644
--- a/src/iceberg/test/parquet_test.cc
+++ b/src/iceberg/test/parquet_test.cc
@@ -21,6 +21,7 @@
 
 #include <arrow/array.h>
 #include <arrow/c/bridge.h>
+#include <arrow/filesystem/filesystem.h>
 #include <arrow/json/from_string.h>
 #include <arrow/record_batch.h>
 #include <arrow/table.h>
@@ -30,7 +31,7 @@
 #include <parquet/arrow/writer.h>
 #include <parquet/metadata.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/arrow/arrow_status_internal.h"
 #include "iceberg/file_reader.h"
 #include "iceberg/file_writer.h"
@@ -41,6 +42,8 @@
 #include "iceberg/schema_field.h"
 #include "iceberg/schema_internal.h"
 #include "iceberg/test/matchers.h"
+#include "iceberg/test/std_io.h"
+#include "iceberg/test/temp_file_test_base.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
 #include "iceberg/util/macros.h"
@@ -123,11 +126,12 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
 
 }  // namespace
 
-class ParquetReaderTest : public ::testing::Test {
+class ParquetReaderTest : public TempFileTestBase {
  protected:
   static void SetUpTestSuite() { parquet::RegisterAll(); }
 
   void SetUp() override {
+    TempFileTestBase::SetUp();
     file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
     temp_parquet_file_ = "parquet_reader_test.parquet";
   }
@@ -232,6 +236,42 @@ TEST_F(ParquetReaderTest, ReadTwoFields) {
   ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
 }
 
+// Writes a two-row table with the Parquet writer and reads it back, using
+// StdFileIO on a real temp file for both directions.
+TEST_F(ParquetReaderTest, RoundTripWithGenericFileIO) {
+  auto std_io = std::make_shared<iceberg::test::StdFileIO>();
+  auto parquet_path = CreateNewTempFilePathWithSuffix(".parquet");
+
+  std::vector<SchemaField> fields{SchemaField::MakeRequired(1, "id", int32()),
+                                  SchemaField::MakeOptional(2, "name", string())};
+  auto schema = std::make_shared<Schema>(std::move(fields));
+
+  // Build the expected Arrow struct array from the Iceberg schema.
+  ArrowSchema c_schema;
+  ASSERT_THAT(ToArrowSchema(*schema, &c_schema), IsOk());
+  auto imported_type = ::arrow::ImportType(&c_schema).ValueOrDie();
+  auto row_type = ::arrow::struct_(imported_type->fields());
+  auto expected =
+      ::arrow::json::ArrayFromJSONString(row_type, R"([[1, "Foo"], [2, "Bar"]])")
+          .ValueOrDie();
+
+  // Write the array out through the registered Parquet writer.
+  WriterProperties props;
+  props.Set(WriterProperties::kParquetCompression, std::string("uncompressed"));
+  auto open_result = WriterFactoryRegistry::Open(FileFormatType::kParquet,
+                                                 {.path = parquet_path,
+                                                  .schema = schema,
+                                                  .io = std_io,
+                                                  .properties = std::move(props)});
+  ASSERT_THAT(open_result, IsOk());
+  auto writer = std::move(open_result.value());
+  ASSERT_THAT(WriteArray(expected, *writer), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto written_length, writer->length());
+
+  // Read it back with the length reported by the writer and compare.
+  std::shared_ptr<::arrow::Array> actual;
+  auto read_status = ReadArray(actual,
+                               {.path = parquet_path,
+                                .length = written_length,
+                                .io = std_io,
+                                .projection = schema},
+                               nullptr);
+  ASSERT_THAT(read_status, IsOk());
+  ASSERT_TRUE(actual->Equals(*expected));
+}
+
 TEST_F(ParquetReaderTest, ReadReorderedFieldsWithNulls) {
   CreateSimpleParquetFile();
 
diff --git a/src/iceberg/test/rolling_manifest_writer_test.cc b/src/iceberg/test/rolling_manifest_writer_test.cc
index b996eb16..2eae5a15 100644
--- a/src/iceberg/test/rolling_manifest_writer_test.cc
+++ b/src/iceberg/test/rolling_manifest_writer_test.cc
@@ -28,7 +28,7 @@
 
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/file_format.h"
 #include "iceberg/manifest/manifest_entry.h"
diff --git a/src/iceberg/test/scan_test_base.h b/src/iceberg/test/scan_test_base.h
index 1a2f5648..5bd1222e 100644
--- a/src/iceberg/test/scan_test_base.h
+++ b/src/iceberg/test/scan_test_base.h
@@ -30,7 +30,7 @@
 
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/manifest/manifest_list.h"
diff --git a/src/iceberg/test/std_io.h b/src/iceberg/test/std_io.h
index 3b58267d..3866bddf 100644
--- a/src/iceberg/test/std_io.h
+++ b/src/iceberg/test/std_io.h
@@ -19,78 +19,297 @@
 
 #pragma once
 
+#include <cstddef>
+#include <cstdint>
 #include <filesystem>
 #include <fstream>
+#include <ios>
+#include <limits>
+#include <memory>
 #include <optional>
-#include <sstream>
+#include <span>
 #include <string>
 #include <string_view>
+#include <system_error>
+#include <utility>
 
 #include "iceberg/file_io.h"
 #include "iceberg/result.h"
+#include "iceberg/util/macros.h"
 
 namespace iceberg::test {
 
-/// \brief Simple local filesystem FileIO implementation for testing
-///
-/// This class provides a basic FileIO implementation that reads and writes
-/// files to the local filesystem using standard C++ file streams.
-class StdFileIO : public FileIO {
+namespace detail {
+
+/// \brief Converts a buffer size to std::streamsize.
+/// \return The converted size, or InvalidArgument when it exceeds the largest
+/// std::streamsize value.
+inline Result<std::streamsize> ToStreamSize(size_t size) {
+  constexpr auto kMaxStreamSize =
+      static_cast<size_t>(std::numeric_limits<std::streamsize>::max());
+  if (size <= kMaxStreamSize) {
+    return static_cast<std::streamsize>(size);
+  }
+  return InvalidArgument("Buffer size {} exceeds streamsize max", size);
+}
+
+/// \brief Converts a filesystem-reported size to int64_t.
+/// \param location Only used to build the error message.
+/// \return The converted size, or Invalid when it exceeds the largest int64_t value.
+inline Result<int64_t> ToInt64FileSize(uintmax_t size, std::string_view location) {
+  constexpr auto kMaxFileSize =
+      static_cast<uintmax_t>(std::numeric_limits<int64_t>::max());
+  if (size <= kMaxFileSize) {
+    return static_cast<int64_t>(size);
+  }
+  return Invalid("File size for {} exceeds int64_t max", location);
+}
+
+}  // namespace detail
+
+class StdSeekableInputStream : public SeekableInputStream {  // Input stream over a std::ifstream opened in binary mode.
  public:
-  Result<std::string> ReadFile(const std::string& file_location,
-                               std::optional<size_t> length) override {
-    std::ifstream file(file_location, std::ios::binary);
-    if (!file.is_open()) {
-      return IOError("Failed to open file for reading: {}", file_location);
-    }
-
-    if (length.has_value()) {
-      std::string content(length.value(), '\0');
-      file.read(content.data(), length.value());
-      if (!file) {
-        return IOError("Failed to read {} bytes from file: {}", length.value(),
-                       file_location);
+  explicit StdSeekableInputStream(std::string location)
+      : location_(std::move(location)), file_(location_, std::ios::binary) {}  // location_ is declared before file_, so it is initialized first.
+
+  bool is_open() const { return file_.is_open(); }  // Lets the creator detect a failed open.
+
+  Result<int64_t> Position() const override {
+    auto position = file_.tellg();  // tellg() is non-const, hence the mutable file_ member.
+    if (position < 0) {
+      return IOError("Failed to get read position for: {}", location_);
+    }
+    return static_cast<int64_t>(position);
+  }
+
+  Status Seek(int64_t position) override {
+    if (position < 0) {
+      return InvalidArgument("Cannot seek to negative position {}", position);
+    }
+    file_.clear();  // Reset stream error flags before seeking.
+    file_.seekg(position);
+    if (!file_) {
+      return IOError("Failed to seek to {} in file: {}", position, location_);
+    }
+    return {};
+  }
+
+  Result<int64_t> Read(std::span<std::byte> out) override {
+    if (out.empty()) {
+      return 0;  // Empty buffer: nothing to read.
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto read_size, detail::ToStreamSize(out.size()));
+    file_.read(reinterpret_cast<char*>(out.data()), read_size);
+    auto bytes_read = file_.gcount();  // Bytes actually extracted by the read() above.
+    if (!file_) {
+      if (file_.bad() || !file_.eof()) {  // Any failure other than reaching EOF is a real error.
+        return IOError("Failed to read from file: {}", location_);
       }
-      return content;
-    } else {
-      std::stringstream buffer;
-      buffer << file.rdbuf();
-      if (!file && !file.eof()) {
-        return IOError("Failed to read file: {}", file_location);
+      file_.clear();  // Short read at EOF: clear the flags so later Position()/Read() calls still work.
+    }
+    if (bytes_read < 0) {
+      return IOError("Failed to read from file: {}", location_);
+    }
+    return static_cast<int64_t>(bytes_read);
+  }
+
+  Status ReadFully(int64_t position, std::span<std::byte> out) override {
+    if (position < 0) {
+      return InvalidArgument("Cannot read from negative position {}", 
position);
+    }
+    if (out.empty()) {
+      return {};
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto original_position, Position());  // Remembered so the cursor can be restored afterwards.
+    auto seek_status = Seek(position);
+    if (!seek_status.has_value()) {
+      static_cast<void>(Seek(original_position));  // Best-effort restore; the original seek error is what gets returned.
+      return seek_status;
+    }
+
+    Status read_status = {};
+    size_t total_read = 0;
+    while (total_read < out.size()) {  // Keep reading until the whole buffer is filled.
+      auto read_result = Read(out.subspan(total_read));
+      if (!read_result.has_value()) {
+        read_status = std::unexpected<Error>(read_result.error());
+        break;
+      }
+      if (read_result.value() == 0) {  // Zero bytes: EOF was hit before the buffer was filled.
+        read_status =
+            IOError("Failed to read {} bytes from file: {}", out.size(), 
location_);
+        break;
       }
-      return buffer.str();
+      total_read += static_cast<size_t>(read_result.value());
+    }
+
+    auto restore_status = Seek(original_position);  // Restore is attempted even when the read failed.
+    ICEBERG_RETURN_UNEXPECTED(read_status);  // Presumably returns early when read_status holds an error (macro defined elsewhere).
+    return restore_status;
+  }
+
+  Status Close() override {
+    if (!file_.is_open()) {
+      return {};  // Already closed: nothing to do.
+    }
+    file_.close();
+    if (!file_) {
+      return IOError("Failed to close file: {}", location_);
+    }
+    return {};
+  }
+
+ private:
+  std::string location_;  // Path passed to the constructor; also used in error messages.
+  mutable std::ifstream file_;  // mutable because const Position() calls the non-const tellg().
+};
+
+class StdPositionOutputStream : public PositionOutputStream {  // Output stream over a std::ofstream opened in binary mode.
+ public:
+  explicit StdPositionOutputStream(std::string location)
+      : location_(std::move(location)),
+        file_(location_, std::ios::binary | std::ios::out | std::ios::trunc) {}  // trunc: existing content is discarded on open.
+
+  bool is_open() const { return file_.is_open(); }  // True when the underlying ofstream is open.
+
+  Result<int64_t> Position() const override {
+    auto position = file_.tellp();  // tellp() is non-const, hence the mutable file_ member.
+    if (position < 0) {
+      return IOError("Failed to get write position for: {}", location_);
+    }
+    return static_cast<int64_t>(position);
+  }
+
+  Status Write(std::span<const std::byte> data) override {
+    if (data.empty()) {
+      return {};  // Nothing to write.
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto write_size, 
detail::ToStreamSize(data.size()));
+    file_.write(reinterpret_cast<const char*>(data.data()), write_size);
+    if (!file_) {
+      return IOError("Failed to write to file: {}", location_);
     }
+    return {};
   }
 
-  Status WriteFile(const std::string& file_location, std::string_view content) 
override {
-    // Create parent directories if they don't exist
-    std::filesystem::path path(file_location);
+  Status Flush() override {
+    file_.flush();
+    if (!file_) {
+      return IOError("Failed to flush file: {}", location_);
+    }
+    return {};
+  }
+
+  Status Close() override {
+    if (!file_.is_open()) {
+      return {};  // Already closed: nothing to do.
+    }
+    file_.close();
+    if (!file_) {
+      return IOError("Failed to close file: {}", location_);
+    }
+    return {};
+  }
+
+ private:
+  std::string location_;  // Path passed to the constructor; also used in error messages.
+  mutable std::ofstream file_;  // mutable because const Position() calls the non-const tellp().
+};
+
+/// \brief InputFile implementation for a path on the local filesystem.
+///
+/// A known file size may be supplied at construction; when it is absent, Size()
+/// asks the filesystem on every call.
+class StdInputFile : public InputFile {
+ public:
+  explicit StdInputFile(std::string location,
+                        std::optional<int64_t> file_size = std::nullopt)
+      : location_(std::move(location)), file_size_(file_size) {}
+
+  std::string_view location() const override { return location_; }
+
+  /// \brief Report the file size in bytes.
+  ///
+  /// \return The size given at construction if there was one; otherwise the size
+  /// obtained from std::filesystem::file_size, or an IOError if that lookup fails.
+  Result<int64_t> Size() const override {
+    if (!file_size_.has_value()) {
+      std::error_code error;
+      auto byte_count = std::filesystem::file_size(location_, error);
+      if (error) {
+        return IOError("Failed to get file size for {}: {}", location_,
+                       error.message());
+      }
+      return detail::ToInt64FileSize(byte_count, location_);
+    }
+    return *file_size_;
+  }
+
+  /// \brief Open the file for reading.
+  ///
+  /// \return A new StdSeekableInputStream, or an IOError if the stream did not open.
+  Result<std::unique_ptr<SeekableInputStream>> Open() override {
+    auto input = std::make_unique<StdSeekableInputStream>(location_);
+    if (input->is_open()) {
+      return input;
+    }
+    return IOError("Failed to open file for reading: {}", location_);
+  }
+
+ private:
+  std::string location_;
+  std::optional<int64_t> file_size_;
+};
+
+/// \brief OutputFile implementation for a path on the local filesystem.
+///
+/// Constructing the object only records the location; the file is created when
+/// Create() or CreateOrOverwrite() is called.
+class StdOutputFile : public OutputFile {
+ public:
+  explicit StdOutputFile(std::string location) : 
location_(std::move(location)) {}
+
+  std::string_view location() const override { return location_; }
+
+  /// \brief Create the file, failing with AlreadyExists if it is already present.
+  Result<std::unique_ptr<PositionOutputStream>> Create() override {
+    return Create(/*overwrite=*/false);
+  }
+
+  /// \brief Create the file, replacing any existing file at the same location.
+  Result<std::unique_ptr<PositionOutputStream>> CreateOrOverwrite() override {
+    return Create(/*overwrite=*/true);
+  }
+
+ private:
+  /// \brief Shared implementation of Create() and CreateOrOverwrite().
+  ///
+  /// Checks for an existing file, creates missing parent directories, and then
+  /// opens a StdPositionOutputStream on the location.
+  ///
+  /// \param overwrite When false, an existing file yields AlreadyExists.
+  /// \return The open stream, or an IOError if the existence check, directory
+  /// creation, or open fails.
+  Result<std::unique_ptr<PositionOutputStream>> Create(bool overwrite) {
+    std::filesystem::path path(location_);
+    // One error_code is shared by the existence check and directory creation.
+    std::error_code ec;
+    auto exists = std::filesystem::exists(path, ec);
+    if (ec) {
+      return IOError("Failed to check file existence for {}: {}", location_,
+                     ec.message());
+    }
+    // NOTE(review): the existence check and the open further down are separate
+    // filesystem calls, so a file created in between would still be truncated.
+    if (!overwrite && exists) {
+      return AlreadyExists("File already exists: {}", location_);
+    }
     if (path.has_parent_path()) {
-      std::error_code ec;
       std::filesystem::create_directories(path.parent_path(), ec);
       if (ec) {
-        return IOError("Failed to create parent directories for: {}", 
file_location);
+        return IOError("Failed to create parent directories for {}: {}", 
location_,
+                       ec.message());
       }
     }
-
-    std::ofstream file(file_location, std::ios::binary);
-    if (!file.is_open()) {
-      return IOError("Failed to open file for writing: {}", file_location);
+    // The stream constructor opens the file with std::ios::trunc, so any existing
+    // content is discarded at this point.
+    auto stream = std::make_unique<StdPositionOutputStream>(location_);
+    if (!stream->is_open()) {
+      return IOError("Failed to open file for writing: {}", location_);
     }
+    return stream;
+  }
 
-    file.write(content.data(), content.size());
-    if (!file) {
-      return IOError("Failed to write to file: {}", file_location);
+  // Path of the file this object creates; also used in error messages.
+  std::string location_;
+};
+
+/// \brief Simple local filesystem FileIO implementation for testing
+///
+/// This class provides a basic FileIO implementation that reads and writes
+/// files to the local filesystem using standard C++ file streams.
+class StdFileIO : public FileIO {
+ public:
+  /// \brief Create an input file handle for `file_location`.
+  ///
+  /// No size is passed along, so the returned StdInputFile looks the size up from
+  /// the filesystem when Size() is called.
+  Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location) override {
+    auto input_file = std::make_unique<StdInputFile>(std::move(file_location));
+    return input_file;
+  }
+
+  /// \brief Create an input file handle for `file_location` with a known size.
+  ///
+  /// The returned StdInputFile reports `length` from Size() instead of querying
+  /// the filesystem.
+  ///
+  /// \return InvalidArgument when `length` does not fit in int64_t.
+  Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location,
+                                                  size_t length) override {
+    // Guard the narrowing cast below: StdInputFile stores the size as int64_t.
+    if (length > static_cast<size_t>(std::numeric_limits<int64_t>::max())) {
+      return InvalidArgument("File length {} exceeds int64_t max", length);
     }
+    return std::make_unique<StdInputFile>(std::move(file_location),
+                                          static_cast<int64_t>(length));
+  }
 
-    return {};
+  /// \brief Create an output file handle for `file_location`.
+  ///
+  /// This only wraps the location in a StdOutputFile; the file itself is created
+  /// when Create() or CreateOrOverwrite() is called on the returned object.
+  Result<std::unique_ptr<OutputFile>> NewOutputFile(std::string file_location) 
override {
+    return std::make_unique<StdOutputFile>(std::move(file_location));
   }
 
   Status DeleteFile(const std::string& file_location) override {
     std::error_code ec;
     if (!std::filesystem::remove(file_location, ec)) {
       if (ec) {
-        return IOError("Failed to delete file: {}", file_location);
+        return IOError("Failed to delete file {}: {}", file_location, 
ec.message());
       }
       return IOError("File does not exist: {}", file_location);
     }
diff --git a/src/iceberg/test/update_location_test.cc 
b/src/iceberg/test/update_location_test.cc
index 53b347b5..a208209b 100644
--- a/src/iceberg/test/update_location_test.cc
+++ b/src/iceberg/test/update_location_test.cc
@@ -26,7 +26,7 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/result.h"
 #include "iceberg/test/matchers.h"
 #include "iceberg/test/update_test_base.h"
diff --git a/src/iceberg/test/update_partition_spec_test.cc 
b/src/iceberg/test/update_partition_spec_test.cc
index 632c4a55..e310c0d1 100644
--- a/src/iceberg/test/update_partition_spec_test.cc
+++ b/src/iceberg/test/update_partition_spec_test.cc
@@ -28,7 +28,7 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/catalog/memory/in_memory_catalog.h"
 #include "iceberg/expression/expressions.h"
 #include "iceberg/partition_spec.h"
diff --git a/src/iceberg/test/update_test_base.h 
b/src/iceberg/test/update_test_base.h
index 310feb37..b0ad3d20 100644
--- a/src/iceberg/test/update_test_base.h
+++ b/src/iceberg/test/update_test_base.h
@@ -26,7 +26,7 @@
 #include <arrow/filesystem/mockfs.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/catalog/memory/in_memory_catalog.h"
 #include "iceberg/result.h"
 #include "iceberg/snapshot.h"

Reply via email to