This is an automated email from the ASF dual-hosted git repository.
wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new fc80e4bd feat(io): add streaming FileIO support (#641)
fc80e4bd is described below
commit fc80e4bdbafcd659e4b44fb9fb8ae7960a08c2d1
Author: Gang Wu <[email protected]>
AuthorDate: Fri May 8 22:18:40 2026 +0800
feat(io): add streaming FileIO support (#641)
Introduce InputFile/OutputFile stream APIs and Arrow IO adapters so
bundled Avro/Parquet readers and writers can work with generic FileIO
implementations.
---
example/demo_example.cc | 2 +-
src/iceberg/CMakeLists.txt | 5 +-
src/iceberg/arrow/arrow_fs_file_io.cc | 108 ----
src/iceberg/arrow/arrow_io.cc | 590 +++++++++++++++++++++
...w_fs_file_io_internal.h => arrow_io_internal.h} | 44 +-
.../{file_io_register.cc => arrow_io_register.cc} | 4 +-
.../{file_io_register.h => arrow_io_register.h} | 2 +-
.../arrow/{arrow_file_io.h => arrow_io_util.h} | 0
src/iceberg/arrow/s3/arrow_s3_file_io.cc | 4 +-
src/iceberg/avro/avro_reader.cc | 12 +-
src/iceberg/avro/avro_writer.cc | 7 +-
src/iceberg/file_io.cc | 103 ++++
src/iceberg/file_io.h | 105 +++-
src/iceberg/file_reader.h | 4 +-
src/iceberg/file_writer.h | 4 +-
src/iceberg/meson.build | 1 +
src/iceberg/parquet/parquet_reader.cc | 12 +-
src/iceberg/parquet/parquet_writer.cc | 7 +-
src/iceberg/test/CMakeLists.txt | 2 +-
src/iceberg/test/arrow_fs_file_io_test.cc | 67 ---
src/iceberg/test/arrow_io_test.cc | 486 +++++++++++++++++
src/iceberg/test/arrow_s3_file_io_test.cc | 2 +-
src/iceberg/test/avro_test.cc | 18 +-
src/iceberg/test/data_writer_test.cc | 2 +-
src/iceberg/test/delete_file_index_test.cc | 2 +-
src/iceberg/test/delete_loader_test.cc | 2 +-
src/iceberg/test/file_scan_task_test.cc | 43 +-
src/iceberg/test/gzip_decompress_test.cc | 8 +-
src/iceberg/test/in_memory_catalog_test.cc | 2 +-
src/iceberg/test/manifest_group_test.cc | 2 +-
src/iceberg/test/manifest_list_versions_test.cc | 2 +-
src/iceberg/test/manifest_reader_stats_test.cc | 2 +-
src/iceberg/test/manifest_reader_test.cc | 2 +-
src/iceberg/test/manifest_writer_versions_test.cc | 2 +-
src/iceberg/test/metadata_io_test.cc | 2 +-
src/iceberg/test/parquet_test.cc | 44 +-
src/iceberg/test/rolling_manifest_writer_test.cc | 2 +-
src/iceberg/test/scan_test_base.h | 2 +-
src/iceberg/test/std_io.h | 299 +++++++++--
src/iceberg/test/update_location_test.cc | 2 +-
src/iceberg/test/update_partition_spec_test.cc | 2 +-
src/iceberg/test/update_test_base.h | 2 +-
42 files changed, 1700 insertions(+), 313 deletions(-)
diff --git a/example/demo_example.cc b/example/demo_example.cc
index 6869aa37..3c8745be 100644
--- a/example/demo_example.cc
+++ b/example/demo_example.cc
@@ -19,7 +19,7 @@
#include <iostream>
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/catalog/memory/in_memory_catalog.h"
#include "iceberg/manifest/manifest_entry.h"
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 02099f6b..c4e193b8 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -36,6 +36,7 @@ set(ICEBERG_SOURCES
expression/rewrite_not.cc
expression/strict_metrics_evaluator.cc
expression/term.cc
+ file_io.cc
file_io_registry.cc
file_reader.cc
file_writer.cc
@@ -218,9 +219,9 @@ add_subdirectory(util)
if(ICEBERG_BUILD_BUNDLE)
set(ICEBERG_BUNDLE_SOURCES
- arrow/arrow_fs_file_io.cc
+ arrow/arrow_io.cc
arrow/s3/arrow_s3_file_io.cc
- arrow/file_io_register.cc
+ arrow/arrow_io_register.cc
arrow/metadata_column_util.cc
avro/avro_data_util.cc
avro/avro_direct_decoder.cc
diff --git a/src/iceberg/arrow/arrow_fs_file_io.cc
b/src/iceberg/arrow/arrow_fs_file_io.cc
deleted file mode 100644
index 769fcfb1..00000000
--- a/src/iceberg/arrow/arrow_fs_file_io.cc
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <chrono>
-
-#include <arrow/filesystem/localfs.h>
-#include <arrow/filesystem/mockfs.h>
-
-#include "iceberg/arrow/arrow_file_io.h"
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/arrow/arrow_status_internal.h"
-#include "iceberg/util/macros.h"
-
-namespace iceberg::arrow {
-
-Result<std::string> ArrowFileSystemFileIO::ResolvePath(const std::string&
file_location) {
- if (file_location.find("://") != std::string::npos) {
- ICEBERG_ARROW_ASSIGN_OR_RETURN(auto path,
arrow_fs_->PathFromUri(file_location));
- return path;
- }
- return file_location;
-}
-
-/// \brief Read the content of the file at the given location.
-Result<std::string> ArrowFileSystemFileIO::ReadFile(const std::string&
file_location,
- std::optional<size_t>
length) {
- ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
- ::arrow::fs::FileInfo file_info(path);
- if (length.has_value()) {
- file_info.set_size(length.value());
- }
- std::string content;
- ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file,
arrow_fs_->OpenInputFile(file_info));
- ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
-
- content.resize(file_size);
- size_t remain = file_size;
- size_t offset = 0;
- while (remain > 0) {
- size_t read_length = std::min(remain, static_cast<size_t>(1024 * 1024));
- ICEBERG_ARROW_ASSIGN_OR_RETURN(
- auto read_bytes,
- file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
- if (read_bytes == 0) {
- return IOError("Unexpected EOF reading {}: got {} of {} bytes",
file_location,
- offset, file_size);
- }
- remain -= read_bytes;
- offset += read_bytes;
- }
-
- return content;
-}
-
-/// \brief Write the given content to the file at the given location.
-Status ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
- std::string_view content) {
- ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
- ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(path));
- ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
- ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
- ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
- return {};
-}
-
-/// \brief Delete a file at the given location.
-Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) {
- ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
- ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(path));
- return {};
-}
-
-std::unique_ptr<FileIO> ArrowFileSystemFileIO::MakeMockFileIO() {
- return std::make_unique<ArrowFileSystemFileIO>(
- std::make_shared<::arrow::fs::internal::MockFileSystem>(
- std::chrono::system_clock::now()));
-}
-
-std::unique_ptr<FileIO> ArrowFileSystemFileIO::MakeLocalFileIO() {
- return std::make_unique<ArrowFileSystemFileIO>(
- std::make_shared<::arrow::fs::LocalFileSystem>());
-}
-
-std::unique_ptr<FileIO> MakeMockFileIO() {
- return ArrowFileSystemFileIO::MakeMockFileIO();
-}
-
-std::unique_ptr<FileIO> MakeLocalFileIO() {
- return ArrowFileSystemFileIO::MakeLocalFileIO();
-}
-
-} // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_io.cc b/src/iceberg/arrow/arrow_io.cc
new file mode 100644
index 00000000..a515f338
--- /dev/null
+++ b/src/iceberg/arrow/arrow_io.cc
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <algorithm>
+#include <chrono>
+#include <limits>
+#include <mutex>
+#include <optional>
+
+#include <arrow/buffer.h>
+#include <arrow/filesystem/localfs.h>
+#include <arrow/filesystem/mockfs.h>
+#include <arrow/io/interfaces.h>
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/arrow/arrow_io_util.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::arrow {
+
+namespace {
+
+Result<int64_t> ToInt64Length(size_t length) {
+ if (length > static_cast<size_t>(std::numeric_limits<int64_t>::max())) {
+ return InvalidArgument("File length {} exceeds int64_t max", length);
+ }
+ return static_cast<int64_t>(length);
+}
+
+::arrow::Status ToArrowStatus(const Error& error) {
+ switch (error.kind) {
+ case ErrorKind::kInvalid:
+ case ErrorKind::kInvalidArgument:
+ return ::arrow::Status::Invalid(error.message);
+ case ErrorKind::kNotImplemented:
+ case ErrorKind::kNotSupported:
+ return ::arrow::Status::NotImplemented(error.message);
+ default:
+ return ::arrow::Status::IOError(error.message);
+ }
+}
+
+::arrow::Result<int64_t> BytesToReadAt(int64_t position, int64_t nbytes,
int64_t size) {
+ if (position < 0 || nbytes < 0) {
+ return ::arrow::Status::Invalid("ReadAt position and length must be
non-negative");
+ }
+ if (position > size) {
+ return ::arrow::Status::IOError("Read out of bounds (offset = ", position,
+ ", size = ", nbytes, ") in file of size ",
size);
+ }
+ return std::min(nbytes, size - position);
+}
+
+/// Adapts the generic Iceberg input stream API to Arrow's RandomAccessFile API.
+///
+/// Avro and Parquet readers in the bundle layer consume Arrow IO streams. This
+/// fallback keeps those readers usable with non-Arrow FileIO implementations without
+/// exposing Arrow filesystem details through the generic FileIO interface.
+class InputStreamAdapter : public ::arrow::io::RandomAccessFile {
+ public:
+ InputStreamAdapter(std::unique_ptr<SeekableInputStream> input, int64_t size)
+ : input_(std::move(input)), size_(size) {
+ RandomAccessFile::set_mode(::arrow::io::FileMode::READ);
+ }
+
+ ::arrow::Status Close() override {
+ std::lock_guard lock(mutex_);
+ if (closed_) {
+ return ::arrow::Status::OK();
+ }
+ auto status = input_->Close();
+ if (!status.has_value()) {
+ return ToArrowStatus(status.error());
+ }
+ closed_ = true;
+ return ::arrow::Status::OK();
+ }
+
+ ::arrow::Result<int64_t> Tell() const override {
+ std::lock_guard lock(mutex_);
+ ARROW_RETURN_NOT_OK(CheckOpenLocked());
+ auto position = input_->Position();
+ if (!position.has_value()) {
+ return ToArrowStatus(position.error());
+ }
+ if (position.value() < 0) {
+ return ::arrow::Status::IOError("FileIO input stream returned negative
position");
+ }
+ return position.value();
+ }
+
+ bool closed() const override {
+ std::lock_guard lock(mutex_);
+ return closed_;
+ }
+
+ ::arrow::Status Seek(int64_t position) override {
+ std::lock_guard lock(mutex_);
+ ARROW_RETURN_NOT_OK(CheckOpenLocked());
+ auto status = input_->Seek(position);
+ if (!status.has_value()) {
+ return ToArrowStatus(status.error());
+ }
+ return ::arrow::Status::OK();
+ }
+
+ ::arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+ if (nbytes < 0) {
+ return ::arrow::Status::Invalid("Cannot read a negative number of
bytes");
+ }
+ std::lock_guard lock(mutex_);
+ ARROW_RETURN_NOT_OK(CheckOpenLocked());
+ if (nbytes == 0) {
+ return 0;
+ }
+ auto data = reinterpret_cast<std::byte*>(out);
+ auto result = input_->Read(std::span(data, static_cast<size_t>(nbytes)));
+ if (!result.has_value()) {
+ return ToArrowStatus(result.error());
+ }
+ if (result.value() < 0 || result.value() > nbytes) {
+ return ::arrow::Status::IOError("FileIO input stream returned invalid
byte count");
+ }
+ return result.value();
+ }
+
+ ::arrow::Result<std::shared_ptr<::arrow::Buffer>> Read(int64_t nbytes)
override {
+ if (nbytes < 0) {
+ return ::arrow::Status::Invalid("Cannot read a negative number of
bytes");
+ }
+ ARROW_ASSIGN_OR_RAISE(auto buffer,
::arrow::AllocateResizableBuffer(nbytes));
+ ARROW_ASSIGN_OR_RAISE(auto bytes_read, Read(nbytes,
buffer->mutable_data()));
+ ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, /*shrink_to_fit=*/false));
+ return std::shared_ptr<::arrow::Buffer>(std::move(buffer));
+ }
+
+ ::arrow::Result<int64_t> GetSize() override { return size_; }
+
+ ::arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out)
override {
+ std::lock_guard lock(mutex_);
+ ARROW_RETURN_NOT_OK(CheckOpenLocked());
+ ARROW_ASSIGN_OR_RAISE(auto bytes_to_read, BytesToReadAt(position, nbytes,
size_));
+ if (bytes_to_read == 0) {
+ return 0;
+ }
+ auto data = reinterpret_cast<std::byte*>(out);
+ auto status =
+ input_->ReadFully(position, std::span(data,
static_cast<size_t>(bytes_to_read)));
+ if (!status.has_value()) {
+ return ToArrowStatus(status.error());
+ }
+ return bytes_to_read;
+ }
+
+ ::arrow::Result<std::shared_ptr<::arrow::Buffer>> ReadAt(int64_t position,
+ int64_t nbytes)
override {
+ {
+ std::lock_guard lock(mutex_);
+ ARROW_RETURN_NOT_OK(CheckOpenLocked());
+ }
+ ARROW_ASSIGN_OR_RAISE(auto bytes_to_read, BytesToReadAt(position, nbytes,
size_));
+ ARROW_ASSIGN_OR_RAISE(auto buffer,
::arrow::AllocateResizableBuffer(bytes_to_read));
+ if (bytes_to_read == 0) {
+ return std::shared_ptr<::arrow::Buffer>(std::move(buffer));
+ }
+ std::lock_guard lock(mutex_);
+ ARROW_RETURN_NOT_OK(CheckOpenLocked());
+ auto status = input_->ReadFully(
+ position,
std::span(reinterpret_cast<std::byte*>(buffer->mutable_data()),
+ static_cast<size_t>(bytes_to_read)));
+ if (!status.has_value()) {
+ return ToArrowStatus(status.error());
+ }
+ return std::shared_ptr<::arrow::Buffer>(std::move(buffer));
+ }
+
+ private:
+ ::arrow::Status CheckOpenLocked() const {
+ if (closed_) {
+ return ::arrow::Status::IOError("Operation on closed FileIO input
stream");
+ }
+ return ::arrow::Status::OK();
+ }
+
+ std::unique_ptr<SeekableInputStream> input_;
+ int64_t size_;
+ bool closed_ = false;
+ mutable std::mutex mutex_;
+};
+
+/// Adapts the generic Iceberg output stream API to Arrow's OutputStream API.
+///
+/// Avro and Parquet writers in the bundle layer consume Arrow IO streams. This
+/// fallback keeps those writers usable with non-Arrow FileIO implementations without
+/// requiring them to downcast to ArrowFileSystemFileIO.
+class OutputStreamAdapter : public ::arrow::io::OutputStream {
+ public:
+ explicit OutputStreamAdapter(std::unique_ptr<PositionOutputStream> output)
+ : output_(std::move(output)) {
+ OutputStream::set_mode(::arrow::io::FileMode::WRITE);
+ }
+
+ ::arrow::Status Close() override {
+ std::lock_guard lock(mutex_);
+ if (closed_) {
+ return ::arrow::Status::OK();
+ }
+ auto status = output_->Close();
+ if (!status.has_value()) {
+ return ToArrowStatus(status.error());
+ }
+ closed_ = true;
+ return ::arrow::Status::OK();
+ }
+
+ ::arrow::Result<int64_t> Tell() const override {
+ std::lock_guard lock(mutex_);
+ ARROW_RETURN_NOT_OK(CheckOpenLocked());
+ auto position = output_->Position();
+ if (!position.has_value()) {
+ return ToArrowStatus(position.error());
+ }
+ if (position.value() < 0) {
+ return ::arrow::Status::IOError("FileIO output stream returned negative
position");
+ }
+ return position.value();
+ }
+
+ bool closed() const override {
+ std::lock_guard lock(mutex_);
+ return closed_;
+ }
+
+ ::arrow::Status Write(const void* data, int64_t nbytes) override {
+ if (nbytes < 0) {
+ return ::arrow::Status::Invalid("Cannot write a negative number of
bytes");
+ }
+ std::lock_guard lock(mutex_);
+ ARROW_RETURN_NOT_OK(CheckOpenLocked());
+ if (nbytes == 0) {
+ return ::arrow::Status::OK();
+ }
+ auto status = output_->Write(
+ std::span(reinterpret_cast<const std::byte*>(data),
static_cast<size_t>(nbytes)));
+ if (!status.has_value()) {
+ return ToArrowStatus(status.error());
+ }
+ return ::arrow::Status::OK();
+ }
+
+ ::arrow::Status Flush() override {
+ std::lock_guard lock(mutex_);
+ ARROW_RETURN_NOT_OK(CheckOpenLocked());
+ auto status = output_->Flush();
+ if (!status.has_value()) {
+ return ToArrowStatus(status.error());
+ }
+ return ::arrow::Status::OK();
+ }
+
+ private:
+ ::arrow::Status CheckOpenLocked() const {
+ if (closed_) {
+ return ::arrow::Status::IOError("Operation on closed FileIO output
stream");
+ }
+ return ::arrow::Status::OK();
+ }
+
+ std::unique_ptr<PositionOutputStream> output_;
+ bool closed_ = false;
+ mutable std::mutex mutex_;
+};
+
+class ArrowSeekableInputStream : public SeekableInputStream {
+ public:
+ explicit
ArrowSeekableInputStream(std::shared_ptr<::arrow::io::RandomAccessFile> input)
+ : input_(std::move(input)) {}
+
+ Result<int64_t> Position() const override {
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto position, input_->Tell());
+ return position;
+ }
+
+ Status Seek(int64_t position) override {
+ ICEBERG_ARROW_RETURN_NOT_OK(input_->Seek(position));
+ return {};
+ }
+
+ Result<int64_t> Read(std::span<std::byte> out) override {
+ ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(out.size()));
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto bytes_read, input_->Read(size,
out.data()));
+ if (bytes_read < 0 || bytes_read > size) {
+ return IOError("Arrow input stream returned invalid byte count");
+ }
+ return bytes_read;
+ }
+
+ Status ReadFully(int64_t position, std::span<std::byte> out) override {
+ if (position < 0) {
+ return InvalidArgument("Cannot read from negative position {}",
position);
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(out.size()));
+ if (size == 0) {
+ return {};
+ }
+ if (position > std::numeric_limits<int64_t>::max() - size) {
+ return InvalidArgument(
+ "Read range starting at {} with length {} exceeds int64_t max",
position, size);
+ }
+
+ Status read_status = {};
+ int64_t bytes_read = 0;
+ while (bytes_read < size) {
+ auto* data = out.data() + bytes_read;
+ auto remaining = size - bytes_read;
+ auto read_result = input_->ReadAt(position + bytes_read, remaining,
data);
+ if (!read_result.ok()) {
+ read_status =
+ std::unexpected<Error>{{.kind = ToErrorKind(read_result.status()),
+ .message =
read_result.status().ToString()}};
+ break;
+ }
+ auto read = read_result.ValueOrDie();
+ if (read < 0 || read > remaining) {
+ read_status = IOError("Arrow input stream returned invalid byte
count");
+ break;
+ }
+ if (read == 0) {
+ read_status =
+ IOError("Unexpected EOF reading at offset {}", position +
bytes_read);
+ break;
+ }
+ bytes_read += read;
+ }
+ return read_status;
+ }
+
+ Status Close() override {
+ if (input_->closed()) {
+ return {};
+ }
+ ICEBERG_ARROW_RETURN_NOT_OK(input_->Close());
+ return {};
+ }
+
+ private:
+ std::shared_ptr<::arrow::io::RandomAccessFile> input_;
+};
+
+class ArrowPositionOutputStream : public PositionOutputStream {
+ public:
+ explicit
ArrowPositionOutputStream(std::shared_ptr<::arrow::io::OutputStream> output)
+ : output_(std::move(output)) {}
+
+ Result<int64_t> Position() const override {
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto position, output_->Tell());
+ return position;
+ }
+
+ Status Write(std::span<const std::byte> data) override {
+ ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(data.size()));
+ ICEBERG_ARROW_RETURN_NOT_OK(output_->Write(data.data(), size));
+ return {};
+ }
+
+ Status Flush() override {
+ ICEBERG_ARROW_RETURN_NOT_OK(output_->Flush());
+ return {};
+ }
+
+ Status Close() override {
+ if (output_->closed()) {
+ return {};
+ }
+ ICEBERG_ARROW_RETURN_NOT_OK(output_->Close());
+ return {};
+ }
+
+ private:
+ std::shared_ptr<::arrow::io::OutputStream> output_;
+};
+
+class ArrowInputFile : public InputFile {
+ public:
+ ArrowInputFile(std::shared_ptr<::arrow::fs::FileSystem> fs, std::string
location,
+ std::string path, std::optional<int64_t> file_size)
+ : fs_(std::move(fs)),
+ location_(std::move(location)),
+ path_(std::move(path)),
+ file_size_(file_size) {}
+
+ std::string_view location() const override { return location_; }
+
+ Result<int64_t> Size() const override {
+ if (file_size_.has_value()) {
+ return *file_size_;
+ }
+ ::arrow::fs::FileInfo file_info(path_, ::arrow::fs::FileType::File);
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, fs_->OpenInputFile(file_info));
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto size, input->GetSize());
+ return size;
+ }
+
+ Result<std::unique_ptr<SeekableInputStream>> Open() override {
+ ::arrow::fs::FileInfo file_info(path_, ::arrow::fs::FileType::File);
+ if (file_size_.has_value()) {
+ file_info.set_size(*file_size_);
+ }
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, fs_->OpenInputFile(file_info));
+ return std::make_unique<ArrowSeekableInputStream>(std::move(input));
+ }
+
+ private:
+ std::shared_ptr<::arrow::fs::FileSystem> fs_;
+ std::string location_;
+ std::string path_;
+ std::optional<int64_t> file_size_;
+};
+
+class ArrowOutputFile : public OutputFile {
+ public:
+ ArrowOutputFile(std::shared_ptr<::arrow::fs::FileSystem> fs, std::string
location,
+ std::string path)
+ : fs_(std::move(fs)), location_(std::move(location)),
path_(std::move(path)) {}
+
+ std::string_view location() const override { return location_; }
+
+ Result<std::unique_ptr<PositionOutputStream>> Create() override {
+ return Create(/*overwrite=*/false);
+ }
+
+ Result<std::unique_ptr<PositionOutputStream>> CreateOrOverwrite() override {
+ return Create(/*overwrite=*/true);
+ }
+
+ private:
+ Result<std::unique_ptr<PositionOutputStream>> Create(bool overwrite) {
+ if (!overwrite) {
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto info, fs_->GetFileInfo(path_));
+ if (info.type() != ::arrow::fs::FileType::NotFound) {
+ return AlreadyExists("File already exists: {}", location_);
+ }
+ }
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, fs_->OpenOutputStream(path_));
+ return std::make_unique<ArrowPositionOutputStream>(std::move(output));
+ }
+
+ std::shared_ptr<::arrow::fs::FileSystem> fs_;
+ std::string location_;
+ std::string path_;
+};
+
+} // namespace
+
+Result<std::string> ArrowFileSystemFileIO::ResolvePath(const std::string&
file_location) {
+ if (file_location.find("://") != std::string::npos) {
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto path,
arrow_fs_->PathFromUri(file_location));
+ return path;
+ }
+ return file_location;
+}
+
+Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenArrowInputStream(
+ const std::shared_ptr<FileIO>& io, const std::string& path,
+ std::optional<size_t> length) {
+ ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+
+ if (auto arrow_io = std::dynamic_pointer_cast<ArrowFileSystemFileIO>(io)) {
+ ICEBERG_ASSIGN_OR_RAISE(auto resolved_path, arrow_io->ResolvePath(path));
+ ::arrow::fs::FileInfo file_info(resolved_path,
::arrow::fs::FileType::File);
+ if (length.has_value()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(*length));
+ file_info.set_size(size);
+ }
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input,
+
arrow_io->arrow_fs_->OpenInputFile(file_info));
+ return input;
+ }
+
+ int64_t size;
+ std::unique_ptr<InputFile> input_file;
+ if (length.has_value()) {
+ ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path, *length));
+ } else {
+ ICEBERG_ASSIGN_OR_RAISE(input_file, io->NewInputFile(path));
+ }
+ ICEBERG_ASSIGN_OR_RAISE(size, input_file->Size());
+ if (size < 0) {
+ return Invalid("Invalid negative file size {} for {}", size, path);
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto input, input_file->Open());
+ return std::make_shared<InputStreamAdapter>(std::move(input), size);
+}
+
+Result<std::shared_ptr<::arrow::io::OutputStream>> OpenArrowOutputStream(
+ const std::shared_ptr<FileIO>& io, const std::string& path, bool
overwrite) {
+ ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+
+ if (auto arrow_io = std::dynamic_pointer_cast<ArrowFileSystemFileIO>(io)) {
+ ICEBERG_ASSIGN_OR_RAISE(auto resolved_path, arrow_io->ResolvePath(path));
+ if (!overwrite) {
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto info,
+
arrow_io->arrow_fs_->GetFileInfo(resolved_path));
+ if (info.type() != ::arrow::fs::FileType::NotFound) {
+ return AlreadyExists("File already exists: {}", path);
+ }
+ }
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output,
+
arrow_io->arrow_fs_->OpenOutputStream(resolved_path));
+ return output;
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto output_file, io->NewOutputFile(path));
+ std::unique_ptr<PositionOutputStream> output;
+ if (overwrite) {
+ ICEBERG_ASSIGN_OR_RAISE(output, output_file->CreateOrOverwrite());
+ } else {
+ ICEBERG_ASSIGN_OR_RAISE(output, output_file->Create());
+ }
+ return std::make_shared<OutputStreamAdapter>(std::move(output));
+}
+
+Result<std::unique_ptr<InputFile>> ArrowFileSystemFileIO::NewInputFile(
+ std::string file_location) {
+ ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
+ return std::make_unique<ArrowInputFile>(arrow_fs_, std::move(file_location),
+ std::move(path), std::nullopt);
+}
+
+Result<std::unique_ptr<InputFile>> ArrowFileSystemFileIO::NewInputFile(
+ std::string file_location, size_t length) {
+ ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(length));
+ ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
+ return std::make_unique<ArrowInputFile>(arrow_fs_, std::move(file_location),
+ std::move(path), size);
+}
+
+Result<std::unique_ptr<OutputFile>> ArrowFileSystemFileIO::NewOutputFile(
+ std::string file_location) {
+ ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
+ return std::make_unique<ArrowOutputFile>(arrow_fs_, std::move(file_location),
+ std::move(path));
+}
+
+/// \brief Delete a file at the given location.
+Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) {
+ ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
+ ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(path));
+ return {};
+}
+
+std::unique_ptr<FileIO> ArrowFileSystemFileIO::MakeMockFileIO() {
+ return std::make_unique<ArrowFileSystemFileIO>(
+ std::make_shared<::arrow::fs::internal::MockFileSystem>(
+ std::chrono::system_clock::now()));
+}
+
+std::unique_ptr<FileIO> ArrowFileSystemFileIO::MakeLocalFileIO() {
+ return std::make_unique<ArrowFileSystemFileIO>(
+ std::make_shared<::arrow::fs::LocalFileSystem>());
+}
+
+std::unique_ptr<FileIO> MakeMockFileIO() {
+ return ArrowFileSystemFileIO::MakeMockFileIO();
+}
+
+std::unique_ptr<FileIO> MakeLocalFileIO() {
+ return ArrowFileSystemFileIO::MakeLocalFileIO();
+}
+
+} // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_fs_file_io_internal.h
b/src/iceberg/arrow/arrow_io_internal.h
similarity index 50%
rename from src/iceberg/arrow/arrow_fs_file_io_internal.h
rename to src/iceberg/arrow/arrow_io_internal.h
index 92a99150..4f170a8a 100644
--- a/src/iceberg/arrow/arrow_fs_file_io_internal.h
+++ b/src/iceberg/arrow/arrow_io_internal.h
@@ -19,15 +19,37 @@
#pragma once
+#include <cstddef>
#include <memory>
+#include <optional>
+#include <string>
-#include <arrow/filesystem/filesystem.h>
+#include <arrow/filesystem/type_fwd.h>
+#include <arrow/io/type_fwd.h>
#include "iceberg/file_io.h"
#include "iceberg/iceberg_bundle_export.h"
namespace iceberg::arrow {
+/// \brief Open a FileIO input as an Arrow input stream.
+///
+/// Uses ArrowFileSystemFileIO's native Arrow stream directly when possible and falls
+/// back to a FileIO stream adapter otherwise. The fallback requires FileIO to
+/// implement NewInputFile.
+ICEBERG_BUNDLE_EXPORT Result<std::shared_ptr<::arrow::io::RandomAccessFile>>
+OpenArrowInputStream(const std::shared_ptr<FileIO>& io, const std::string&
path,
+ std::optional<size_t> length = std::nullopt);
+
+/// \brief Open a FileIO output as an Arrow output stream.
+///
+/// Uses ArrowFileSystemFileIO's native Arrow stream directly when possible and falls
+/// back to a FileIO stream adapter otherwise. The fallback requires FileIO to
+/// implement NewOutputFile.
+ICEBERG_BUNDLE_EXPORT Result<std::shared_ptr<::arrow::io::OutputStream>>
+OpenArrowOutputStream(const std::shared_ptr<FileIO>& io, const std::string&
path,
+ bool overwrite = true);
+
/// \brief A concrete implementation of FileIO for Arrow file system.
class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
public:
@@ -42,12 +64,15 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public
FileIO {
~ArrowFileSystemFileIO() override = default;
- /// \brief Read the content of the file at the given location.
- Result<std::string> ReadFile(const std::string& file_location,
- std::optional<size_t> length) override;
+ /// \brief Create an input file handle for the given location.
+ Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location)
override;
+
+ /// \brief Create an input file handle for the given location with a known length.
+ Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location,
+ size_t length) override;
- /// \brief Write the given content to the file at the given location.
- Status WriteFile(const std::string& file_location, std::string_view content)
override;
+ /// \brief Create an output file handle for the given location.
+ Result<std::unique_ptr<OutputFile>> NewOutputFile(std::string file_location)
override;
/// \brief Delete a file at the given location.
Status DeleteFile(const std::string& file_location) override;
@@ -56,6 +81,13 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public
FileIO {
const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return
arrow_fs_; }
private:
+ friend Result<std::shared_ptr<::arrow::io::RandomAccessFile>>
OpenArrowInputStream(
+ const std::shared_ptr<FileIO>& io, const std::string& path,
+ std::optional<size_t> length);
+
+ friend Result<std::shared_ptr<::arrow::io::OutputStream>>
OpenArrowOutputStream(
+ const std::shared_ptr<FileIO>& io, const std::string& path, bool
overwrite);
+
/// \brief Resolve a file location to a filesystem path.
Result<std::string> ResolvePath(const std::string& file_location);
diff --git a/src/iceberg/arrow/file_io_register.cc
b/src/iceberg/arrow/arrow_io_register.cc
similarity index 95%
rename from src/iceberg/arrow/file_io_register.cc
rename to src/iceberg/arrow/arrow_io_register.cc
index 1140e49b..43273c0a 100644
--- a/src/iceberg/arrow/file_io_register.cc
+++ b/src/iceberg/arrow/arrow_io_register.cc
@@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-#include "iceberg/arrow/file_io_register.h"
+#include "iceberg/arrow/arrow_io_register.h"
#include <mutex>
#include <string>
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/file_io_registry.h"
namespace iceberg::arrow {
diff --git a/src/iceberg/arrow/file_io_register.h
b/src/iceberg/arrow/arrow_io_register.h
similarity index 96%
rename from src/iceberg/arrow/file_io_register.h
rename to src/iceberg/arrow/arrow_io_register.h
index 1b4622bd..f28b7a56 100644
--- a/src/iceberg/arrow/file_io_register.h
+++ b/src/iceberg/arrow/arrow_io_register.h
@@ -19,7 +19,7 @@
#pragma once
-/// \file iceberg/arrow/file_io_register.h
+/// \file iceberg/arrow/arrow_io_register.h
/// \brief Provide functions to register Arrow FileIO implementations.
#include "iceberg/iceberg_bundle_export.h"
diff --git a/src/iceberg/arrow/arrow_file_io.h
b/src/iceberg/arrow/arrow_io_util.h
similarity index 100%
rename from src/iceberg/arrow/arrow_file_io.h
rename to src/iceberg/arrow/arrow_io_util.h
diff --git a/src/iceberg/arrow/s3/arrow_s3_file_io.cc
b/src/iceberg/arrow/s3/arrow_s3_file_io.cc
index 808415d0..cffd9584 100644
--- a/src/iceberg/arrow/s3/arrow_s3_file_io.cc
+++ b/src/iceberg/arrow/s3/arrow_s3_file_io.cc
@@ -27,8 +27,8 @@
# include <arrow/filesystem/s3fs.h>
#endif
-#include "iceberg/arrow/arrow_file_io.h"
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/arrow/s3/s3_properties.h"
#include "iceberg/util/macros.h"
diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index f4985d9a..1d431c46 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -31,7 +31,7 @@
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/arrow/metadata_column_util_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
@@ -42,7 +42,6 @@
#include "iceberg/metadata_columns.h"
#include "iceberg/name_mapping.h"
#include "iceberg/schema_internal.h"
-#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
namespace iceberg::avro {
@@ -51,13 +50,8 @@ namespace {
+/// \brief Open options.path through options.io and wrap the resulting Arrow input
+/// stream in an AvroInputStream that uses the given buffer size.
+///
+/// options.length is forwarded to OpenArrowInputStream; open failures are propagated.
Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const
ReaderOptions& options,
int64_t
buffer_size) {
- ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
- if (options.length) {
- file_info.set_size(options.length.value());
- }
-
- auto io =
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
- ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file,
io->fs()->OpenInputFile(file_info));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto file, arrow::OpenArrowInputStream(options.io, options.path,
options.length));
return std::make_unique<AvroInputStream>(file, buffer_size);
}
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index 32ce3f63..63fc3146 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -29,7 +29,7 @@
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_direct_encoder_internal.h"
@@ -40,7 +40,6 @@
#include "iceberg/metrics_config.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
-#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
namespace iceberg::avro {
@@ -49,8 +48,8 @@ namespace {
+/// \brief Open options.path for writing through options.io and wrap the resulting
+/// Arrow output stream in an AvroOutputStream that uses the given buffer size.
+///
+/// Open failures are propagated to the caller.
Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const
WriterOptions& options,
int64_t
buffer_size) {
- auto io =
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
- ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output,
io->fs()->OpenOutputStream(options.path));
+ ICEBERG_ASSIGN_OR_RAISE(auto output,
+ arrow::OpenArrowOutputStream(options.io,
options.path));
return std::make_unique<AvroOutputStream>(output, buffer_size);
}
diff --git a/src/iceberg/file_io.cc b/src/iceberg/file_io.cc
new file mode 100644
index 00000000..d76ffeb6
--- /dev/null
+++ b/src/iceberg/file_io.cc
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/file_io.h"
+
+#include <limits>
+#include <utility>
+
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+/// \brief Combine the status of a stream operation with the status of closing it.
+///
+/// A failed operation takes precedence: its error is returned, and when the close
+/// failed as well the close error message is appended to it. When the operation
+/// succeeded, the close status is returned unchanged.
+Status FinishWithCloseStatus(Status operation_status, Status close_status) {
+  if (operation_status.has_value()) {
+    return close_status;
+  }
+
+  auto failure = operation_status.error();
+  if (!close_status.has_value()) {
+    failure.message += "; additionally failed to close stream: ";
+    failure.message += close_status.error().message;
+  }
+  return std::unexpected<Error>(std::move(failure));
+}
+
+} // namespace
+
+/// Base implementation: always fails with a NotImplemented error that names the
+/// requested location. FileIO subclasses that support reads override this overload.
+Result<std::unique_ptr<InputFile>> FileIO::NewInputFile(std::string file_location) {
+  return NotImplemented("NewInputFile not implemented for {}", file_location);
+}
+
+/// Base implementation: ignores the length hint and forwards to the single-argument
+/// overload. That call is virtual, so a subclass override of the single-argument
+/// overload is what actually runs.
+Result<std::unique_ptr<InputFile>> FileIO::NewInputFile(std::string file_location,
+                                                        size_t /*length*/) {
+  return NewInputFile(std::move(file_location));
+}
+
+/// Base implementation: always fails with a NotImplemented error that names the
+/// requested location. FileIO subclasses that support writes override this.
+Result<std::unique_ptr<OutputFile>> FileIO::NewOutputFile(std::string file_location) {
+  return NotImplemented("NewOutputFile not implemented for {}", file_location);
+}
+
+/// Default ReadFile built on the streaming API.
+///
+/// With a length, exactly that many bytes are read from the start of the file and the
+/// length is passed to NewInputFile as a hint; without one, the byte count comes from
+/// InputFile::Size(). The stream is always closed after it has been opened, and a
+/// close failure is reported (appended to the read error if the read failed too).
+Result<std::string> FileIO::ReadFile(const std::string& file_location,
+                                     std::optional<size_t> length) {
+  // Largest length representable as the int64_t used for stream sizes.
+  constexpr auto kMaxLength = static_cast<size_t>(std::numeric_limits<int64_t>::max());
+
+  std::unique_ptr<InputFile> source;
+  int64_t byte_count = 0;
+  if (!length.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(source, NewInputFile(file_location));
+    ICEBERG_ASSIGN_OR_RAISE(byte_count, source->Size());
+  } else {
+    const size_t requested = *length;
+    if (requested > kMaxLength) {
+      return InvalidArgument("Requested read length {} exceeds int64_t max", requested);
+    }
+    ICEBERG_ASSIGN_OR_RAISE(source, NewInputFile(file_location, requested));
+    byte_count = static_cast<int64_t>(requested);
+  }
+  if (byte_count < 0) {
+    return Invalid("Invalid negative file size {} for {}", byte_count, file_location);
+  }
+
+  // The buffer is allocated before the stream is opened, as in the original ordering.
+  std::string content(static_cast<size_t>(byte_count), '\0');
+  ICEBERG_ASSIGN_OR_RAISE(auto stream, source->Open());
+
+  // A zero-byte read skips ReadFully but still closes the stream below.
+  Status read_status = {};
+  if (!content.empty()) {
+    read_status = stream->ReadFully(
+        /*position=*/0, std::as_writable_bytes(std::span(content.data(), content.size())));
+  }
+  ICEBERG_RETURN_UNEXPECTED(
+      FinishWithCloseStatus(std::move(read_status), stream->Close()));
+  return content;
+}
+
+/// Default WriteFile built on the streaming API.
+///
+/// Creates or overwrites the file, writes the content (skipped when empty), flushes
+/// if the write succeeded, and always closes the stream. The first failure wins; a
+/// close failure is appended to an earlier error or returned on its own.
+Status FileIO::WriteFile(const std::string& file_location, std::string_view content) {
+  ICEBERG_ASSIGN_OR_RAISE(auto target, NewOutputFile(file_location));
+  ICEBERG_ASSIGN_OR_RAISE(auto sink, target->CreateOrOverwrite());
+
+  Status outcome =
+      content.empty()
+          ? Status{}
+          : sink->Write(std::as_bytes(std::span(content.data(), content.size())));
+  if (outcome.has_value()) {
+    outcome = sink->Flush();
+  }
+  return FinishWithCloseStatus(std::move(outcome), sink->Close());
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h
index 259da755..e772b533 100644
--- a/src/iceberg/file_io.h
+++ b/src/iceberg/file_io.h
@@ -19,7 +19,11 @@
#pragma once
+#include <cstddef>
+#include <cstdint>
+#include <memory>
#include <optional>
+#include <span>
#include <string>
#include <string_view>
@@ -28,11 +32,82 @@
namespace iceberg {
+/// \brief Seekable byte stream for reading file contents.
+///
+/// Pure interface: every operation reports failure through Status/Result.
+class ICEBERG_EXPORT SeekableInputStream {
+ public:
+  virtual ~SeekableInputStream() = default;
+
+  /// \brief Return the current read position.
+  virtual Result<int64_t> Position() const = 0;
+
+  /// \brief Seek to an absolute byte position.
+  virtual Status Seek(int64_t position) = 0;
+
+  /// \brief Read up to out.size() bytes from the current position.
+  ///
+  /// \return The number of bytes read.
+  /// NOTE(review): the tests in this change expect 0 once the end of the data is
+  /// reached -- confirm that this is the contract for all implementations.
+  virtual Result<int64_t> Read(std::span<std::byte> out) = 0;
+
+  /// \brief Read exactly out.size() bytes from an absolute position.
+  ///
+  /// Fails if fewer than out.size() bytes are available. The current stream position
+  /// after this call is unspecified; callers should Seek before subsequent
+  /// position-dependent reads.
+  virtual Status ReadFully(int64_t position, std::span<std::byte> out) = 0;
+
+  /// \brief Close the stream. Implementations should allow repeated Close calls.
+  virtual Status Close() = 0;
+};
+
+/// \brief Positioned byte stream for writing file contents.
+///
+/// Pure interface: every operation reports failure through Status/Result.
+class ICEBERG_EXPORT PositionOutputStream {
+ public:
+  virtual ~PositionOutputStream() = default;
+
+  /// \brief Return the current write position.
+  virtual Result<int64_t> Position() const = 0;
+
+  /// \brief Write all bytes in data at the current position.
+  virtual Status Write(std::span<const std::byte> data) = 0;
+
+  /// \brief Flush buffered data to the underlying store.
+  virtual Status Flush() = 0;
+
+  /// \brief Close the stream. Implementations should allow repeated Close calls.
+  virtual Status Close() = 0;
+};
+
+/// \brief Handle for opening a readable file.
+///
+/// Pure interface returned by FileIO::NewInputFile.
+class ICEBERG_EXPORT InputFile {
+ public:
+  virtual ~InputFile() = default;
+
+  /// \brief File location represented by this handle.
+  virtual std::string_view location() const = 0;
+
+  /// \brief Return the total file size in bytes.
+  virtual Result<int64_t> Size() const = 0;
+
+  /// \brief Open a new independent input stream.
+  virtual Result<std::unique_ptr<SeekableInputStream>> Open() = 0;
+};
+
+/// \brief Handle for creating a writable file.
+///
+/// Pure interface returned by FileIO::NewOutputFile.
+class ICEBERG_EXPORT OutputFile {
+ public:
+  virtual ~OutputFile() = default;
+
+  /// \brief File location represented by this handle.
+  virtual std::string_view location() const = 0;
+
+  /// \brief Create a new output stream and fail if the file already exists.
+  virtual Result<std::unique_ptr<PositionOutputStream>> Create() = 0;
+
+  /// \brief Create a new output stream, replacing any existing file.
+  virtual Result<std::unique_ptr<PositionOutputStream>> CreateOrOverwrite() = 0;
+};
+
/// \brief Pluggable module for reading, writing, and deleting files.
///
-/// This module only handle metadata files, not data files. The metadata files
-/// are typically small and are used to store schema, partition information,
-/// and other metadata about the table.
+/// This module handles metadata and data file bytes for table IO.
///
/// Note that these functions are not atomic. For example, if a write fails,
/// the file may be partially written. Implementations should be careful to
@@ -42,6 +117,19 @@ class ICEBERG_EXPORT FileIO {
FileIO() = default;
virtual ~FileIO() = default;
+  /// \brief Create an input file handle for the given location.
+  ///
+  /// The base implementation returns a NotImplemented error; implementations that
+  /// support reads must override it.
+  virtual Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location);
+
+  /// \brief Create an input file handle for the given location with a known length.
+  ///
+  /// The length is a caller-provided content length hint. Implementations may use it
+  /// to avoid an extra metadata lookup. The base implementation ignores the hint and
+  /// forwards to the single-argument overload.
+  virtual Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location,
+                                                          size_t length);
+
+  /// \brief Create an output file handle for the given location.
+  ///
+  /// The base implementation returns a NotImplemented error; implementations that
+  /// support writes must override it.
+  virtual Result<std::unique_ptr<OutputFile>> NewOutputFile(std::string file_location);
+
/// \brief Read the content of the file at the given location.
///
/// \param file_location The location of the file to read.
@@ -50,21 +138,14 @@ class ICEBERG_EXPORT FileIO {
/// \return The content of the file if the read succeeded, an error code if
the read
/// failed.
virtual Result<std::string> ReadFile(const std::string& file_location,
- std::optional<size_t> length) {
- // We provide a default implementation to avoid Windows linker error
LNK2019.
- return NotImplemented("ReadFile not implemented");
- }
+ std::optional<size_t> length);
/// \brief Write the given content to the file at the given location.
///
/// \param file_location The location of the file to write.
/// \param content The content to write to the file.
- /// \param overwrite If true, overwrite the file if it exists. If false,
fail if the
- /// file exists.
/// \return void if the write succeeded, an error code if the write failed.
- virtual Status WriteFile(const std::string& file_location, std::string_view
content) {
- return NotImplemented("WriteFile not implemented");
- }
+ virtual Status WriteFile(const std::string& file_location, std::string_view
content);
/// \brief Delete a file at the given location.
///
diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h
index 923ac6bd..c31d9b29 100644
--- a/src/iceberg/file_reader.h
+++ b/src/iceberg/file_reader.h
@@ -95,9 +95,7 @@ struct ICEBERG_EXPORT ReaderOptions {
std::optional<size_t> length;
/// \brief The split to read.
std::optional<Split> split;
- /// \brief FileIO instance to open the file. Reader implementations should
down cast it
- /// to the specific FileIO implementation. By default, the `iceberg-bundle`
library uses
- /// `ArrowFileSystemFileIO` as the default implementation.
+ /// \brief FileIO instance to open the file.
std::shared_ptr<class FileIO> io;
/// \brief The projection schema to read from the file. This field is
required.
std::shared_ptr<class Schema> projection;
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index f3352d8f..a49b5228 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -73,9 +73,7 @@ struct ICEBERG_EXPORT WriterOptions {
std::string path;
/// \brief The schema of the data to write.
std::shared_ptr<Schema> schema;
- /// \brief FileIO instance to open the file. Writer implementations should
down cast it
- /// to the specific FileIO implementation. By default, the `iceberg-bundle`
library uses
- /// `ArrowFileSystemFileIO` as the default implementation.
+ /// \brief FileIO instance to create the file.
std::shared_ptr<class FileIO> io;
/// \brief Metadata to write to the file.
std::unordered_map<std::string, std::string> metadata;
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 41a5c2dd..c2947f3f 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -58,6 +58,7 @@ iceberg_sources = files(
'expression/rewrite_not.cc',
'expression/strict_metrics_evaluator.cc',
'expression/term.cc',
+ 'file_io.cc',
'file_io_registry.cc',
'file_reader.cc',
'file_writer.cc',
diff --git a/src/iceberg/parquet/parquet_reader.cc
b/src/iceberg/parquet/parquet_reader.cc
index 0e2808f5..775644a9 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -32,7 +32,7 @@
#include <parquet/file_reader.h>
#include <parquet/properties.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/arrow/metadata_column_util_internal.h"
#include "iceberg/parquet/parquet_data_util_internal.h"
@@ -41,7 +41,6 @@
#include "iceberg/result.h"
#include "iceberg/schema_internal.h"
#include "iceberg/schema_util.h"
-#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
namespace iceberg::parquet {
@@ -50,14 +49,7 @@ namespace {
+/// \brief Open options.path for reading through options.io as an Arrow
+/// RandomAccessFile, forwarding options.length to OpenArrowInputStream.
Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenInputStream(
const ReaderOptions& options) {
- ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
- if (options.length) {
- file_info.set_size(options.length.value());
- }
-
- auto io =
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
- ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input,
io->fs()->OpenInputFile(file_info));
- return input;
+ return arrow::OpenArrowInputStream(options.io, options.path, options.length);
}
Result<SchemaProjection> BuildProjection(::parquet::arrow::FileReader* reader,
diff --git a/src/iceberg/parquet/parquet_writer.cc
b/src/iceberg/parquet/parquet_writer.cc
index a68e9e61..7e2d3d15 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -29,10 +29,9 @@
#include <parquet/file_writer.h>
#include <parquet/properties.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/schema_internal.h"
-#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
namespace iceberg::parquet {
@@ -41,9 +40,7 @@ namespace {
+/// \brief Open options.path for writing through options.io as an Arrow OutputStream.
Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
const WriterOptions& options) {
- auto io =
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
- ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output,
io->fs()->OpenOutputStream(options.path));
- return output;
+ return arrow::OpenArrowOutputStream(options.io, options.path);
}
Result<::arrow::Compression::type> ParseCompression(const WriterProperties&
properties) {
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 6b98951a..1d80b29a 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -156,7 +156,7 @@ if(ICEBERG_BUILD_BUNDLE)
add_iceberg_test(arrow_test
USE_BUNDLE
SOURCES
- arrow_fs_file_io_test.cc
+ arrow_io_test.cc
arrow_test.cc
gzip_decompress_test.cc
metadata_io_test.cc
diff --git a/src/iceberg/test/arrow_fs_file_io_test.cc
b/src/iceberg/test/arrow_fs_file_io_test.cc
deleted file mode 100644
index eacda2f7..00000000
--- a/src/iceberg/test/arrow_fs_file_io_test.cc
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <arrow/filesystem/localfs.h>
-#include <gtest/gtest.h>
-
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/test/matchers.h"
-#include "iceberg/test/temp_file_test_base.h"
-
-namespace iceberg {
-
-class LocalFileIOTest : public TempFileTestBase {
- protected:
- void SetUp() override {
- TempFileTestBase::SetUp();
- file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
- std::make_shared<::arrow::fs::LocalFileSystem>());
- temp_filepath_ = CreateNewTempFilePath();
- }
-
- std::shared_ptr<iceberg::FileIO> file_io_;
- std::string temp_filepath_;
-};
-
-TEST_F(LocalFileIOTest, ReadWriteFile) {
- auto read_res = file_io_->ReadFile(temp_filepath_, std::nullopt);
- EXPECT_THAT(read_res, IsError(ErrorKind::kIOError));
- EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file"));
-
- auto write_res = file_io_->WriteFile(temp_filepath_, "hello world");
- EXPECT_THAT(write_res, IsOk());
-
- read_res = file_io_->ReadFile(temp_filepath_, std::nullopt);
- EXPECT_THAT(read_res, IsOk());
- EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world")));
-}
-
-TEST_F(LocalFileIOTest, DeleteFile) {
- auto write_res = file_io_->WriteFile(temp_filepath_, "hello world");
- EXPECT_THAT(write_res, IsOk());
-
- auto del_res = file_io_->DeleteFile(temp_filepath_);
- EXPECT_THAT(del_res, IsOk());
-
- del_res = file_io_->DeleteFile(temp_filepath_);
- EXPECT_THAT(del_res, IsError(ErrorKind::kIOError));
- EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
-}
-
-} // namespace iceberg
diff --git a/src/iceberg/test/arrow_io_test.cc
b/src/iceberg/test/arrow_io_test.cc
new file mode 100644
index 00000000..0c885d07
--- /dev/null
+++ b/src/iceberg/test/arrow_io_test.cc
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <algorithm>
+#include <array>
+#include <memory>
+#include <string>
+
+#include <arrow/filesystem/localfs.h>
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/std_io.h"
+#include "iceberg/test/temp_file_test_base.h"
+
+namespace iceberg {
+
+namespace {
+
+/// Shared flag that records whether a test stream's Close() has been called.
+struct CloseState {
+  bool closed{false};
+};
+
+/// Input stream stub whose ReadFully() and Close() always fail with an IOError.
+///
+/// Close() also sets CloseState::closed so a test can check that it was invoked.
+class ReadFailureInputStream : public SeekableInputStream {
+ public:
+  explicit ReadFailureInputStream(std::shared_ptr<CloseState> close_state)
+      : close_state_(std::move(close_state)) {}
+
+  Result<int64_t> Position() const override { return 0; }
+
+  Status Seek(int64_t /*position*/) override { return {}; }
+
+  Result<int64_t> Read(std::span<std::byte> /*out*/) override { return 0; }
+
+  Status ReadFully(int64_t /*position*/, std::span<std::byte> /*out*/) override {
+    return IOError("read failed");
+  }
+
+  Status Close() override {
+    close_state_->closed = true;
+    return IOError("close failed");
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+/// Input file stub that reports a size of 4 bytes and opens ReadFailureInputStream
+/// instances sharing one CloseState.
+class ReadFailureInputFile : public InputFile {
+ public:
+  explicit ReadFailureInputFile(std::shared_ptr<CloseState> close_state)
+      : close_state_(std::move(close_state)) {}
+
+  std::string_view location() const override { return "read-failure"; }
+
+  Result<int64_t> Size() const override { return 4; }
+
+  Result<std::unique_ptr<SeekableInputStream>> Open() override {
+    return std::make_unique<ReadFailureInputStream>(close_state_);
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+/// FileIO stub that hands out ReadFailureInputFile for every location. Only the
+/// single-argument NewInputFile is overridden.
+class ReadFailureFileIO : public FileIO {
+ public:
+  explicit ReadFailureFileIO(std::shared_ptr<CloseState> close_state)
+      : close_state_(std::move(close_state)) {}
+
+  Result<std::unique_ptr<InputFile>> NewInputFile(
+      std::string /*file_location*/) override {
+    return std::make_unique<ReadFailureInputFile>(close_state_);
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+/// Output stream stub whose Write() and Close() always fail with an IOError.
+///
+/// Flush() succeeds; Close() also sets CloseState::closed so a test can check that
+/// it was invoked.
+class WriteFailureOutputStream : public PositionOutputStream {
+ public:
+  explicit WriteFailureOutputStream(std::shared_ptr<CloseState> close_state)
+      : close_state_(std::move(close_state)) {}
+
+  Result<int64_t> Position() const override { return 0; }
+
+  Status Write(std::span<const std::byte> /*data*/) override {
+    return IOError("write failed");
+  }
+
+  Status Flush() override { return {}; }
+
+  Status Close() override {
+    close_state_->closed = true;
+    return IOError("close failed");
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+/// Output file stub whose Create() and CreateOrOverwrite() both return a
+/// WriteFailureOutputStream sharing one CloseState.
+class WriteFailureOutputFile : public OutputFile {
+ public:
+  explicit WriteFailureOutputFile(std::shared_ptr<CloseState> close_state)
+      : close_state_(std::move(close_state)) {}
+
+  std::string_view location() const override { return "write-failure"; }
+
+  Result<std::unique_ptr<PositionOutputStream>> Create() override {
+    return std::make_unique<WriteFailureOutputStream>(close_state_);
+  }
+
+  Result<std::unique_ptr<PositionOutputStream>> CreateOrOverwrite() override {
+    return std::make_unique<WriteFailureOutputStream>(close_state_);
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+/// FileIO stub that hands out WriteFailureOutputFile for every location.
+class WriteFailureFileIO : public FileIO {
+ public:
+  explicit WriteFailureFileIO(std::shared_ptr<CloseState> close_state)
+      : close_state_(std::move(close_state)) {}
+
+  Result<std::unique_ptr<OutputFile>> NewOutputFile(
+      std::string /*file_location*/) override {
+    return std::make_unique<WriteFailureOutputFile>(close_state_);
+  }
+
+ private:
+  std::shared_ptr<CloseState> close_state_;
+};
+
+/// State shared between a test and the PermissiveInput* stubs: the bytes served, the
+/// stream position, and whether Close() was called.
+struct PermissiveReadState {
+  std::string data;
+  bool closed{false};
+  int64_t position{0};
+};
+
+/// In-memory input stream over PermissiveReadState::data.
+///
+/// It is "permissive" in that it keeps serving reads after Close(); Close() only sets
+/// the closed flag. The position lives in the shared state so tests can inspect it.
+class PermissiveInputStream : public SeekableInputStream {
+ public:
+  explicit PermissiveInputStream(std::shared_ptr<PermissiveReadState> shared)
+      : shared_(std::move(shared)) {}
+
+  Result<int64_t> Position() const override { return shared_->position; }
+
+  Status Seek(int64_t position) override {
+    if (position < 0) {
+      return InvalidArgument("Cannot seek to negative position {}", position);
+    }
+    shared_->position = position;
+    return {};
+  }
+
+  /// Copies as many bytes as are available from the current position (possibly none)
+  /// and advances the position by that amount.
+  Result<int64_t> Read(std::span<std::byte> out) override {
+    const std::string& data = shared_->data;
+    const auto start = static_cast<size_t>(shared_->position);
+    if (start >= data.size()) {
+      return 0;
+    }
+    const auto count = std::min(out.size(), data.size() - start);
+    const auto* first = reinterpret_cast<const std::byte*>(data.data() + start);
+    std::copy_n(first, count, out.data());
+    shared_->position += static_cast<int64_t>(count);
+    return static_cast<int64_t>(count);
+  }
+
+  /// Copies exactly out.size() bytes starting at `position`; the stream position in
+  /// the shared state is left untouched.
+  Status ReadFully(int64_t position, std::span<std::byte> out) override {
+    if (position < 0) {
+      return InvalidArgument("Cannot read from negative position {}", position);
+    }
+    const std::string& data = shared_->data;
+    const auto offset = static_cast<size_t>(position);
+    const bool in_range = offset <= data.size() && out.size() <= data.size() - offset;
+    if (!in_range) {
+      return IOError("Unexpected EOF");
+    }
+    const auto* first = reinterpret_cast<const std::byte*>(data.data() + offset);
+    std::copy_n(first, out.size(), out.data());
+    return {};
+  }
+
+  Status Close() override {
+    shared_->closed = true;
+    return {};
+  }
+
+ private:
+  std::shared_ptr<PermissiveReadState> shared_;
+};
+
+/// Input file stub backed by PermissiveReadState: Size() is the length of the shared
+/// data and Open() returns a PermissiveInputStream over the same state.
+class PermissiveInputFile : public InputFile {
+ public:
+  explicit PermissiveInputFile(std::shared_ptr<PermissiveReadState> shared)
+      : shared_(std::move(shared)) {}
+
+  std::string_view location() const override { return "permissive-input"; }
+
+  Result<int64_t> Size() const override {
+    return static_cast<int64_t>(shared_->data.size());
+  }
+
+  Result<std::unique_ptr<SeekableInputStream>> Open() override {
+    return std::make_unique<PermissiveInputStream>(shared_);
+  }
+
+ private:
+  std::shared_ptr<PermissiveReadState> shared_;
+};
+
+/// FileIO stub that hands out PermissiveInputFile for every location. Only the
+/// single-argument NewInputFile is overridden.
+class PermissiveInputFileIO : public FileIO {
+ public:
+  explicit PermissiveInputFileIO(std::shared_ptr<PermissiveReadState> shared)
+      : shared_(std::move(shared)) {}
+
+  Result<std::unique_ptr<InputFile>> NewInputFile(
+      std::string /*file_location*/) override {
+    return std::make_unique<PermissiveInputFile>(shared_);
+  }
+
+ private:
+  std::shared_ptr<PermissiveReadState> shared_;
+};
+
+/// State shared between a test and the PermissiveOutput* stubs: the bytes written so
+/// far and whether Close() was called.
+struct PermissiveWriteState {
+  std::string data;
+  bool closed{false};
+};
+
+/// In-memory output stream that appends to PermissiveWriteState::data.
+///
+/// It is "permissive" in that it keeps accepting writes after Close(); Close() only
+/// sets the closed flag. Position() is the number of bytes written so far.
+class PermissiveOutputStream : public PositionOutputStream {
+ public:
+  explicit PermissiveOutputStream(std::shared_ptr<PermissiveWriteState> shared)
+      : shared_(std::move(shared)) {}
+
+  Result<int64_t> Position() const override {
+    return static_cast<int64_t>(shared_->data.size());
+  }
+
+  Status Write(std::span<const std::byte> data) override {
+    const auto* chars = reinterpret_cast<const char*>(data.data());
+    shared_->data.append(chars, data.size());
+    return {};
+  }
+
+  Status Flush() override { return {}; }
+
+  Status Close() override {
+    shared_->closed = true;
+    return {};
+  }
+
+ private:
+  std::shared_ptr<PermissiveWriteState> shared_;
+};
+
+/// Output file stub whose Create() and CreateOrOverwrite() both return a
+/// PermissiveOutputStream writing into the shared PermissiveWriteState.
+class PermissiveOutputFile : public OutputFile {
+ public:
+  explicit PermissiveOutputFile(std::shared_ptr<PermissiveWriteState> shared)
+      : shared_(std::move(shared)) {}
+
+  std::string_view location() const override { return "permissive-output"; }
+
+  Result<std::unique_ptr<PositionOutputStream>> Create() override {
+    return std::make_unique<PermissiveOutputStream>(shared_);
+  }
+
+  Result<std::unique_ptr<PositionOutputStream>> CreateOrOverwrite() override {
+    return std::make_unique<PermissiveOutputStream>(shared_);
+  }
+
+ private:
+  std::shared_ptr<PermissiveWriteState> shared_;
+};
+
+/// FileIO stub that hands out PermissiveOutputFile for every location.
+class PermissiveOutputFileIO : public FileIO {
+ public:
+  explicit PermissiveOutputFileIO(std::shared_ptr<PermissiveWriteState> shared)
+      : shared_(std::move(shared)) {}
+
+  Result<std::unique_ptr<OutputFile>> NewOutputFile(
+      std::string /*file_location*/) override {
+    return std::make_unique<PermissiveOutputFile>(shared_);
+  }
+
+ private:
+  std::shared_ptr<PermissiveWriteState> shared_;
+};
+
+} // namespace
+
+/// Fixture providing an ArrowFileSystemFileIO over the local filesystem and a fresh
+/// temporary file path for each test.
+class LocalFileIOTest : public TempFileTestBase {
+ protected:
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
+        std::make_shared<::arrow::fs::LocalFileSystem>());
+    temp_filepath_ = CreateNewTempFilePath();
+  }
+
+  std::shared_ptr<iceberg::FileIO> file_io_;
+  std::string temp_filepath_;
+};
+
+// Reading the path before anything was written must fail with kIOError; after
+// WriteFile("hello world") the same path must read back that exact content.
+TEST_F(LocalFileIOTest, ReadWriteFile) {
+  auto read_res = file_io_->ReadFile(temp_filepath_, std::nullopt);
+  EXPECT_THAT(read_res, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file"));
+
+  auto write_res = file_io_->WriteFile(temp_filepath_, "hello world");
+  EXPECT_THAT(write_res, IsOk());
+
+  read_res = file_io_->ReadFile(temp_filepath_, std::nullopt);
+  EXPECT_THAT(read_res, IsOk());
+  EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world")));
+}
+
+// Deleting an existing file succeeds; deleting it a second time fails with kIOError.
+TEST_F(LocalFileIOTest, DeleteFile) {
+  auto write_res = file_io_->WriteFile(temp_filepath_, "hello world");
+  EXPECT_THAT(write_res, IsOk());
+
+  auto del_res = file_io_->DeleteFile(temp_filepath_);
+  EXPECT_THAT(del_res, IsOk());
+
+  del_res = file_io_->DeleteFile(temp_filepath_);
+  EXPECT_THAT(del_res, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
+}
+
+// Shared check for any FileIO: writes "abcdef" to `path`, seeks to 5, and verifies
+// that ReadFully(1, ...) still returns "bc", i.e. it reads from the absolute position
+// rather than the current one. A later Seek(5) + Read must then yield 'f'.
+void VerifyReadFullyReadsFromAbsolutePosition(const std::shared_ptr<FileIO>& file_io,
+                                              const std::string& path) {
+  ASSERT_THAT(file_io->WriteFile(path, "abcdef"), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto input_file, file_io->NewInputFile(path));
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, input_file->Open());
+  ASSERT_THAT(stream->Seek(5), IsOk());
+
+  std::array<std::byte, 2> buffer;
+  ASSERT_THAT(stream->ReadFully(1, buffer), IsOk());
+
+  std::string data(reinterpret_cast<const char*>(buffer.data()), buffer.size());
+  EXPECT_EQ(data, "bc");
+
+  // Seek again first: the position after ReadFully is unspecified by the interface.
+  ASSERT_THAT(stream->Seek(5), IsOk());
+  std::array<std::byte, 1> next;
+  ICEBERG_UNWRAP_OR_FAIL(auto bytes_read, stream->Read(next));
+  ASSERT_EQ(bytes_read, 1);
+  EXPECT_EQ(next[0], std::byte{'f'});
+}
+
+// Runs the absolute-position ReadFully check against the fixture's Arrow-backed FileIO.
+TEST_F(LocalFileIOTest, ReadFullyReadsFromAbsolutePosition) {
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyReadFullyReadsFromAbsolutePosition(file_io_, temp_filepath_));
+}
+
+// Runs the absolute-position ReadFully check against test::StdFileIO.
+TEST_F(LocalFileIOTest, StdReadFullyReadsFromAbsolutePosition) {
+  auto file_io = std::make_shared<test::StdFileIO>();
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyReadFullyReadsFromAbsolutePosition(file_io, temp_filepath_));
+}
+
+// With test::StdFileIO, reading a 3-byte file into an 8-byte buffer returns 3 bytes,
+// a second read returns 0, and Position() stays queryable (3) after hitting EOF.
+TEST_F(LocalFileIOTest, StdReadKeepsPositionAvailableAtEof) {
+  auto file_io = std::make_shared<test::StdFileIO>();
+  ASSERT_THAT(file_io->WriteFile(temp_filepath_, "abc"), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto input_file, file_io->NewInputFile(temp_filepath_));
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, input_file->Open());
+
+  std::array<std::byte, 8> buffer;
+  ICEBERG_UNWRAP_OR_FAIL(auto bytes_read, stream->Read(buffer));
+  EXPECT_EQ(bytes_read, 3);
+  EXPECT_THAT(stream->Position(), HasValue(::testing::Eq(3)));
+
+  ICEBERG_UNWRAP_OR_FAIL(bytes_read, stream->Read(buffer));
+  EXPECT_EQ(bytes_read, 0);
+  EXPECT_THAT(stream->Position(), HasValue(::testing::Eq(3)));
+}
+
+// The Arrow input adapter must tolerate a repeated Close(), close the wrapped stream,
+// and then reject Read/ReadAt with a "closed" error without touching the wrapped
+// stream (its position stays 0 even though the stub itself would still serve reads).
+TEST(FileIOAdapterTest, InputAdapterRejectsReadsAfterClose) {
+  auto state = std::make_shared<PermissiveReadState>();
+  state->data = "abc";
+  auto file_io = std::make_shared<PermissiveInputFileIO>(state);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto input, arrow::OpenArrowInputStream(file_io, "input"));
+  ASSERT_TRUE(input->Close().ok());
+  ASSERT_TRUE(input->Close().ok());
+  ASSERT_TRUE(state->closed);
+
+  std::array<std::byte, 1> out;
+  auto result = input->Read(static_cast<int64_t>(out.size()), out.data());
+  auto read_at_result = input->ReadAt(0, static_cast<int64_t>(out.size()), out.data());
+
+  EXPECT_FALSE(result.ok());
+  EXPECT_THAT(result.status().ToString(), ::testing::HasSubstr("closed"));
+  EXPECT_FALSE(read_at_result.ok());
+  EXPECT_THAT(read_at_result.status().ToString(), ::testing::HasSubstr("closed"));
+  EXPECT_EQ(state->position, 0);
+}
+
+// Over 3 bytes of data, ReadAt exactly at the end (offset 3) succeeds with 0 bytes,
+// while ReadAt past the end (offset 4) fails with an "out of bounds" error.
+TEST(FileIOAdapterTest, InputAdapterRejectsReadAtBeyondKnownSize) {
+  auto state = std::make_shared<PermissiveReadState>();
+  state->data = "abc";
+  auto file_io = std::make_shared<PermissiveInputFileIO>(state);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto input, arrow::OpenArrowInputStream(file_io, "input"));
+
+  std::array<std::byte, 1> out;
+  auto read_at_end = input->ReadAt(3, static_cast<int64_t>(out.size()), out.data());
+  auto read_past_end = input->ReadAt(4, static_cast<int64_t>(out.size()), out.data());
+
+  ASSERT_TRUE(read_at_end.ok());
+  EXPECT_EQ(read_at_end.ValueOrDie(), 0);
+  EXPECT_FALSE(read_past_end.ok());
+  EXPECT_THAT(read_past_end.status().ToString(), ::testing::HasSubstr("out of bounds"));
+}
+
+// Opens 3 bytes of data with a length hint of 99 and expects GetSize() to report the
+// size from the InputFile (3), not the hint.
+TEST(FileIOAdapterTest, InputAdapterUsesInputFileSizeWithLengthHint) {
+  auto state = std::make_shared<PermissiveReadState>();
+  state->data = "abc";
+  auto file_io = std::make_shared<PermissiveInputFileIO>(state);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto input, arrow::OpenArrowInputStream(file_io, "input", 99));
+  auto size = input->GetSize();
+
+  ASSERT_TRUE(size.ok()) << size.status().ToString();
+  EXPECT_EQ(size.ValueOrDie(), 3);
+}
+
+// The Arrow output adapter must tolerate a repeated Close(), close the wrapped
+// stream, and then reject Write/Flush with a "closed" error without forwarding any
+// bytes (the shared data stays empty).
+TEST(FileIOAdapterTest, OutputAdapterRejectsWritesAfterClose) {
+  auto state = std::make_shared<PermissiveWriteState>();
+  auto file_io = std::make_shared<PermissiveOutputFileIO>(state);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto output, arrow::OpenArrowOutputStream(file_io, "output"));
+  ASSERT_TRUE(output->Close().ok());
+  ASSERT_TRUE(output->Close().ok());
+  ASSERT_TRUE(state->closed);
+
+  auto status = output->Write("x", 1);
+  auto flush_status = output->Flush();
+
+  EXPECT_FALSE(status.ok());
+  EXPECT_THAT(status.ToString(), ::testing::HasSubstr("closed"));
+  EXPECT_FALSE(flush_status.ok());
+  EXPECT_THAT(flush_status.ToString(), ::testing::HasSubstr("closed"));
+  EXPECT_TRUE(state->data.empty());
+}
+
+// When both the read and the close fail, the default FileIO::ReadFile must still
+// close the stream and return the read error with the close failure in its message.
+TEST(FileIOTest, ReadFileReturnsReadErrorWithCloseContext) {
+  auto state = std::make_shared<CloseState>();
+  ReadFailureFileIO file_io(state);
+
+  auto result = file_io.ReadFile("read-failure", std::nullopt);
+
+  EXPECT_TRUE(state->closed);
+  EXPECT_THAT(result, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(result, HasErrorMessage("read failed"));
+  EXPECT_THAT(result, HasErrorMessage("close failed"));
+}
+
+// When both the write and the close fail, the default FileIO::WriteFile must still
+// close the stream and return the write error with the close failure in its message.
+TEST(FileIOTest, WriteFileReturnsWriteErrorWithCloseContext) {
+  auto state = std::make_shared<CloseState>();
+  WriteFailureFileIO file_io(state);
+
+  auto result = file_io.WriteFile("write-failure", "data");
+
+  EXPECT_TRUE(state->closed);
+  EXPECT_THAT(result, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(result, HasErrorMessage("write failed"));
+  EXPECT_THAT(result, HasErrorMessage("close failed"));
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/test/arrow_s3_file_io_test.cc
b/src/iceberg/test/arrow_s3_file_io_test.cc
index d890ad10..b1caff1e 100644
--- a/src/iceberg/test/arrow_s3_file_io_test.cc
+++ b/src/iceberg/test/arrow_s3_file_io_test.cc
@@ -26,7 +26,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/arrow/s3/s3_properties.h"
#include "iceberg/test/matchers.h"
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 82da97ea..b74fe829 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -23,12 +23,13 @@
#include <arrow/array.h>
#include <arrow/array/array_base.h>
#include <arrow/c/bridge.h>
+#include <arrow/filesystem/filesystem.h>
#include <arrow/json/from_string.h>
#include <avro/DataFile.hh>
#include <avro/Generic.hh>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_stream_internal.h"
#include "iceberg/avro/avro_writer.h"
@@ -37,16 +38,19 @@
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/test/matchers.h"
+#include "iceberg/test/std_io.h"
+#include "iceberg/test/temp_file_test_base.h"
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
namespace iceberg::avro {
-class AvroReaderTest : public ::testing::Test {
+class AvroReaderTest : public TempFileTestBase {
protected:
static void SetUpTestSuite() { RegisterAll(); }
void SetUp() override {
+ TempFileTestBase::SetUp();
file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
temp_avro_file_ = "avro_reader_test.avro";
}
@@ -187,6 +191,16 @@ TEST_F(AvroReaderTest, ReadTwoFields) {
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}
+// Replaces the fixture's mock FileIO with iceberg::test::StdFileIO and a fresh
+// temp ".avro" path, then runs the fixture's WriteAndVerify helper on two
+// (id, name) rows.
+TEST_F(AvroReaderTest, RoundTripWithGenericFileIO) {
+  temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro");
+  file_io_ = std::make_shared<iceberg::test::StdFileIO>();
+
+  auto id_name_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+      SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())});
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteAndVerify(id_name_schema, R"([[1, "Foo"], [2, "Bar"]])"));
+}
+
+
TEST_F(AvroReaderTest, ReadReorderedFieldsWithNulls) {
CreateSimpleAvroFile();
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
diff --git a/src/iceberg/test/data_writer_test.cc
b/src/iceberg/test/data_writer_test.cc
index a3a8fc08..14b7bf62 100644
--- a/src/iceberg/test/data_writer_test.cc
+++ b/src/iceberg/test/data_writer_test.cc
@@ -25,7 +25,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/data/equality_delete_writer.h"
#include "iceberg/data/position_delete_writer.h"
diff --git a/src/iceberg/test/delete_file_index_test.cc
b/src/iceberg/test/delete_file_index_test.cc
index b99a2816..0c8c8821 100644
--- a/src/iceberg/test/delete_file_index_test.cc
+++ b/src/iceberg/test/delete_file_index_test.cc
@@ -29,7 +29,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
diff --git a/src/iceberg/test/delete_loader_test.cc
b/src/iceberg/test/delete_loader_test.cc
index 6dcd564b..c365b8ba 100644
--- a/src/iceberg/test/delete_loader_test.cc
+++ b/src/iceberg/test/delete_loader_test.cc
@@ -25,7 +25,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/data/equality_delete_writer.h"
#include "iceberg/data/position_delete_writer.h"
#include "iceberg/deletes/position_delete_index.h"
diff --git a/src/iceberg/test/file_scan_task_test.cc
b/src/iceberg/test/file_scan_task_test.cc
index ba0c41b3..55bc6a11 100644
--- a/src/iceberg/test/file_scan_task_test.cc
+++ b/src/iceberg/test/file_scan_task_test.cc
@@ -27,7 +27,7 @@
#include <parquet/arrow/writer.h>
#include <parquet/metadata.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/file_format.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/parquet/parquet_register.h"
@@ -36,7 +36,6 @@
#include "iceberg/test/matchers.h"
#include "iceberg/test/temp_file_test_base.h"
#include "iceberg/type.h"
-#include "iceberg/util/checked_cast.h"
namespace iceberg {
@@ -68,12 +67,14 @@ class FileScanTaskTest : public TempFileTestBase {
.ValueOrDie()})
.ValueOrDie();
- auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
- auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie();
+ ICEBERG_UNWRAP_OR_FAIL(auto outfile,
+ arrow::OpenArrowOutputStream(file_io_,
temp_parquet_file_));
ASSERT_TRUE(::parquet::arrow::WriteTable(*table,
::arrow::default_memory_pool(),
outfile, chunk_size)
.ok());
+ ASSERT_TRUE(outfile->Close().ok());
+ RefreshParquetFileSize();
}
// Helper to create a valid but empty Parquet file.
@@ -84,11 +85,28 @@ class FileScanTaskTest : public TempFileTestBase {
::arrow::KeyValueMetadata::Make({kParquetFieldIdKey},
{"1"}))});
auto empty_table = ::arrow::Table::FromRecordBatches(arrow_schema,
{}).ValueOrDie();
- auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
- auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie();
+ ICEBERG_UNWRAP_OR_FAIL(auto outfile,
+ arrow::OpenArrowOutputStream(file_io_,
temp_parquet_file_));
ASSERT_TRUE(::parquet::arrow::WriteTable(*empty_table,
::arrow::default_memory_pool(),
outfile, 1024)
.ok());
+ ASSERT_TRUE(outfile->Close().ok());
+ RefreshParquetFileSize();
+ }
+
+  // Looks up the size of temp_parquet_file_ through file_io_ and caches it in
+  // parquet_file_size_. The test fails if the lookup errors or the size is not
+  // positive.
+  void RefreshParquetFileSize() {
+    ICEBERG_UNWRAP_OR_FAIL(auto parquet_input,
+                           file_io_->NewInputFile(temp_parquet_file_));
+    ICEBERG_UNWRAP_OR_FAIL(auto byte_count, parquet_input->Size());
+    ASSERT_GT(byte_count, 0);
+    parquet_file_size_ = byte_count;
+  }
+
+  // Builds a DataFile entry for the fixture's Parquet file: path, Parquet format,
+  // and the size last cached in parquet_file_size_.
+  std::shared_ptr<DataFile> MakeDataFile() const {
+    auto parquet_data_file = std::make_shared<DataFile>();
+    parquet_data_file->file_format = FileFormatType::kParquet;
+    parquet_data_file->file_path = temp_parquet_file_;
+    parquet_data_file->file_size_in_bytes = parquet_file_size_;
+    return parquet_data_file;
   }
// Helper method to verify the content of the next batch from an
ArrowArrayStream.
@@ -124,12 +142,11 @@ class FileScanTaskTest : public TempFileTestBase {
std::shared_ptr<FileIO> file_io_;
std::string temp_parquet_file_;
+ int64_t parquet_file_size_ = 0;
};
TEST_F(FileScanTaskTest, ReadFullSchema) {
- auto data_file = std::make_shared<DataFile>();
- data_file->file_path = temp_parquet_file_;
- data_file->file_format = FileFormatType::kParquet;
+ auto data_file = MakeDataFile();
auto projected_schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
@@ -146,9 +163,7 @@ TEST_F(FileScanTaskTest, ReadFullSchema) {
}
TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) {
- auto data_file = std::make_shared<DataFile>();
- data_file->file_path = temp_parquet_file_;
- data_file->file_format = FileFormatType::kParquet;
+ auto data_file = MakeDataFile();
auto projected_schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField::MakeOptional(2, "name", string()),
@@ -166,9 +181,7 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) {
TEST_F(FileScanTaskTest, ReadEmptyFile) {
CreateEmptyParquetFile();
- auto data_file = std::make_shared<DataFile>();
- data_file->file_path = temp_parquet_file_;
- data_file->file_format = FileFormatType::kParquet;
+ auto data_file = MakeDataFile();
auto projected_schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
diff --git a/src/iceberg/test/gzip_decompress_test.cc
b/src/iceberg/test/gzip_decompress_test.cc
index 3415c46b..b855e4a0 100644
--- a/src/iceberg/test/gzip_decompress_test.cc
+++ b/src/iceberg/test/gzip_decompress_test.cc
@@ -23,7 +23,7 @@
#include <arrow/util/compression.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/file_io.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/temp_file_test_base.h"
@@ -69,7 +69,11 @@ TEST_F(GZipTest, GZipDecompressedString) {
ASSERT_TRUE(compressed_stream->Flush().ok());
ASSERT_TRUE(compressed_stream->Close().ok());
- auto result = io_->ReadFile(temp_filepath_, test_string.size());
+ ICEBERG_UNWRAP_OR_FAIL(auto input_file, io_->NewInputFile(temp_filepath_));
+ ICEBERG_UNWRAP_OR_FAIL(auto compressed_size, input_file->Size());
+ ASSERT_GE(compressed_size, 0);
+
+ auto result = io_->ReadFile(temp_filepath_,
static_cast<size_t>(compressed_size));
EXPECT_THAT(result, IsOk());
auto gzip_decompressor = std::make_unique<GZipDecompressor>();
diff --git a/src/iceberg/test/in_memory_catalog_test.cc
b/src/iceberg/test/in_memory_catalog_test.cc
index 78f67dda..1a65098e 100644
--- a/src/iceberg/test/in_memory_catalog_test.cc
+++ b/src/iceberg/test/in_memory_catalog_test.cc
@@ -27,7 +27,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
diff --git a/src/iceberg/test/manifest_group_test.cc
b/src/iceberg/test/manifest_group_test.cc
index 34ff9993..017f9803 100644
--- a/src/iceberg/test/manifest_group_test.cc
+++ b/src/iceberg/test/manifest_group_test.cc
@@ -29,7 +29,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/expression/expressions.h"
#include "iceberg/manifest/manifest_entry.h"
diff --git a/src/iceberg/test/manifest_list_versions_test.cc
b/src/iceberg/test/manifest_list_versions_test.cc
index 9c16a02e..b173d56e 100644
--- a/src/iceberg/test/manifest_list_versions_test.cc
+++ b/src/iceberg/test/manifest_list_versions_test.cc
@@ -25,7 +25,7 @@
#include <arrow/record_batch.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/file_reader.h"
#include "iceberg/file_writer.h"
diff --git a/src/iceberg/test/manifest_reader_stats_test.cc
b/src/iceberg/test/manifest_reader_stats_test.cc
index a94dca12..6c0e005b 100644
--- a/src/iceberg/test/manifest_reader_stats_test.cc
+++ b/src/iceberg/test/manifest_reader_stats_test.cc
@@ -25,7 +25,7 @@
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/expression/expressions.h"
#include "iceberg/manifest/manifest_entry.h"
diff --git a/src/iceberg/test/manifest_reader_test.cc
b/src/iceberg/test/manifest_reader_test.cc
index 3e93f6ff..3f85729a 100644
--- a/src/iceberg/test/manifest_reader_test.cc
+++ b/src/iceberg/test/manifest_reader_test.cc
@@ -28,7 +28,7 @@
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/expression/expressions.h"
#include "iceberg/manifest/manifest_entry.h"
diff --git a/src/iceberg/test/manifest_writer_versions_test.cc
b/src/iceberg/test/manifest_writer_versions_test.cc
index fc61980a..99022452 100644
--- a/src/iceberg/test/manifest_writer_versions_test.cc
+++ b/src/iceberg/test/manifest_writer_versions_test.cc
@@ -25,7 +25,7 @@
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/constants.h"
#include "iceberg/file_format.h"
diff --git a/src/iceberg/test/metadata_io_test.cc
b/src/iceberg/test/metadata_io_test.cc
index 72a23ecb..c780dc18 100644
--- a/src/iceberg/test/metadata_io_test.cc
+++ b/src/iceberg/test/metadata_io_test.cc
@@ -27,7 +27,7 @@
#include <gtest/gtest.h>
#include <nlohmann/json.hpp>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/file_io.h"
#include "iceberg/json_serde_internal.h"
#include "iceberg/partition_spec.h"
diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc
index 0d983db5..65a4602d 100644
--- a/src/iceberg/test/parquet_test.cc
+++ b/src/iceberg/test/parquet_test.cc
@@ -21,6 +21,7 @@
#include <arrow/array.h>
#include <arrow/c/bridge.h>
+#include <arrow/filesystem/filesystem.h>
#include <arrow/json/from_string.h>
#include <arrow/record_batch.h>
#include <arrow/table.h>
@@ -30,7 +31,7 @@
#include <parquet/arrow/writer.h>
#include <parquet/metadata.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/file_reader.h"
#include "iceberg/file_writer.h"
@@ -41,6 +42,8 @@
#include "iceberg/schema_field.h"
#include "iceberg/schema_internal.h"
#include "iceberg/test/matchers.h"
+#include "iceberg/test/std_io.h"
+#include "iceberg/test/temp_file_test_base.h"
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
@@ -123,11 +126,12 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data,
std::shared_ptr<Schema> s
} // namespace
-class ParquetReaderTest : public ::testing::Test {
+class ParquetReaderTest : public TempFileTestBase {
protected:
static void SetUpTestSuite() { parquet::RegisterAll(); }
void SetUp() override {
+ TempFileTestBase::SetUp();
file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
temp_parquet_file_ = "parquet_reader_test.parquet";
}
@@ -232,6 +236,42 @@ TEST_F(ParquetReaderTest, ReadTwoFields) {
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}
+// Writes a two-row (id, name) struct array to a temp ".parquet" file through
+// iceberg::test::StdFileIO, then reads it back through the same FileIO using the
+// length reported by the writer and expects an equal array.
+TEST_F(ParquetReaderTest, RoundTripWithGenericFileIO) {
+  auto generic_io = std::make_shared<iceberg::test::StdFileIO>();
+  auto parquet_path = CreateNewTempFilePathWithSuffix(".parquet");
+
+  auto iceberg_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(2, "name", string())});
+  ArrowSchema c_schema;
+  ASSERT_THAT(ToArrowSchema(*iceberg_schema, &c_schema), IsOk());
+  auto struct_type = ::arrow::ImportType(&c_schema).ValueOrDie();
+  auto expected = ::arrow::json::ArrayFromJSONString(
+                      ::arrow::struct_(struct_type->fields()),
+                      R"([[1, "Foo"], [2, "Bar"]])")
+                      .ValueOrDie();
+
+  WriterProperties props;
+  props.Set(WriterProperties::kParquetCompression, std::string("uncompressed"));
+  auto open_result = WriterFactoryRegistry::Open(
+      FileFormatType::kParquet, {.path = parquet_path,
+                                 .schema = iceberg_schema,
+                                 .io = generic_io,
+                                 .properties = std::move(props)});
+  ASSERT_THAT(open_result, IsOk());
+  auto writer = std::move(open_result.value());
+  ASSERT_THAT(WriteArray(expected, *writer), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto written_length, writer->length());
+
+  std::shared_ptr<::arrow::Array> actual;
+  auto read_status = ReadArray(actual,
+                               {.path = parquet_path,
+                                .length = written_length,
+                                .io = generic_io,
+                                .projection = iceberg_schema},
+                               nullptr);
+  ASSERT_THAT(read_status, IsOk());
+  ASSERT_TRUE(actual->Equals(*expected));
+}
+
+
TEST_F(ParquetReaderTest, ReadReorderedFieldsWithNulls) {
CreateSimpleParquetFile();
diff --git a/src/iceberg/test/rolling_manifest_writer_test.cc
b/src/iceberg/test/rolling_manifest_writer_test.cc
index b996eb16..2eae5a15 100644
--- a/src/iceberg/test/rolling_manifest_writer_test.cc
+++ b/src/iceberg/test/rolling_manifest_writer_test.cc
@@ -28,7 +28,7 @@
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/file_format.h"
#include "iceberg/manifest/manifest_entry.h"
diff --git a/src/iceberg/test/scan_test_base.h
b/src/iceberg/test/scan_test_base.h
index 1a2f5648..5bd1222e 100644
--- a/src/iceberg/test/scan_test_base.h
+++ b/src/iceberg/test/scan_test_base.h
@@ -30,7 +30,7 @@
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
diff --git a/src/iceberg/test/std_io.h b/src/iceberg/test/std_io.h
index 3b58267d..3866bddf 100644
--- a/src/iceberg/test/std_io.h
+++ b/src/iceberg/test/std_io.h
@@ -19,78 +19,297 @@
#pragma once
+#include <cstddef>
+#include <cstdint>
#include <filesystem>
#include <fstream>
+#include <ios>
+#include <limits>
+#include <memory>
#include <optional>
-#include <sstream>
+#include <span>
#include <string>
#include <string_view>
+#include <system_error>
+#include <utility>
#include "iceberg/file_io.h"
#include "iceberg/result.h"
+#include "iceberg/util/macros.h"
namespace iceberg::test {
-/// \brief Simple local filesystem FileIO implementation for testing
-///
-/// This class provides a basic FileIO implementation that reads and writes
-/// files to the local filesystem using standard C++ file streams.
-class StdFileIO : public FileIO {
+namespace detail {
+
+/// \brief Narrows a buffer size to std::streamsize.
+///
+/// Returns InvalidArgument when `size` is larger than the maximum
+/// std::streamsize value.
+inline Result<std::streamsize> ToStreamSize(size_t size) {
+  constexpr auto kMaxStreamSize =
+      static_cast<size_t>(std::numeric_limits<std::streamsize>::max());
+  if (size <= kMaxStreamSize) {
+    return static_cast<std::streamsize>(size);
+  }
+  return InvalidArgument("Buffer size {} exceeds streamsize max", size);
+}
+
+/// \brief Converts a filesystem-reported size to int64_t.
+///
+/// Returns Invalid, naming `location`, when `size` is above the int64_t maximum.
+inline Result<int64_t> ToInt64FileSize(uintmax_t size, std::string_view location) {
+  constexpr auto kMaxFileSize =
+      static_cast<uintmax_t>(std::numeric_limits<int64_t>::max());
+  if (size <= kMaxFileSize) {
+    return static_cast<int64_t>(size);
+  }
+  return Invalid("File size for {} exceeds int64_t max", location);
+}
+
+}  // namespace detail
+
+/// \brief SeekableInputStream over a local file, backed by a std::ifstream opened
+/// in binary mode.
+class StdSeekableInputStream : public SeekableInputStream {
 public:
- Result<std::string> ReadFile(const std::string& file_location,
- std::optional<size_t> length) override {
- std::ifstream file(file_location, std::ios::binary);
- if (!file.is_open()) {
- return IOError("Failed to open file for reading: {}", file_location);
- }
-
- if (length.has_value()) {
- std::string content(length.value(), '\0');
- file.read(content.data(), length.value());
- if (!file) {
- return IOError("Failed to read {} bytes from file: {}", length.value(),
- file_location);
+ /// Opens `location` for binary reading. The constructor does not report
+ /// failure itself; callers are expected to check is_open() afterwards.
+ /// location_ is declared before file_, so it is already initialized when
+ /// file_ is constructed from it.
+ explicit StdSeekableInputStream(std::string location)
+ : location_(std::move(location)), file_(location_, std::ios::binary) {}
+
+ /// Returns true while the underlying std::ifstream is open.
+ bool is_open() const { return file_.is_open(); }
+
+ /// Returns the current read offset as reported by tellg(), or an IOError when
+ /// tellg() yields a negative position.
+ Result<int64_t> Position() const override {
+ auto position = file_.tellg();
+ if (position < 0) {
+ return IOError("Failed to get read position for: {}", location_);
+ }
+ return static_cast<int64_t>(position);
+ }
+
+ /// Moves the read offset to `position`. Negative positions are rejected with
+ /// InvalidArgument; a failed seekg() is reported as IOError.
+ Status Seek(int64_t position) override {
+ if (position < 0) {
+ return InvalidArgument("Cannot seek to negative position {}", position);
+ }
+ // Reset the stream state flags before attempting the seek.
+ file_.clear();
+ file_.seekg(position);
+ if (!file_) {
+ return IOError("Failed to seek to {} in file: {}", position, location_);
+ }
+ return {};
+ }
+
+ /// Reads up to out.size() bytes at the current offset and returns the number of
+ /// bytes actually read. An empty `out` returns 0 without touching the stream.
+ Result<int64_t> Read(std::span<std::byte> out) override {
+ if (out.empty()) {
+ return 0;
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto read_size, detail::ToStreamSize(out.size()));
+ file_.read(reinterpret_cast<char*>(out.data()), read_size);
+ auto bytes_read = file_.gcount();
+ if (!file_) {
+ // A failed read is tolerated only when it stopped at end of file without
+ // badbit; then the flags are cleared and the short count is returned.
+ if (file_.bad() || !file_.eof()) {
+ return IOError("Failed to read from file: {}", location_);
 }
- return content;
- } else {
- std::stringstream buffer;
- buffer << file.rdbuf();
- if (!file && !file.eof()) {
- return IOError("Failed to read file: {}", file_location);
+ file_.clear();
+ }
+ if (bytes_read < 0) {
+ return IOError("Failed to read from file: {}", location_);
+ }
+ return static_cast<int64_t>(bytes_read);
+ }
+
+ /// Fills `out` completely with bytes starting at `position`, then seeks back to
+ /// the offset the stream had on entry. An empty `out` succeeds immediately.
+ /// A Read() that returns 0 before `out` is full is reported as IOError. When
+ /// the read fails, its error is returned in preference to the result of the
+ /// restoring seek.
+ Status ReadFully(int64_t position, std::span<std::byte> out) override {
+ if (position < 0) {
+ return InvalidArgument("Cannot read from negative position {}",
position);
+ }
+ if (out.empty()) {
+ return {};
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto original_position, Position());
+ auto seek_status = Seek(position);
+ if (!seek_status.has_value()) {
+ // Best effort: try to put the offset back; the seek error is what is returned.
+ static_cast<void>(Seek(original_position));
+ return seek_status;
+ }
+
+ Status read_status = {};
+ size_t total_read = 0;
+ while (total_read < out.size()) {
+ auto read_result = Read(out.subspan(total_read));
+ if (!read_result.has_value()) {
+ read_status = std::unexpected<Error>(read_result.error());
+ break;
+ }
+ if (read_result.value() == 0) {
+ read_status =
+ IOError("Failed to read {} bytes from file: {}", out.size(),
location_);
+ break;
 }
- return buffer.str();
+ total_read += static_cast<size_t>(read_result.value());
+ }
+
+ // Always attempt to restore the original offset, even after a failed read.
+ auto restore_status = Seek(original_position);
+ ICEBERG_RETURN_UNEXPECTED(read_status);
+ return restore_status;
+ }
+
+ /// Closes the file if it is open; calling Close() on an already-closed stream
+ /// succeeds without doing anything.
+ Status Close() override {
+ if (!file_.is_open()) {
+ return {};
+ }
+ file_.close();
+ if (!file_) {
+ return IOError("Failed to close file: {}", location_);
+ }
+ return {};
+ }
+
+ private:
+ std::string location_;
+ // mutable because the const Position() calls tellg(), a non-const stream
+ // operation.
+ mutable std::ifstream file_;
+};
+
+/// \brief PositionOutputStream over a local file, backed by a std::ofstream opened
+/// with binary | out | trunc, so any existing content is truncated on open.
+class StdPositionOutputStream : public PositionOutputStream {
+ public:
+ /// Opens `location` for binary writing. The constructor does not report
+ /// failure itself; callers are expected to check is_open() afterwards.
+ explicit StdPositionOutputStream(std::string location)
+ : location_(std::move(location)),
+ file_(location_, std::ios::binary | std::ios::out | std::ios::trunc) {}
+
+ /// Returns true while the underlying std::ofstream is open.
+ bool is_open() const { return file_.is_open(); }
+
+ /// Returns the current write offset as reported by tellp(), or an IOError when
+ /// tellp() yields a negative position.
+ Result<int64_t> Position() const override {
+ auto position = file_.tellp();
+ if (position < 0) {
+ return IOError("Failed to get write position for: {}", location_);
+ }
+ return static_cast<int64_t>(position);
+ }
+
+ /// Writes all bytes of `data` at the current offset. Empty input is a no-op;
+ /// a stream left in a failed state after write() is reported as IOError.
+ Status Write(std::span<const std::byte> data) override {
+ if (data.empty()) {
+ return {};
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto write_size,
detail::ToStreamSize(data.size()));
+ file_.write(reinterpret_cast<const char*>(data.data()), write_size);
+ if (!file_) {
+ return IOError("Failed to write to file: {}", location_);
 }
+ return {};
 }
- Status WriteFile(const std::string& file_location, std::string_view content)
override {
- // Create parent directories if they don't exist
- std::filesystem::path path(file_location);
+ /// Flushes the std::ofstream; a failed stream state afterwards is an IOError.
+ Status Flush() override {
+ file_.flush();
+ if (!file_) {
+ return IOError("Failed to flush file: {}", location_);
+ }
+ return {};
+ }
+
+ /// Closes the file if it is open; calling Close() on an already-closed stream
+ /// succeeds without doing anything.
+ Status Close() override {
+ if (!file_.is_open()) {
+ return {};
+ }
+ file_.close();
+ if (!file_) {
+ return IOError("Failed to close file: {}", location_);
+ }
+ return {};
+ }
+
+ private:
+ std::string location_;
+ // mutable because the const Position() calls tellp(), a non-const stream
+ // operation.
+ mutable std::ofstream file_;
+};
+
+/// \brief InputFile for a local path, optionally carrying a caller-supplied size.
+///
+/// When a size was supplied at construction it is returned as-is; otherwise
+/// Size() queries the filesystem on every call.
+class StdInputFile : public InputFile {
+ public:
+  explicit StdInputFile(std::string location,
+                        std::optional<int64_t> file_size = std::nullopt)
+      : location_(std::move(location)), file_size_(file_size) {}
+
+  std::string_view location() const override { return location_; }
+
+  /// Returns the known size, or asks std::filesystem::file_size and converts the
+  /// answer to int64_t. Filesystem failures are reported as IOError.
+  Result<int64_t> Size() const override {
+    if (!file_size_.has_value()) {
+      std::error_code error;
+      auto bytes_on_disk = std::filesystem::file_size(location_, error);
+      if (error) {
+        return IOError("Failed to get file size for {}: {}", location_,
+                       error.message());
+      }
+      return detail::ToInt64FileSize(bytes_on_disk, location_);
+    }
+    return *file_size_;
+  }
+
+  /// Opens a StdSeekableInputStream on the location; IOError if it is not open.
+  Result<std::unique_ptr<SeekableInputStream>> Open() override {
+    auto input = std::make_unique<StdSeekableInputStream>(location_);
+    if (input->is_open()) {
+      return input;
+    }
+    return IOError("Failed to open file for reading: {}", location_);
+  }
+
+ private:
+  std::string location_;
+  std::optional<int64_t> file_size_;
+};
+
+/// \brief OutputFile for a local path; streams are StdPositionOutputStream objects.
+class StdOutputFile : public OutputFile {
+ public:
+ explicit StdOutputFile(std::string location) :
location_(std::move(location)) {}
+
+ std::string_view location() const override { return location_; }
+
+ /// Creates the file, failing with AlreadyExists when the path already exists.
+ Result<std::unique_ptr<PositionOutputStream>> Create() override {
+ return Create(/*overwrite=*/false);
+ }
+
+ /// Creates the file, replacing any existing file at the path.
+ Result<std::unique_ptr<PositionOutputStream>> CreateOrOverwrite() override {
+ return Create(/*overwrite=*/true);
+ }
+
+ private:
+ /// Shared implementation: checks existence, creates missing parent
+ /// directories, then opens the output stream.
+ /// NOTE(review): the existence check and the open are separate steps, so
+ /// Create() is not atomic with respect to other writers of the same path.
+ Result<std::unique_ptr<PositionOutputStream>> Create(bool overwrite) {
+ std::filesystem::path path(location_);
+ std::error_code ec;
+ auto exists = std::filesystem::exists(path, ec);
+ if (ec) {
+ return IOError("Failed to check file existence for {}: {}", location_,
+ ec.message());
+ }
+ if (!overwrite && exists) {
+ return AlreadyExists("File already exists: {}", location_);
+ }
 if (path.has_parent_path()) {
- std::error_code ec;
 std::filesystem::create_directories(path.parent_path(), ec);
 if (ec) {
- return IOError("Failed to create parent directories for: {}",
file_location);
+ return IOError("Failed to create parent directories for {}: {}",
location_,
+ ec.message());
 }
 }
-
- std::ofstream file(file_location, std::ios::binary);
- if (!file.is_open()) {
- return IOError("Failed to open file for writing: {}", file_location);
+ auto stream = std::make_unique<StdPositionOutputStream>(location_);
+ if (!stream->is_open()) {
+ return IOError("Failed to open file for writing: {}", location_);
 }
+ return stream;
+ }
- file.write(content.data(), content.size());
- if (!file) {
- return IOError("Failed to write to file: {}", file_location);
+ std::string location_;
+};
+
+/// \brief Simple local filesystem FileIO implementation for testing
+///
+/// This class provides a basic FileIO implementation that reads and writes
+/// files to the local filesystem using standard C++ file streams.
+class StdFileIO : public FileIO {
+ public:
+  /// Returns a StdInputFile for `file_location` with no pre-supplied size.
+  Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location) override {
+    auto input_file = std::make_unique<StdInputFile>(std::move(file_location));
+    return input_file;
+  }
+
+  /// Returns a StdInputFile for `file_location` that reports `length` as its size.
+  /// Lengths above the int64_t maximum are rejected with InvalidArgument.
+  Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location,
+                                                  size_t length) override {
+    constexpr auto kMaxLength =
+        static_cast<size_t>(std::numeric_limits<int64_t>::max());
+    if (length > kMaxLength) {
+      return InvalidArgument("File length {} exceeds int64_t max", length);
     }
+    const auto known_size = static_cast<int64_t>(length);
+    return std::make_unique<StdInputFile>(std::move(file_location), known_size);
+  }
- return {};
+  /// Returns a StdOutputFile for `file_location`; nothing is created on disk here.
+  Result<std::unique_ptr<OutputFile>> NewOutputFile(std::string file_location) override {
+    auto output_file = std::make_unique<StdOutputFile>(std::move(file_location));
+    return output_file;
   }
Status DeleteFile(const std::string& file_location) override {
std::error_code ec;
if (!std::filesystem::remove(file_location, ec)) {
if (ec) {
- return IOError("Failed to delete file: {}", file_location);
+ return IOError("Failed to delete file {}: {}", file_location,
ec.message());
}
return IOError("File does not exist: {}", file_location);
}
diff --git a/src/iceberg/test/update_location_test.cc
b/src/iceberg/test/update_location_test.cc
index 53b347b5..a208209b 100644
--- a/src/iceberg/test/update_location_test.cc
+++ b/src/iceberg/test/update_location_test.cc
@@ -26,7 +26,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/result.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/update_test_base.h"
diff --git a/src/iceberg/test/update_partition_spec_test.cc
b/src/iceberg/test/update_partition_spec_test.cc
index 632c4a55..e310c0d1 100644
--- a/src/iceberg/test/update_partition_spec_test.cc
+++ b/src/iceberg/test/update_partition_spec_test.cc
@@ -28,7 +28,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/catalog/memory/in_memory_catalog.h"
#include "iceberg/expression/expressions.h"
#include "iceberg/partition_spec.h"
diff --git a/src/iceberg/test/update_test_base.h
b/src/iceberg/test/update_test_base.h
index 310feb37..b0ad3d20 100644
--- a/src/iceberg/test/update_test_base.h
+++ b/src/iceberg/test/update_test_base.h
@@ -26,7 +26,7 @@
#include <arrow/filesystem/mockfs.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/catalog/memory/in_memory_catalog.h"
#include "iceberg/result.h"
#include "iceberg/snapshot.h"