wgtmac commented on code in PR #198:
URL: https://github.com/apache/iceberg-cpp/pull/198#discussion_r2315352526


##########
src/iceberg/parquet/parquet_writer.cc:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/parquet/parquet_writer.h"
+
+#include <memory>
+
+#include <arrow/c/bridge.h>
+#include <arrow/record_batch.h>
+#include <arrow/util/key_value_metadata.h>
+#include <parquet/arrow/schema.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/file_writer.h>
+#include <parquet/properties.h>
+
+#include "iceberg/arrow/arrow_error_transform_internal.h"
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
+    const WriterOptions& options) {
+  auto io = 
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, 
io->fs()->OpenOutputStream(options.path));
+  return output;
+}
+
+}  // namespace
+
+class ParquetWriter::Impl {
+ public:
+  Status Open(const WriterOptions& options) {
+    auto writer_properties =
+        ::parquet::WriterProperties::Builder().memory_pool(pool_)->build();
+    auto arrow_writer_properties = 
::parquet::default_arrow_writer_properties();
+
+    ArrowSchema c_schema;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema));
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, 
::arrow::ImportSchema(&c_schema));
+
+    std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor;
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        ::parquet::arrow::ToParquetSchema(arrow_schema_.get(), 
*writer_properties,
+                                          *arrow_writer_properties, 
&schema_descriptor));
+    auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
+        schema_descriptor->schema_root());
+
+    ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
+    auto file_writer = ::parquet::ParquetFileWriter::Open(output_stream_, 
schema_node,
+                                                          writer_properties);
+    ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileWriter::Make(
+        pool_, std::move(file_writer), arrow_schema_, arrow_writer_properties, 
&writer_));

Review Comment:
   ```suggestion
           pool_, std::move(file_writer), arrow_schema_, 
std::move(arrow_writer_properties), &writer_));
   ```



##########
src/iceberg/parquet/parquet_writer.cc:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/parquet/parquet_writer.h"
+
+#include <memory>
+
+#include <arrow/c/bridge.h>
+#include <arrow/record_batch.h>
+#include <arrow/util/key_value_metadata.h>
+#include <parquet/arrow/schema.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/file_writer.h>
+#include <parquet/properties.h>
+
+#include "iceberg/arrow/arrow_error_transform_internal.h"
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
+    const WriterOptions& options) {
+  auto io = 
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, 
io->fs()->OpenOutputStream(options.path));
+  return output;
+}
+
+}  // namespace
+
+class ParquetWriter::Impl {
+ public:
+  Status Open(const WriterOptions& options) {
+    auto writer_properties =
+        ::parquet::WriterProperties::Builder().memory_pool(pool_)->build();
+    auto arrow_writer_properties = 
::parquet::default_arrow_writer_properties();
+
+    ArrowSchema c_schema;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema));
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, 
::arrow::ImportSchema(&c_schema));
+
+    std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor;
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        ::parquet::arrow::ToParquetSchema(arrow_schema_.get(), 
*writer_properties,
+                                          *arrow_writer_properties, 
&schema_descriptor));
+    auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
+        schema_descriptor->schema_root());
+
+    ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
+    auto file_writer = ::parquet::ParquetFileWriter::Open(output_stream_, 
schema_node,
+                                                          writer_properties);

Review Comment:
   ```suggestion
       auto file_writer = ::parquet::ParquetFileWriter::Open(output_stream_, 
std::move(schema_node),
                                                             
std::move(writer_properties));
   ```



##########
test/parquet_test.cc:
##########
@@ -17,38 +17,121 @@
  * under the License.
  */
 
+#include <optional>
+
 #include <arrow/array.h>
 #include <arrow/c/bridge.h>
 #include <arrow/json/from_string.h>
 #include <arrow/record_batch.h>
 #include <arrow/table.h>
+#include <arrow/type.h>
 #include <arrow/util/key_value_metadata.h>
 #include <parquet/arrow/reader.h>
 #include <parquet/arrow/writer.h>
 #include <parquet/metadata.h>
 
+#include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/parquet/parquet_reader.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
 #include "iceberg/parquet/parquet_register.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
 #include "matchers.h"
-#include "temp_file_test_base.h"
 
 namespace iceberg::parquet {
 
-class ParquetReaderTest : public TempFileTestBase {
+namespace {
+
+Status WriteTableInner(Writer& writer, std::shared_ptr<::arrow::Array> data) {

Review Comment:
   ```suggestion
   Status WriteTable(Writer& writer, std::shared_ptr<::arrow::Array> data) {
   ```
   
   Adding an `Inner` suffix looks weird.



##########
src/iceberg/parquet/parquet_writer.cc:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/parquet/parquet_writer.h"
+
+#include <memory>
+
+#include <arrow/c/bridge.h>
+#include <arrow/record_batch.h>
+#include <arrow/util/key_value_metadata.h>
+#include <parquet/arrow/schema.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/file_writer.h>
+#include <parquet/properties.h>
+
+#include "iceberg/arrow/arrow_error_transform_internal.h"
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
+    const WriterOptions& options) {
+  auto io = 
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, 
io->fs()->OpenOutputStream(options.path));
+  return output;
+}
+
+}  // namespace
+
+class ParquetWriter::Impl {
+ public:
+  Status Open(const WriterOptions& options) {
+    auto writer_properties =
+        ::parquet::WriterProperties::Builder().memory_pool(pool_)->build();
+    auto arrow_writer_properties = 
::parquet::default_arrow_writer_properties();
+
+    ArrowSchema c_schema;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema));
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, 
::arrow::ImportSchema(&c_schema));
+
+    std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor;
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        ::parquet::arrow::ToParquetSchema(arrow_schema_.get(), 
*writer_properties,
+                                          *arrow_writer_properties, 
&schema_descriptor));
+    auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
+        schema_descriptor->schema_root());
+
+    ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
+    auto file_writer = ::parquet::ParquetFileWriter::Open(output_stream_, 
schema_node,
+                                                          writer_properties);
+    ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileWriter::Make(
+        pool_, std::move(file_writer), arrow_schema_, arrow_writer_properties, 
&writer_));
+
+    return {};
+  }
+
+  Status Write(ArrowArray array) {
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch,
+                                   ::arrow::ImportRecordBatch(&array, 
arrow_schema_));
+
+    ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
+
+    return {};
+  }
+
+  // Close the writer and release resources
+  Status Close() {
+    if (writer_ == nullptr) {
+      return {};  // Already closed
+    }
+
+    ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
+    auto& metadata = writer_->metadata();
+    split_offsets_.reserve(metadata->num_row_groups());
+    for (int i = 0; i < metadata->num_row_groups(); ++i) {
+      split_offsets_.push_back(metadata->RowGroup(i)->file_offset());
+    }
+    writer_.reset();
+
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell());
+    ICEBERG_ARROW_RETURN_NOT_OK(output_stream_->Close());

Review Comment:
   ```suggestion
       ICEBERG_ARROW_RETURN_NOT_OK(output_stream_->Close());
       ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell());
   ```
   
   Shouldn't we close it before getting the length? It may contain an internal 
buffer that is not flushed yet.



##########
test/parquet_test.cc:
##########
@@ -17,38 +17,121 @@
  * under the License.
  */
 
+#include <optional>
+
 #include <arrow/array.h>
 #include <arrow/c/bridge.h>
 #include <arrow/json/from_string.h>
 #include <arrow/record_batch.h>
 #include <arrow/table.h>
+#include <arrow/type.h>
 #include <arrow/util/key_value_metadata.h>
 #include <parquet/arrow/reader.h>
 #include <parquet/arrow/writer.h>
 #include <parquet/metadata.h>
 
+#include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/parquet/parquet_reader.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
 #include "iceberg/parquet/parquet_register.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
 #include "matchers.h"
-#include "temp_file_test_base.h"
 
 namespace iceberg::parquet {
 
-class ParquetReaderTest : public TempFileTestBase {
+namespace {
+
+Status WriteTableInner(Writer& writer, std::shared_ptr<::arrow::Array> data) {
+  ArrowArray arr;
+  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
+  ICEBERG_RETURN_UNEXPECTED(writer.Write(arr));
+  return writer.Close();
+}
+
+Status WriteTable(std::shared_ptr<::arrow::Array> data,
+                  const WriterOptions& writer_options) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, 
writer_options));
+  return WriteTableInner(*writer, data);
+}
+
+Status ReadTable(std::shared_ptr<::arrow::Array>& out,

Review Comment:
   ditto



##########
test/parquet_test.cc:
##########
@@ -17,38 +17,121 @@
  * under the License.
  */
 
+#include <optional>
+
 #include <arrow/array.h>
 #include <arrow/c/bridge.h>
 #include <arrow/json/from_string.h>
 #include <arrow/record_batch.h>
 #include <arrow/table.h>
+#include <arrow/type.h>
 #include <arrow/util/key_value_metadata.h>
 #include <parquet/arrow/reader.h>
 #include <parquet/arrow/writer.h>
 #include <parquet/metadata.h>
 
+#include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/parquet/parquet_reader.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
 #include "iceberg/parquet/parquet_register.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
 #include "matchers.h"
-#include "temp_file_test_base.h"
 
 namespace iceberg::parquet {
 
-class ParquetReaderTest : public TempFileTestBase {
+namespace {
+
+Status WriteTableInner(Writer& writer, std::shared_ptr<::arrow::Array> data) {
+  ArrowArray arr;
+  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
+  ICEBERG_RETURN_UNEXPECTED(writer.Write(arr));
+  return writer.Close();
+}
+
+Status WriteTable(std::shared_ptr<::arrow::Array> data,
+                  const WriterOptions& writer_options) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, 
writer_options));
+  return WriteTableInner(*writer, data);
+}
+
+Status ReadTable(std::shared_ptr<::arrow::Array>& out,
+                 const ReaderOptions& reader_options) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet, 
reader_options));
+  ICEBERG_ASSIGN_OR_RAISE(auto read_data, reader->Next());
+
+  if (!read_data.has_value()) {
+    out = nullptr;
+    return {};
+  }
+  auto arrow_c_array = read_data.value();
+
+  ArrowSchema arrow_schema;
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*reader_options.projection, 
&arrow_schema));
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(out,
+                                 ::arrow::ImportArray(&arrow_c_array, 
&arrow_schema));
+  return {};
+}
+
+void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> 
schema,
+                 std::shared_ptr<::arrow::Array>& out) {
+  std::shared_ptr<FileIO> file_io = 
arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+  const std::string basePath = "base.parquet";
+
+  auto writer_data = WriterFactoryRegistry::Open(
+      FileFormatType::kParquet, {.path = basePath, .schema = schema, .io = 
file_io});
+  ASSERT_THAT(writer_data, IsOk())
+      << "Failed to create writer: " << writer_data.error().message;
+  auto writer = std::move(writer_data.value());
+  ASSERT_THAT(WriteTableInner(*writer, data), IsOk());
+
+  ASSERT_THAT(ReadTable(out, {.path = basePath,
+                              .length = writer->length(),
+                              .io = file_io,
+                              .projection = schema}),
+              IsOk());
+
+  ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
+}
+
+}  // namespace
+
+class ParquetReaderTest : public ::testing::Test {
  protected:
   static void SetUpTestSuite() { parquet::RegisterAll(); }
 
   void SetUp() override {
-    TempFileTestBase::SetUp();
-    file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO();
-    temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet");
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+    temp_parquet_file_ = "parquet_reader_test.parquet";
   }
 
   void CreateSimpleParquetFile() {
+    auto schema = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", 
string())});
+
+    ArrowSchema arrow_c_schema;
+    ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+    auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();

Review Comment:
   nit: use arrow::ImportSchema



##########
test/parquet_test.cc:
##########
@@ -17,38 +17,121 @@
  * under the License.
  */
 
+#include <optional>
+
 #include <arrow/array.h>
 #include <arrow/c/bridge.h>
 #include <arrow/json/from_string.h>
 #include <arrow/record_batch.h>
 #include <arrow/table.h>
+#include <arrow/type.h>
 #include <arrow/util/key_value_metadata.h>
 #include <parquet/arrow/reader.h>
 #include <parquet/arrow/writer.h>
 #include <parquet/metadata.h>
 
+#include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/parquet/parquet_reader.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
 #include "iceberg/parquet/parquet_register.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
 #include "matchers.h"
-#include "temp_file_test_base.h"
 
 namespace iceberg::parquet {
 
-class ParquetReaderTest : public TempFileTestBase {
+namespace {
+
+Status WriteTableInner(Writer& writer, std::shared_ptr<::arrow::Array> data) {
+  ArrowArray arr;
+  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
+  ICEBERG_RETURN_UNEXPECTED(writer.Write(arr));
+  return writer.Close();
+}
+
+Status WriteTable(std::shared_ptr<::arrow::Array> data,

Review Comment:
   ditto



##########
test/parquet_test.cc:
##########
@@ -17,38 +17,121 @@
  * under the License.
  */
 
+#include <optional>
+
 #include <arrow/array.h>
 #include <arrow/c/bridge.h>
 #include <arrow/json/from_string.h>
 #include <arrow/record_batch.h>
 #include <arrow/table.h>
+#include <arrow/type.h>
 #include <arrow/util/key_value_metadata.h>
 #include <parquet/arrow/reader.h>
 #include <parquet/arrow/writer.h>
 #include <parquet/metadata.h>
 
+#include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/parquet/parquet_reader.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
 #include "iceberg/parquet/parquet_register.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
 #include "matchers.h"
-#include "temp_file_test_base.h"
 
 namespace iceberg::parquet {
 
-class ParquetReaderTest : public TempFileTestBase {
+namespace {
+
+Status WriteTableInner(Writer& writer, std::shared_ptr<::arrow::Array> data) {
+  ArrowArray arr;
+  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
+  ICEBERG_RETURN_UNEXPECTED(writer.Write(arr));
+  return writer.Close();
+}
+
+Status WriteTable(std::shared_ptr<::arrow::Array> data,
+                  const WriterOptions& writer_options) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, 
writer_options));
+  return WriteTableInner(*writer, data);
+}
+
+Status ReadTable(std::shared_ptr<::arrow::Array>& out,
+                 const ReaderOptions& reader_options) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet, 
reader_options));
+  ICEBERG_ASSIGN_OR_RAISE(auto read_data, reader->Next());
+
+  if (!read_data.has_value()) {
+    out = nullptr;
+    return {};
+  }
+  auto arrow_c_array = read_data.value();
+
+  ArrowSchema arrow_schema;
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*reader_options.projection, 
&arrow_schema));

Review Comment:
   nit: you may use reader->Schema() to get it to save some typing.



##########
test/parquet_test.cc:
##########
@@ -17,38 +17,121 @@
  * under the License.
  */
 
+#include <optional>
+
 #include <arrow/array.h>
 #include <arrow/c/bridge.h>
 #include <arrow/json/from_string.h>
 #include <arrow/record_batch.h>
 #include <arrow/table.h>
+#include <arrow/type.h>
 #include <arrow/util/key_value_metadata.h>
 #include <parquet/arrow/reader.h>
 #include <parquet/arrow/writer.h>
 #include <parquet/metadata.h>
 
+#include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/parquet/parquet_reader.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
 #include "iceberg/parquet/parquet_register.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
 #include "matchers.h"
-#include "temp_file_test_base.h"
 
 namespace iceberg::parquet {
 
-class ParquetReaderTest : public TempFileTestBase {
+namespace {
+
+Status WriteTableInner(Writer& writer, std::shared_ptr<::arrow::Array> data) {
+  ArrowArray arr;
+  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
+  ICEBERG_RETURN_UNEXPECTED(writer.Write(arr));
+  return writer.Close();
+}
+
+Status WriteTable(std::shared_ptr<::arrow::Array> data,
+                  const WriterOptions& writer_options) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, 
writer_options));
+  return WriteTableInner(*writer, data);
+}
+
+Status ReadTable(std::shared_ptr<::arrow::Array>& out,
+                 const ReaderOptions& reader_options) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet, 
reader_options));
+  ICEBERG_ASSIGN_OR_RAISE(auto read_data, reader->Next());
+
+  if (!read_data.has_value()) {
+    out = nullptr;
+    return {};
+  }
+  auto arrow_c_array = read_data.value();
+
+  ArrowSchema arrow_schema;
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*reader_options.projection, 
&arrow_schema));
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(out,
+                                 ::arrow::ImportArray(&arrow_c_array, 
&arrow_schema));
+  return {};
+}
+
+void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> 
schema,
+                 std::shared_ptr<::arrow::Array>& out) {
+  std::shared_ptr<FileIO> file_io = 
arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+  const std::string basePath = "base.parquet";
+
+  auto writer_data = WriterFactoryRegistry::Open(
+      FileFormatType::kParquet, {.path = basePath, .schema = schema, .io = 
file_io});
+  ASSERT_THAT(writer_data, IsOk())
+      << "Failed to create writer: " << writer_data.error().message;
+  auto writer = std::move(writer_data.value());
+  ASSERT_THAT(WriteTableInner(*writer, data), IsOk());
+
+  ASSERT_THAT(ReadTable(out, {.path = basePath,
+                              .length = writer->length(),
+                              .io = file_io,
+                              .projection = schema}),
+              IsOk());
+
+  ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";

Review Comment:
   Why not compare `data` and `out` here?



##########
test/parquet_test.cc:
##########
@@ -204,4 +288,81 @@ TEST_F(ParquetReaderTest, ReadSplit) {
   }
 }
 
+class ParquetReadWrite : public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() { parquet::RegisterAll(); }
+};
+
+TEST_F(ParquetReadWrite, EmptyStruct) {
+  auto schema =
+      
std::make_shared<Schema>(std::vector<SchemaField>{SchemaField::MakeRequired(
+          1, "empty_struct", 
std::make_shared<StructType>(std::vector<SchemaField>{}))});
+
+  ArrowSchema arrow_c_schema;
+  ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+  auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+  auto array = ::arrow::json::ArrayFromJSONString(
+                   ::arrow::struct_(arrow_schema->fields()), R"([null, {}])")
+                   .ValueOrDie();
+
+  std::shared_ptr<FileIO> file_io = 
arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+  const std::string basePath = "base.parquet";
+
+  ASSERT_THAT(WriteTable(array, {.path = basePath, .schema = schema, .io = 
file_io}),
+              IsError(ErrorKind::kNotImplemented));
+}
+
+TEST_F(ParquetReadWrite, SimpleStructRoundTrip) {
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(1, "a",
+                                struct_({
+                                    SchemaField::MakeOptional(2, "a1", 
int64()),
+                                    SchemaField::MakeOptional(3, "a2", 
string()),
+                                })),
+  });
+
+  ArrowSchema arrow_c_schema;
+  ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+  auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+  auto array = ::arrow::json::ArrayFromJSONString(
+                   ::arrow::struct_(arrow_schema->fields()),
+                   R"([[{"a1": 1, "a2": "abc"}], [{"a1": 0}], [{"a2": "edf"}], 
[{}]])")
+                   .ValueOrDie();
+
+  std::shared_ptr<::arrow::Array> out;
+  DoRoundtrip(array, schema, out);
+
+  ASSERT_TRUE(out->Equals(*array));
+}
+
+TEST_F(ParquetReadWrite, SimpleTypeRoundTrip) {
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(1, "a", boolean()),
+      SchemaField::MakeOptional(2, "b", int32()),
+      SchemaField::MakeOptional(3, "c", int64()),
+      SchemaField::MakeOptional(4, "d", float32()),
+      SchemaField::MakeOptional(5, "e", float64()),
+      SchemaField::MakeOptional(6, "f", string()),
+      SchemaField::MakeOptional(7, "g", time()),
+      SchemaField::MakeOptional(8, "h", timestamp()),
+  });
+
+  ArrowSchema arrow_c_schema;
+  ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+  auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+  auto array =
+      ::arrow::json::ArrayFromJSONString(
+          ::arrow::struct_(arrow_schema->fields()),
+          R"([[true, 1, 2, 1.1, 1.2, "abc", 44614000, 1756696503000000], 
[false, 0, 0, 0, 0, "", 0, 0], [null, null, null, null, null, null, null, 
null]])")

Review Comment:
   ```suggestion
             R"([[true, 1, 2, 1.1, 1.2, "abc", 44614000, 1756696503000000],
                 [false, 0, 0, 0, 0, "", 0, 0],
                 [null, null, null, null, null, null, null, null]])")
   ```



##########
test/parquet_test.cc:
##########
@@ -17,38 +17,121 @@
  * under the License.
  */
 
+#include <optional>
+
 #include <arrow/array.h>
 #include <arrow/c/bridge.h>
 #include <arrow/json/from_string.h>
 #include <arrow/record_batch.h>
 #include <arrow/table.h>
+#include <arrow/type.h>
 #include <arrow/util/key_value_metadata.h>
 #include <parquet/arrow/reader.h>
 #include <parquet/arrow/writer.h>
 #include <parquet/metadata.h>
 
+#include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/parquet/parquet_reader.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
 #include "iceberg/parquet/parquet_register.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
 #include "matchers.h"
-#include "temp_file_test_base.h"
 
 namespace iceberg::parquet {
 
-class ParquetReaderTest : public TempFileTestBase {
+namespace {
+
+Status WriteTableInner(Writer& writer, std::shared_ptr<::arrow::Array> data) {

Review Comment:
   This is not actually a Table. In the other test cases, I used the name Table because 
I had created a `::arrow::Table` instance to write to Parquet. To be precise, we 
should call this one `WriteArray` instead.



##########
test/parquet_test.cc:
##########
@@ -17,38 +17,121 @@
  * under the License.
  */
 
+#include <optional>
+
 #include <arrow/array.h>
 #include <arrow/c/bridge.h>
 #include <arrow/json/from_string.h>
 #include <arrow/record_batch.h>
 #include <arrow/table.h>
+#include <arrow/type.h>
 #include <arrow/util/key_value_metadata.h>
 #include <parquet/arrow/reader.h>
 #include <parquet/arrow/writer.h>
 #include <parquet/metadata.h>
 
+#include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/parquet/parquet_reader.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
 #include "iceberg/parquet/parquet_register.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
 #include "matchers.h"
-#include "temp_file_test_base.h"
 
 namespace iceberg::parquet {
 
-class ParquetReaderTest : public TempFileTestBase {
+namespace {
+
+Status WriteTableInner(Writer& writer, std::shared_ptr<::arrow::Array> data) {
+  ArrowArray arr;
+  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
+  ICEBERG_RETURN_UNEXPECTED(writer.Write(arr));
+  return writer.Close();
+}
+
+Status WriteTable(std::shared_ptr<::arrow::Array> data,
+                  const WriterOptions& writer_options) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, 
writer_options));
+  return WriteTableInner(*writer, data);
+}
+
+Status ReadTable(std::shared_ptr<::arrow::Array>& out,
+                 const ReaderOptions& reader_options) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet, 
reader_options));
+  ICEBERG_ASSIGN_OR_RAISE(auto read_data, reader->Next());
+
+  if (!read_data.has_value()) {
+    out = nullptr;
+    return {};
+  }
+  auto arrow_c_array = read_data.value();
+
+  ArrowSchema arrow_schema;
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*reader_options.projection, 
&arrow_schema));
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(out,
+                                 ::arrow::ImportArray(&arrow_c_array, 
&arrow_schema));
+  return {};
+}
+
+void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> 
schema,
+                 std::shared_ptr<::arrow::Array>& out) {
+  std::shared_ptr<FileIO> file_io = 
arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+  const std::string basePath = "base.parquet";
+
+  auto writer_data = WriterFactoryRegistry::Open(
+      FileFormatType::kParquet, {.path = basePath, .schema = schema, .io = 
file_io});
+  ASSERT_THAT(writer_data, IsOk())
+      << "Failed to create writer: " << writer_data.error().message;
+  auto writer = std::move(writer_data.value());
+  ASSERT_THAT(WriteTableInner(*writer, data), IsOk());
+
+  ASSERT_THAT(ReadTable(out, {.path = basePath,
+                              .length = writer->length(),
+                              .io = file_io,
+                              .projection = schema}),
+              IsOk());
+
+  ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
+}
+
+}  // namespace
+
+class ParquetReaderTest : public ::testing::Test {
  protected:
   static void SetUpTestSuite() { parquet::RegisterAll(); }
 
   void SetUp() override {
-    TempFileTestBase::SetUp();
-    file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO();
-    temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet");
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+    temp_parquet_file_ = "parquet_reader_test.parquet";
   }
 
   void CreateSimpleParquetFile() {
+    auto schema = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", 
string())});
+
+    ArrowSchema arrow_c_schema;
+    ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+    auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();

Review Comment:
   Same for below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to