This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new d54d079  feat: add file_io and local impl by adapting arrow::filesystem (#30)
d54d079 is described below

commit d54d079c8fef95b86fdf5fe42500e8d450ec9430
Author: Junwang Zhao <[email protected]>
AuthorDate: Wed Apr 2 03:22:56 2025 +0800

    feat: add file_io and local impl by adapting arrow::filesystem (#30)
    
    This PR adds a FileIO interface and a local filesystem implementation backed by Arrow.
    
    FileIO is a pluggable interface for reading, writing, and deleting
    metadata files, not for data files.
    
    ---------
    
    Signed-off-by: Junwang Zhao <[email protected]>
---
 cmake_modules/IcebergThirdpartyToolchain.cmake     |  2 +-
 src/iceberg/CMakeLists.txt                         |  3 +-
 src/iceberg/arrow/arrow_error_transform_internal.h | 60 ++++++++++++++++
 src/iceberg/arrow/arrow_fs_file_io.cc              | 71 ++++++++++++++++++
 src/iceberg/arrow/arrow_fs_file_io.h               | 54 ++++++++++++++
 src/iceberg/demo.cc                                |  1 +
 src/iceberg/error.h                                |  3 +
 src/iceberg/file_io.h                              | 83 ++++++++++++++++++++++
 test/CMakeLists.txt                                |  6 +-
 test/arrow_fs_file_io_test.cc                      | 65 +++++++++++++++++
 10 files changed, 343 insertions(+), 5 deletions(-)

diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake 
b/cmake_modules/IcebergThirdpartyToolchain.cmake
index e361f1e..783d99c 100644
--- a/cmake_modules/IcebergThirdpartyToolchain.cmake
+++ b/cmake_modules/IcebergThirdpartyToolchain.cmake
@@ -70,7 +70,7 @@ function(resolve_arrow_dependency)
       ON
       CACHE BOOL "" FORCE)
   set(ARROW_FILESYSTEM
-      OFF
+      ON
       CACHE BOOL "" FORCE)
   set(ARROW_SIMD_LEVEL
       "NONE"
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index ac33be2..fec8952 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -60,7 +60,8 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
         DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg)
 
 if(ICEBERG_BUILD_BUNDLE)
-  set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc avro/demo_avro.cc)
+  set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc
+                             avro/demo_avro.cc)
 
   # Libraries to link with exported libiceberg_bundle.{so,a}.
   set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h 
b/src/iceberg/arrow/arrow_error_transform_internal.h
new file mode 100644
index 0000000..588bab5
--- /dev/null
+++ b/src/iceberg/arrow/arrow_error_transform_internal.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include "iceberg/error.h"
+#include "iceberg/expected.h"
+
+namespace iceberg::arrow {
+
+inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
+  switch (status.code()) {
+    case ::arrow::StatusCode::IOError:
+      return ErrorKind::kIOError;
+    default:
+      return ErrorKind::kUnknownError;
+  }
+}
+
// Implementation detail of ICEBERG_ARROW_ASSIGN_OR_RETURN.
//
// Evaluates `rexpr` (an ::arrow::Result<T>) into the temporary `result_name`.
// If it is not OK, returns an ::iceberg::unexpected<::iceberg::Error> from the
// *enclosing* function, with the kind produced by `error_transform(status)` and
// the message taken from `status.ToString()`. Otherwise the value is moved into
// `lhs`. Names are fully qualified so the macro also works when it is expanded
// outside namespace iceberg::arrow.
#define ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \
  auto&& result_name = (rexpr);                                                      \
  if (!result_name.ok()) {                                                           \
    return ::iceberg::unexpected<::iceberg::Error>{                                  \
        {.kind = error_transform(result_name.status()),                              \
         .message = result_name.status().ToString()}};                               \
  }                                                                                  \
  lhs = std::move(result_name).ValueOrDie();

// Assigns the value of an ::arrow::Result<T> expression to `lhs` (which may be a
// declaration such as `auto x`), or returns the failure from the enclosing
// function as an ::iceberg::Error whose kind comes from
// ::iceberg::arrow::ToErrorKind. The expansion is several statements, so do not
// use it as the unbraced body of an if/else or a loop.
#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr)                         \
  ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL(                                      \
      ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \
      ::iceberg::arrow::ToErrorKind)
+
// Evaluates an ::arrow::Status expression once. If it is not OK, returns it from
// the enclosing function as an ::iceberg::unexpected<::iceberg::Error> whose kind
// comes from ::iceberg::arrow::ToErrorKind and whose message is the status'
// ToString(). Names are fully qualified so the macro works outside namespace
// iceberg::arrow. The temporary has a prefixed name so that it does not collide
// with identifiers appearing in `expr`.
#define ICEBERG_ARROW_RETURN_NOT_OK(expr)                                 \
  do {                                                                    \
    auto&& _iceberg_arrow_status = (expr);                                \
    if (!_iceberg_arrow_status.ok()) {                                    \
      return ::iceberg::unexpected<::iceberg::Error>{                     \
          {.kind = ::iceberg::arrow::ToErrorKind(_iceberg_arrow_status),  \
           .message = _iceberg_arrow_status.ToString()}};                 \
    }                                                                     \
  } while (0)
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_fs_file_io.cc 
b/src/iceberg/arrow/arrow_fs_file_io.cc
new file mode 100644
index 0000000..270ecb7
--- /dev/null
+++ b/src/iceberg/arrow/arrow_fs_file_io.cc
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/arrow/arrow_fs_file_io.h"
+
+#include <arrow/filesystem/localfs.h>
+
+#include "iceberg/arrow/arrow_error_transform_internal.h"
+
+namespace iceberg::arrow {
+
+/// \brief Read the content of the file at the given location.
+expected<std::string, Error> ArrowFileSystemFileIO::ReadFile(
+    const std::string& file_location, std::optional<size_t> length) {
+  ::arrow::fs::FileInfo file_info(file_location);
+  if (length.has_value()) {
+    file_info.set_size(length.value());
+  }
+  std::string content;
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, 
arrow_fs_->OpenInputFile(file_info));
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
+
+  content.resize(file_size);
+  size_t remain = file_size;
+  size_t offset = 0;
+  while (remain > 0) {
+    size_t read_length = std::min(remain, static_cast<size_t>(1024 * 1024));
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(
+        auto read_bytes,
+        file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
+    remain -= read_bytes;
+    offset += read_bytes;
+  }
+
+  return content;
+}
+
+/// \brief Write the given content to the file at the given location.
+expected<void, Error> ArrowFileSystemFileIO::WriteFile(const std::string& 
file_location,
+                                                       std::string_view 
content) {
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, 
arrow_fs_->OpenOutputStream(file_location));
+  ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
+  ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
+  ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
+  return {};
+}
+
+/// \brief Delete a file at the given location.
+expected<void, Error> ArrowFileSystemFileIO::DeleteFile(
+    const std::string& file_location) {
+  ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location));
+  return {};
+}
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_fs_file_io.h 
b/src/iceberg/arrow/arrow_fs_file_io.h
new file mode 100644
index 0000000..e79e75f
--- /dev/null
+++ b/src/iceberg/arrow/arrow_fs_file_io.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include "iceberg/file_io.h"
+#include "iceberg/iceberg_bundle_export.h"
+
+namespace iceberg::arrow {
+
+/// \brief A concrete implementation of FileIO for Arrow file system.
+class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
+ public:
+  explicit ArrowFileSystemFileIO(std::shared_ptr<::arrow::fs::FileSystem> 
arrow_fs)
+      : arrow_fs_(std::move(arrow_fs)) {}
+
+  ~ArrowFileSystemFileIO() override = default;
+
+  /// \brief Read the content of the file at the given location.
+  expected<std::string, Error> ReadFile(const std::string& file_location,
+                                        std::optional<size_t> length) override;
+
+  /// \brief Write the given content to the file at the given location.
+  expected<void, Error> WriteFile(const std::string& file_location,
+                                  std::string_view content) override;
+
+  /// \brief Delete a file at the given location.
+  expected<void, Error> DeleteFile(const std::string& file_location) override;
+
+ private:
+  std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
+};
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/demo.cc b/src/iceberg/demo.cc
index a9c0a03..aa1835b 100644
--- a/src/iceberg/demo.cc
+++ b/src/iceberg/demo.cc
@@ -21,6 +21,7 @@
 
 #include "iceberg/avro.h"  // include to export symbols
 #include "iceberg/catalog.h"
+#include "iceberg/file_io.h"
 #include "iceberg/location_provider.h"
 #include "iceberg/table.h"
 #include "iceberg/transaction.h"
diff --git a/src/iceberg/error.h b/src/iceberg/error.h
index 60f07f3..a4b74a9 100644
--- a/src/iceberg/error.h
+++ b/src/iceberg/error.h
@@ -34,6 +34,9 @@ enum class ErrorKind {
   kCommitStateUnknown,
   kInvalidSchema,
   kInvalidArgument,
+  kIOError,
+  kNotImplemented,
+  kUnknownError,
   kNotSupported,
 };
 
diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h
new file mode 100644
index 0000000..03922f6
--- /dev/null
+++ b/src/iceberg/file_io.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <string>
+#include <string_view>
+
+#include "iceberg/error.h"
+#include "iceberg/expected.h"
+#include "iceberg/iceberg_export.h"
+
+namespace iceberg {
+
+/// \brief Pluggable module for reading, writing, and deleting files.
+///
+/// This module only handle metadata files, not data files. The metadata files
+/// are typically small and are used to store schema, partition information,
+/// and other metadata about the table.
+///
+/// Note that these functions are not atomic. For example, if a write fails,
+/// the file may be partially written. Implementations should be careful to
+/// avoid corrupting metadata files.
+class ICEBERG_EXPORT FileIO {
+ public:
+  FileIO() = default;
+  virtual ~FileIO() = default;
+
+  /// \brief Read the content of the file at the given location.
+  ///
+  /// \param file_location The location of the file to read.
+  /// \param length The number of bytes to read. Some object storage need to 
specify
+  /// the length to read, e.g. S3 `GetObject` has a Range parameter.
+  /// \return The content of the file if the read succeeded, an error code if 
the read
+  /// failed.
+  virtual expected<std::string, Error> ReadFile(const std::string& 
file_location,
+                                                std::optional<size_t> length) {
+    // We provide a default implementation to avoid Windows linker error 
LNK2019.
+    return unexpected<Error>{
+        {.kind = ErrorKind::kNotImplemented, .message = "ReadFile not 
implemented"}};
+  }
+
+  /// \brief Write the given content to the file at the given location.
+  ///
+  /// \param file_location The location of the file to write.
+  /// \param content The content to write to the file.
+  /// \param overwrite If true, overwrite the file if it exists. If false, 
fail if the
+  /// file exists.
+  /// \return void if the write succeeded, an error code if the write failed.
+  virtual expected<void, Error> WriteFile(const std::string& file_location,
+                                          std::string_view content) {
+    return unexpected<Error>{
+        {.kind = ErrorKind::kNotImplemented, .message = "WriteFile not 
implemented"}};
+  }
+
+  /// \brief Delete a file at the given location.
+  ///
+  /// \param file_location The location of the file to delete.
+  /// \return void if the delete succeeded, an error code if the delete failed.
+  virtual expected<void, Error> DeleteFile(const std::string& file_location) {
+    return unexpected<Error>{
+        {.kind = ErrorKind::kNotImplemented, .message = "DeleteFile not 
implemented"}};
+  }
+};
+
+}  // namespace iceberg
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 9849d43..96e3194 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -47,8 +47,8 @@ if(ICEBERG_BUILD_BUNDLE)
   add_test(NAME avro_test COMMAND avro_test)
 
   add_executable(arrow_test)
-  target_sources(arrow_test PRIVATE arrow_test.cc)
-  target_link_libraries(arrow_test PRIVATE iceberg_bundle_static 
Arrow::arrow_static
-                                           GTest::gtest_main GTest::gmock)
+  target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc)
+  target_link_libraries(arrow_test PRIVATE iceberg_bundle_static 
GTest::gtest_main
+                                           GTest::gmock)
   add_test(NAME arrow_test COMMAND arrow_test)
 endif()
diff --git a/test/arrow_fs_file_io_test.cc b/test/arrow_fs_file_io_test.cc
new file mode 100644
index 0000000..193c13d
--- /dev/null
+++ b/test/arrow_fs_file_io_test.cc
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/arrow/arrow_fs_file_io.h"
+
+#include <filesystem>
+
+#include <arrow/filesystem/localfs.h>
+#include <gtest/gtest.h>
+
+#include "matchers.h"
+
+namespace iceberg {
+
+class LocalFileIOTest : public testing::Test {
+ protected:
+  void SetUp() override {
+    local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
+    file_io_ = 
std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
+  }
+
+  std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
+  std::shared_ptr<iceberg::FileIO> file_io_;
+  std::filesystem::path tmpfile = std::filesystem::temp_directory_path() / 
"123.txt";
+};
+
+TEST_F(LocalFileIOTest, ReadWriteFile) {
+  auto read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
+  EXPECT_THAT(read_res, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file"));
+
+  auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world");
+  EXPECT_THAT(write_res, IsOk());
+
+  read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt);
+  EXPECT_THAT(read_res, IsOk());
+  EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world")));
+}
+
+TEST_F(LocalFileIOTest, DeleteFile) {
+  auto del_res = file_io_->DeleteFile(tmpfile.string());
+  EXPECT_THAT(del_res, IsOk());
+
+  del_res = file_io_->DeleteFile(tmpfile.string());
+  EXPECT_THAT(del_res, IsError(ErrorKind::kIOError));
+  EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
+}
+
+}  // namespace iceberg

Reply via email to