zhjwpku commented on code in PR #641:
URL: https://github.com/apache/iceberg-cpp/pull/641#discussion_r3201619207


##########
src/iceberg/file_io.h:
##########
@@ -50,21 +138,14 @@ class ICEBERG_EXPORT FileIO {
   /// \return The content of the file if the read succeeded, an error code if the read
   /// failed.
   virtual Result<std::string> ReadFile(const std::string& file_location,
-                                       std::optional<size_t> length) {
-    // We provide a default implementation to avoid Windows linker error LNK2019.
-    return NotImplemented("ReadFile not implemented");
-  }
+                                       std::optional<size_t> length);

Review Comment:
   Can we just use `int64_t` for the `length` parameter?



##########
src/iceberg/test/std_io.h:
##########
@@ -19,78 +19,297 @@
 
 #pragma once
 
+#include <cstddef>
+#include <cstdint>
 #include <filesystem>
 #include <fstream>
+#include <ios>
+#include <limits>
+#include <memory>
 #include <optional>
-#include <sstream>
+#include <span>
 #include <string>
 #include <string_view>
+#include <system_error>
+#include <utility>
 
 #include "iceberg/file_io.h"
 #include "iceberg/result.h"
+#include "iceberg/util/macros.h"
 
 namespace iceberg::test {
 
-/// \brief Simple local filesystem FileIO implementation for testing
-///
-/// This class provides a basic FileIO implementation that reads and writes
-/// files to the local filesystem using standard C++ file streams.
-class StdFileIO : public FileIO {
+namespace detail {
+
+inline Result<std::streamsize> ToStreamSize(size_t size) {
+  if (size > static_cast<size_t>(std::numeric_limits<std::streamsize>::max())) 
{
+    return InvalidArgument("Buffer size {} exceeds streamsize max", size);
+  }
+  return static_cast<std::streamsize>(size);
+}
+
+inline Result<int64_t> ToInt64FileSize(uintmax_t size, std::string_view 
location) {
+  if (size > static_cast<uintmax_t>(std::numeric_limits<int64_t>::max())) {
+    return Invalid("File size for {} exceeds int64_t max", location);
+  }
+  return static_cast<int64_t>(size);
+}
+
+}  // namespace detail
+
+class StdSeekableInputStream : public SeekableInputStream {
  public:
-  Result<std::string> ReadFile(const std::string& file_location,
-                               std::optional<size_t> length) override {
-    std::ifstream file(file_location, std::ios::binary);
-    if (!file.is_open()) {
-      return IOError("Failed to open file for reading: {}", file_location);
-    }
-
-    if (length.has_value()) {
-      std::string content(length.value(), '\0');
-      file.read(content.data(), length.value());
-      if (!file) {
-        return IOError("Failed to read {} bytes from file: {}", length.value(),
-                       file_location);
+  explicit StdSeekableInputStream(std::string location)
+      : location_(std::move(location)), file_(location_, std::ios::binary) {}
+
+  bool is_open() const { return file_.is_open(); }
+
+  Result<int64_t> Position() const override {
+    auto position = file_.tellg();
+    if (position < 0) {
+      return IOError("Failed to get read position for: {}", location_);
+    }
+    return static_cast<int64_t>(position);
+  }
+
+  Status Seek(int64_t position) override {
+    if (position < 0) {
+      return InvalidArgument("Cannot seek to negative position {}", position);
+    }
+    file_.clear();
+    file_.seekg(position);
+    if (!file_) {
+      return IOError("Failed to seek to {} in file: {}", position, location_);
+    }
+    return {};
+  }
+
+  Result<int64_t> Read(std::span<std::byte> out) override {
+    if (out.empty()) {
+      return 0;
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto read_size, detail::ToStreamSize(out.size()));
+    file_.read(reinterpret_cast<char*>(out.data()), read_size);
+    auto bytes_read = file_.gcount();
+    if (!file_) {
+      if (file_.bad() || !file_.eof()) {
+        return IOError("Failed to read from file: {}", location_);
       }
-      return content;
-    } else {
-      std::stringstream buffer;
-      buffer << file.rdbuf();
-      if (!file && !file.eof()) {
-        return IOError("Failed to read file: {}", file_location);
+      file_.clear();
+    }
+    if (bytes_read < 0) {
+      return IOError("Failed to read from file: {}", location_);
+    }
+    return static_cast<int64_t>(bytes_read);
+  }
+
+  Status ReadFully(int64_t position, std::span<std::byte> out) override {
+    if (position < 0) {
+      return InvalidArgument("Cannot read from negative position {}", 
position);
+    }
+    if (out.empty()) {
+      return {};
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto original_position, Position());
+    auto seek_status = Seek(position);
+    if (!seek_status.has_value()) {
+      static_cast<void>(Seek(original_position));
+      return seek_status;
+    }
+
+    Status read_status = {};
+    size_t total_read = 0;
+    while (total_read < out.size()) {
+      auto read_result = Read(out.subspan(total_read));
+      if (!read_result.has_value()) {
+        read_status = std::unexpected<Error>(read_result.error());
+        break;
+      }
+      if (read_result.value() == 0) {
+        read_status =
+            IOError("Failed to read {} bytes from file: {}", out.size(), 
location_);
+        break;
       }
-      return buffer.str();
+      total_read += static_cast<size_t>(read_result.value());
+    }
+
+    auto restore_status = Seek(original_position);
+    ICEBERG_RETURN_UNEXPECTED(read_status);
+    return restore_status;
+  }
+
+  Status Close() override {
+    if (!file_.is_open()) {
+      return {};
+    }
+    file_.close();
+    if (!file_) {
+      return IOError("Failed to close file: {}", location_);
+    }
+    return {};
+  }
+
+ private:
+  std::string location_;
+  mutable std::ifstream file_;
+};
+
+class StdPositionOutputStream : public PositionOutputStream {
+ public:
+  explicit StdPositionOutputStream(std::string location)
+      : location_(std::move(location)),
+        file_(location_, std::ios::binary | std::ios::out | std::ios::trunc) {}
+
+  bool is_open() const { return file_.is_open(); }
+
+  Result<int64_t> Position() const override {
+    auto position = file_.tellp();
+    if (position < 0) {
+      return IOError("Failed to get write position for: {}", location_);
+    }
+    return static_cast<int64_t>(position);
+  }
+
+  Status Write(std::span<const std::byte> data) override {
+    if (data.empty()) {
+      return {};
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto write_size, 
detail::ToStreamSize(data.size()));
+    file_.write(reinterpret_cast<const char*>(data.data()), write_size);
+    if (!file_) {
+      return IOError("Failed to write to file: {}", location_);
     }
+    return {};
   }
 
-  Status WriteFile(const std::string& file_location, std::string_view content) 
override {
-    // Create parent directories if they don't exist
-    std::filesystem::path path(file_location);
+  Status Flush() override {
+    file_.flush();

Review Comment:
   Do we need an `is_open()` check here before calling `file_.flush()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to