This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new d54d079 feat: add file_io and local impl by adapting arrow::filesystem (#30)
d54d079 is described below
commit d54d079c8fef95b86fdf5fe42500e8d450ec9430
Author: Junwang Zhao <[email protected]>
AuthorDate: Wed Apr 2 03:22:56 2025 +0800
feat: add file_io and local impl by adapting arrow::filesystem (#30)
This PR adds a FileIO interface and an Arrow local filesystem implementation.
FileIO is a pluggable interface for reading, writing, and deleting
metadata files, not for data files.
---------
Signed-off-by: Junwang Zhao <[email protected]>
---
cmake_modules/IcebergThirdpartyToolchain.cmake | 2 +-
src/iceberg/CMakeLists.txt | 3 +-
src/iceberg/arrow/arrow_error_transform_internal.h | 60 ++++++++++++++++
src/iceberg/arrow/arrow_fs_file_io.cc | 71 ++++++++++++++++++
src/iceberg/arrow/arrow_fs_file_io.h | 54 ++++++++++++++
src/iceberg/demo.cc | 1 +
src/iceberg/error.h | 3 +
src/iceberg/file_io.h | 83 ++++++++++++++++++++++
test/CMakeLists.txt | 6 +-
test/arrow_fs_file_io_test.cc | 65 +++++++++++++++++
10 files changed, 343 insertions(+), 5 deletions(-)
diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake
index e361f1e..783d99c 100644
--- a/cmake_modules/IcebergThirdpartyToolchain.cmake
+++ b/cmake_modules/IcebergThirdpartyToolchain.cmake
@@ -70,7 +70,7 @@ function(resolve_arrow_dependency)
ON
CACHE BOOL "" FORCE)
set(ARROW_FILESYSTEM
- OFF
+ ON
CACHE BOOL "" FORCE)
set(ARROW_SIMD_LEVEL
"NONE"
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index ac33be2..fec8952 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -60,7 +60,8 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg)
if(ICEBERG_BUILD_BUNDLE)
- set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc avro/demo_avro.cc)
+ set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc
+ avro/demo_avro.cc)
# Libraries to link with exported libiceberg_bundle.{so,a}.
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h
new file mode 100644
index 0000000..588bab5
--- /dev/null
+++ b/src/iceberg/arrow/arrow_error_transform_internal.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include "iceberg/error.h"
+#include "iceberg/expected.h"
+
+namespace iceberg::arrow {
+
+/// \brief Map an Arrow status code to the closest Iceberg ErrorKind.
+///
+/// Arrow codes without a dedicated Iceberg counterpart map to
+/// ErrorKind::kUnknownError. Only meaningful for non-OK statuses.
+inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
+  switch (status.code()) {
+    case ::arrow::StatusCode::IOError:
+      return ErrorKind::kIOError;
+    case ::arrow::StatusCode::NotImplemented:
+      return ErrorKind::kNotImplemented;
+    case ::arrow::StatusCode::Invalid:
+      return ErrorKind::kInvalidArgument;
+    default:
+      return ErrorKind::kUnknownError;
+  }
+}
+
+/// \brief Implementation detail of ICEBERG_ARROW_ASSIGN_OR_RETURN.
+///
+/// Evaluates `rexpr` (an ::arrow::Result<T>) into the local `result_name`. On
+/// failure, returns an Iceberg error whose kind is `error_transform(status)` and
+/// whose message is the Arrow status string; on success, moves the value into
+/// `lhs`. Names are fully qualified so the macro also works when expanded
+/// outside namespace iceberg::arrow.
+#define ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \
+  auto&& result_name = (rexpr);                                                        \
+  if (!result_name.ok()) {                                                             \
+    return ::iceberg::unexpected<::iceberg::Error>{                                    \
+        {.kind = error_transform(result_name.status()),                                \
+         .message = result_name.status().ToString()}};                                 \
+  }                                                                                    \
+  lhs = std::move(result_name).ValueOrDie();
+
+/// \brief Evaluate an expression yielding ::arrow::Result<T>. On error, return an
+/// Iceberg error from the enclosing function; otherwise assign the value to
+/// `lhs` (which may be a declaration such as `auto x`).
+#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr)                          \
+  ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL(                                      \
+      ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \
+      ::iceberg::arrow::ToErrorKind)
+
+/// \brief Evaluate an expression yielding ::arrow::Status. On error, return an
+/// Iceberg error from the enclosing function.
+#define ICEBERG_ARROW_RETURN_NOT_OK(expr)                                \
+  do {                                                                   \
+    auto&& _iceberg_arrow_status = (expr);                               \
+    if (!_iceberg_arrow_status.ok()) {                                   \
+      return ::iceberg::unexpected<::iceberg::Error>{                    \
+          {.kind = ::iceberg::arrow::ToErrorKind(_iceberg_arrow_status), \
+           .message = _iceberg_arrow_status.ToString()}};                \
+    }                                                                    \
+  } while (0)
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_fs_file_io.cc b/src/iceberg/arrow/arrow_fs_file_io.cc
new file mode 100644
index 0000000..270ecb7
--- /dev/null
+++ b/src/iceberg/arrow/arrow_fs_file_io.cc
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/arrow/arrow_fs_file_io.h"
+
+#include <arrow/filesystem/localfs.h>
+
+#include "iceberg/arrow/arrow_error_transform_internal.h"
+
+namespace iceberg::arrow {
+
+namespace {
+
+/// Upper bound on the number of bytes requested from Arrow per Read() call.
+constexpr int64_t kReadChunkSize = 1024 * 1024;
+
+}  // namespace
+
+/// \brief Read the whole content of the file at the given location.
+///
+/// When `length` is provided it is forwarded to Arrow as the file size
+/// (FileInfo::set_size), which Arrow may use to avoid querying the size itself.
+/// The number of bytes read is whatever the opened file reports via GetSize().
+/// Returns an kIOError if the file ends before that many bytes were read.
+expected<std::string, Error> ArrowFileSystemFileIO::ReadFile(
+    const std::string& file_location, std::optional<size_t> length) {
+  ::arrow::fs::FileInfo file_info(file_location);
+  if (length.has_value()) {
+    file_info.set_size(static_cast<int64_t>(length.value()));
+  }
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_info));
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
+
+  std::string content(static_cast<size_t>(file_size), '\0');
+  int64_t offset = 0;
+  while (offset < file_size) {
+    const int64_t to_read = std::min(file_size - offset, kReadChunkSize);
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto bytes_read,
+                                   file->Read(to_read, content.data() + offset));
+    if (bytes_read == 0) {
+      // The file ended before the size reported by GetSize(); without this
+      // check the loop would never terminate.
+      return unexpected<Error>{
+          {.kind = ErrorKind::kIOError,
+           .message = "Unexpected end of file while reading " + file_location}};
+    }
+    offset += bytes_read;
+  }
+
+  return content;
+}
+
+/// \brief Write the given content to the file at the given location.
+///
+/// The stream is flushed and closed explicitly so that errors raised at close
+/// time are reported to the caller rather than left to the stream's destructor.
+expected<void, Error> ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
+                                                       std::string_view content) {
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(file_location));
+  ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
+  ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
+  ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
+  return {};
+}
+
+/// \brief Delete a file at the given location via the wrapped Arrow filesystem.
+expected<void, Error> ArrowFileSystemFileIO::DeleteFile(
+    const std::string& file_location) {
+  ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location));
+  return {};
+}
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_fs_file_io.h b/src/iceberg/arrow/arrow_fs_file_io.h
new file mode 100644
index 0000000..e79e75f
--- /dev/null
+++ b/src/iceberg/arrow/arrow_fs_file_io.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include "iceberg/file_io.h"
+#include "iceberg/iceberg_bundle_export.h"
+
+namespace iceberg::arrow {
+
+/// \brief FileIO implementation backed by an Arrow filesystem.
+///
+/// Every operation is forwarded to the wrapped ::arrow::fs::FileSystem, so any
+/// Arrow filesystem (for example ::arrow::fs::LocalFileSystem) can serve
+/// Iceberg metadata I/O.
+class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
+ public:
+  /// \brief Create a FileIO that shares ownership of `arrow_fs`.
+  explicit ArrowFileSystemFileIO(std::shared_ptr<::arrow::fs::FileSystem> arrow_fs)
+      : arrow_fs_{std::move(arrow_fs)} {}
+
+  ~ArrowFileSystemFileIO() override = default;
+
+  /// \brief Read the whole file at `file_location`; `length`, when set, is
+  /// handed to Arrow as the expected file size.
+  expected<std::string, Error> ReadFile(const std::string& file_location,
+                                        std::optional<size_t> length) override;
+
+  /// \brief Write `content` to the file at `file_location`.
+  expected<void, Error> WriteFile(const std::string& file_location,
+                                  std::string_view content) override;
+
+  /// \brief Remove the file at `file_location`.
+  expected<void, Error> DeleteFile(const std::string& file_location) override;
+
+ private:
+  /// Underlying Arrow filesystem all calls are delegated to.
+  std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
+};
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/demo.cc b/src/iceberg/demo.cc
index a9c0a03..aa1835b 100644
--- a/src/iceberg/demo.cc
+++ b/src/iceberg/demo.cc
@@ -21,6 +21,7 @@
#include "iceberg/avro.h" // include to export symbols
#include "iceberg/catalog.h"
+#include "iceberg/file_io.h"
#include "iceberg/location_provider.h"
#include "iceberg/table.h"
#include "iceberg/transaction.h"
diff --git a/src/iceberg/error.h b/src/iceberg/error.h
index 60f07f3..a4b74a9 100644
--- a/src/iceberg/error.h
+++ b/src/iceberg/error.h
@@ -34,6 +34,9 @@ enum class ErrorKind {
kCommitStateUnknown,
kInvalidSchema,
kInvalidArgument,
+ kIOError,
+ kNotImplemented,
+ kUnknownError,
kNotSupported,
};
diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h
new file mode 100644
index 0000000..03922f6
--- /dev/null
+++ b/src/iceberg/file_io.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <string>
+#include <string_view>
+
+#include "iceberg/error.h"
+#include "iceberg/expected.h"
+#include "iceberg/iceberg_export.h"
+
+namespace iceberg {
+
+/// \brief Pluggable module for reading, writing, and deleting files.
+///
+/// This module only handles metadata files, not data files. The metadata files
+/// are typically small and are used to store schema, partition information,
+/// and other metadata about the table.
+///
+/// Note that these functions are not atomic. For example, if a write fails,
+/// the file may be partially written. Implementations should be careful to
+/// avoid corrupting metadata files.
+///
+/// Every operation has a default implementation returning
+/// ErrorKind::kNotImplemented, so subclasses only override what they support.
+class ICEBERG_EXPORT FileIO {
+ public:
+  FileIO() = default;
+  virtual ~FileIO() = default;
+
+  /// \brief Read the content of the file at the given location.
+  ///
+  /// \param file_location The location of the file to read.
+  /// \param length The number of bytes to read. Some object stores need the
+  /// length up front, e.g. S3 `GetObject` has a Range parameter.
+  /// \return The content of the file if the read succeeded, an error if the
+  /// read failed.
+  virtual expected<std::string, Error> ReadFile(
+      [[maybe_unused]] const std::string& file_location,
+      [[maybe_unused]] std::optional<size_t> length) {
+    // We provide a default implementation to avoid Windows linker error LNK2019.
+    return unexpected<Error>{
+        {.kind = ErrorKind::kNotImplemented, .message = "ReadFile not implemented"}};
+  }
+
+  /// \brief Write the given content to the file at the given location.
+  ///
+  /// Whether an existing file at `file_location` is replaced or causes an error
+  /// is up to the implementation.
+  ///
+  /// \param file_location The location of the file to write.
+  /// \param content The content to write to the file.
+  /// \return void if the write succeeded, an error if the write failed.
+  virtual expected<void, Error> WriteFile(
+      [[maybe_unused]] const std::string& file_location,
+      [[maybe_unused]] std::string_view content) {
+    return unexpected<Error>{
+        {.kind = ErrorKind::kNotImplemented, .message = "WriteFile not implemented"}};
+  }
+
+  /// \brief Delete a file at the given location.
+  ///
+  /// \param file_location The location of the file to delete.
+  /// \return void if the delete succeeded, an error if the delete failed.
+  virtual expected<void, Error> DeleteFile(
+      [[maybe_unused]] const std::string& file_location) {
+    return unexpected<Error>{
+        {.kind = ErrorKind::kNotImplemented, .message = "DeleteFile not implemented"}};
+  }
+};
+
+}  // namespace iceberg
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 9849d43..96e3194 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -47,8 +47,8 @@ if(ICEBERG_BUILD_BUNDLE)
add_test(NAME avro_test COMMAND avro_test)
add_executable(arrow_test)
- target_sources(arrow_test PRIVATE arrow_test.cc)
- target_link_libraries(arrow_test PRIVATE iceberg_bundle_static
Arrow::arrow_static
- GTest::gtest_main GTest::gmock)
+ target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc)
+ target_link_libraries(arrow_test PRIVATE iceberg_bundle_static
GTest::gtest_main
+ GTest::gmock)
add_test(NAME arrow_test COMMAND arrow_test)
endif()
diff --git a/test/arrow_fs_file_io_test.cc b/test/arrow_fs_file_io_test.cc
new file mode 100644
index 0000000..193c13d
--- /dev/null
+++ b/test/arrow_fs_file_io_test.cc
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/arrow/arrow_fs_file_io.h"
+
+#include <filesystem>
+
+#include <arrow/filesystem/localfs.h>
+#include <gtest/gtest.h>
+
+#include "matchers.h"
+
+namespace iceberg {
+
+/// Fixture wiring an ArrowFileSystemFileIO to the local filesystem. Each test
+/// gets its own scratch file in the system temp directory, removed before and
+/// after the test.
+class LocalFileIOTest : public testing::Test {
+ protected:
+  void SetUp() override {
+    local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
+    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
+    // Name the file after the running test so tests neither share state nor
+    // depend on execution order.
+    const auto* test_info = ::testing::UnitTest::GetInstance()->current_test_info();
+    tmpfile = std::filesystem::temp_directory_path() /
+              (std::string("iceberg_local_file_io_") + test_info->name() + ".txt");
+    // Drop leftovers from an earlier aborted run.
+    RemoveTmpFile();
+  }
+
+  void TearDown() override { RemoveTmpFile(); }
+
+  /// Best-effort removal; a missing file is not an error here.
+  void RemoveTmpFile() {
+    std::error_code ec;
+    std::filesystem::remove(tmpfile, ec);
+  }
+
+  std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
+  std::shared_ptr<iceberg::FileIO> file_io_;
+  std::filesystem::path tmpfile;
+};
+
+TEST_F(LocalFileIOTest, ReadWriteFile) {
+  auto read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
+  EXPECT_THAT(read_res, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file"));
+
+  auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world");
+  EXPECT_THAT(write_res, IsOk());
+
+  read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
+  EXPECT_THAT(read_res, IsOk());
+  EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world")));
+}
+
+TEST_F(LocalFileIOTest, ReadEmptyFile) {
+  auto write_res = file_io_->WriteFile(tmpfile.string(), "");
+  ASSERT_THAT(write_res, IsOk());
+
+  auto read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
+  EXPECT_THAT(read_res, IsOk());
+  EXPECT_THAT(read_res, HasValue(::testing::Eq("")));
+}
+
+TEST_F(LocalFileIOTest, DeleteFile) {
+  // Create the file here instead of relying on ReadWriteFile having run first.
+  auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world");
+  ASSERT_THAT(write_res, IsOk());
+
+  auto del_res = file_io_->DeleteFile(tmpfile.string());
+  EXPECT_THAT(del_res, IsOk());
+
+  del_res = file_io_->DeleteFile(tmpfile.string());
+  EXPECT_THAT(del_res, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
+}
+
+}  // namespace iceberg