wgtmac commented on code in PR #317:
URL: https://github.com/apache/iceberg-cpp/pull/317#discussion_r2554790700


##########
src/iceberg/manifest_writer.cc:
##########
@@ -38,9 +111,33 @@ Status ManifestWriter::Add(const ManifestEntry& entry) {
   return adapter_->Append(entry);
 }
 
+Status ManifestWriter::CheckDataFile(const DataFile& file) const {
+  switch (adapter_->content()) {
+    case ManifestContent::kData:
+      if (file.content != DataFile::Content::kData) {
+        return InvalidArgument(
+            "Manifest content type: data, data file content should be: data, 
but got: {}",
+            ToString(file.content));
+      }
+      break;
+    case ManifestContent::kDeletes:
+      if (file.content != DataFile::Content::kPositionDeletes &&
+          file.content != DataFile::Content::kEqualityDeletes) {
+        return InvalidArgument(
+            "Manifest content type: deletes, data file content should be: "
+            "position_deletes or equality_deletes, but got: {}",
+            ToString(file.content));

Review Comment:
   ```suggestion
           return InvalidArgument(
               "Cannot write {} file to delete manifest file", 
ToString(file.content));
   ```



##########
src/iceberg/manifest_writer.cc:
##########
@@ -38,9 +111,33 @@ Status ManifestWriter::Add(const ManifestEntry& entry) {
   return adapter_->Append(entry);
 }
 
+Status ManifestWriter::CheckDataFile(const DataFile& file) const {
+  switch (adapter_->content()) {
+    case ManifestContent::kData:
+      if (file.content != DataFile::Content::kData) {
+        return InvalidArgument(
+            "Manifest content type: data, data file content should be: data, 
but got: {}",
+            ToString(file.content));

Review Comment:
   ```suggestion
           return InvalidArgument(
               "Cannot write {} file to data manifest file", 
ToString(file.content));
   ```



##########
src/iceberg/manifest_adapter.cc:
##########
@@ -161,6 +164,27 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
   }
 }
 
+Result<ManifestFile> ManifestEntryAdapter::ToManifestFile() const {
+  ManifestFile manifest_file;

Review Comment:
   nit: use aggregate initialization for struct



##########
src/iceberg/manifest_writer.cc:
##########
@@ -21,15 +21,88 @@
 
 #include "iceberg/manifest_entry.h"
 #include "iceberg/manifest_list.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/table_metadata.h"
 #include "iceberg/util/macros.h"
 #include "iceberg/v1_metadata.h"
 #include "iceberg/v2_metadata.h"
 #include "iceberg/v3_metadata.h"
 
 namespace iceberg {
 
-Status ManifestWriter::Add(const ManifestEntry& entry) {
+Status ManifestWriter::WriteAddedEntry(std::shared_ptr<DataFile> file,
+                                       std::optional<int64_t> 
data_sequence_number) {
+  ManifestEntry added;
+  added.status = ManifestStatus::kAdded;
+  added.snapshot_id = adapter_->snapshot_id();
+  added.data_file = std::move(file);

Review Comment:
   Just a reminder that first_row_id should be suppressed if set: 
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java#L79
   
   Same for the `WriteAddedEntry` function below



##########
src/iceberg/v2_metadata.cc:
##########
@@ -83,11 +83,18 @@ Status ManifestEntryAdapterV2::Init() {
 
   ICEBERG_ASSIGN_OR_RAISE(auto partition_type,
                           partition_spec_->PartitionType(*current_schema_));
+  partition_summary_ = std::make_unique<PartitionSummary>(*partition_type);
   manifest_schema_ = EntrySchema(std::move(partition_type));
   return ToArrowSchema(*manifest_schema_, &schema_);
 }
 
 Status ManifestEntryAdapterV2::Append(const ManifestEntry& entry) {
+  if (entry.IsAlive() && entry.sequence_number.has_value()) {
+    if (!min_sequence_number_.has_value() ||

Review Comment:
   Why is the maintenance of `min_sequence_number_` duplicated in v2 and v3 but 
not v1? I think the best place for it is in ManifestWriter::WriteEntry, together 
with the stats update.



##########
src/iceberg/manifest_writer.h:
##########
@@ -100,9 +156,20 @@ class ICEBERG_EXPORT ManifestWriter {
       std::shared_ptr<Schema> current_schema, ManifestContent content);
 
  private:
+  /// \brief Write the entry that all its fields are populated correctly.
+  /// \param entry Manifest entry to write.
+  /// \return Status::OK() if entry was written successfully
+  /// \note All other write entry variants delegate to this method after 
populating
+  /// the necessary fields.
+  Status WriteEntry(const ManifestEntry& entry);

Review Comment:
   I think this can be public because `AddAll(const std::vector<ManifestEntry>& 
entries)` is also public.



##########
src/iceberg/partition_summary_internal.h:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+class PartitionFieldStats {
+ public:
+  explicit PartitionFieldStats(const std::shared_ptr<Type>& type) : 
type_(type) {}
+
+  Status Update(const Literal& value);
+
+  Result<PartitionFieldSummary> Finish() const;
+
+  const std::shared_ptr<Type>& type() const { return type_; }
+
+ private:
+  std::shared_ptr<Type> type_{nullptr};

Review Comment:
   Perhaps `const Type& type_` is better to avoid a copy.



##########
src/iceberg/partition_summary_internal.h:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+class PartitionFieldStats {

Review Comment:
   Add ICEBERG_EXPORT if used in the test.



##########
src/iceberg/partition_summary_internal.cc:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/partition_summary_internal.h"
+
+#include <memory>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/result.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Status PartitionFieldStats::Update(const Literal& value) {
+  if (type_->type_id() != value.type()->type_id()) {
+    return InvalidArgument("value is not compatible with type");
+  }
+
+  if (value.IsNull()) {
+    contains_null_ = true;
+    return {};
+  }
+
+  if (value.IsNaN()) {
+    contains_nan_ = true;
+    return {};
+  }
+
+  if (!lower_bound_ || value < *lower_bound_) {
+    lower_bound_ = value;
+  }
+  if (!upper_bound_ || value > *upper_bound_) {
+    upper_bound_ = value;
+  }
+  return {};
+}
+
+Result<PartitionFieldSummary> PartitionFieldStats::Finish() const {
+  PartitionFieldSummary summary;
+  summary.contains_null = contains_null_;
+  summary.contains_nan = contains_nan_;
+  if (lower_bound_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto serialized_lower, lower_bound_->Serialize())
+    summary.lower_bound = std::move(serialized_lower);
+  }
+  if (upper_bound_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto serialized_upper, upper_bound_->Serialize())
+    summary.upper_bound = std::move(serialized_upper);
+  }
+  return summary;
+}
+
+PartitionSummary::PartitionSummary(const StructType& partition_type) {
+  std::vector<PartitionFieldStats> field_stats;

Review Comment:
   BTW, why not directly use `field_stats_`?



##########
src/iceberg/manifest_list.h:
##########
@@ -72,17 +72,17 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
   static const StructType& Type();
 };
 
+/// \brief The type of files tracked by the manifest, either data or delete 
files; 0 for
+/// all v1 manifests
+enum class ManifestContent {

Review Comment:
   I think `ManifestContent` is fine, since it is the name used by the Java impl.



##########
src/iceberg/partition_summary_internal.cc:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/partition_summary_internal.h"
+
+#include <memory>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/result.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Status PartitionFieldStats::Update(const Literal& value) {
+  if (type_->type_id() != value.type()->type_id()) {
+    return InvalidArgument("value is not compatible with type");
+  }
+
+  if (value.IsNull()) {
+    contains_null_ = true;
+    return {};
+  }
+
+  if (value.IsNaN()) {
+    contains_nan_ = true;
+    return {};
+  }
+
+  if (!lower_bound_ || value < *lower_bound_) {
+    lower_bound_ = value;
+  }
+  if (!upper_bound_ || value > *upper_bound_) {
+    upper_bound_ = value;
+  }
+  return {};
+}
+
+Result<PartitionFieldSummary> PartitionFieldStats::Finish() const {
+  PartitionFieldSummary summary;
+  summary.contains_null = contains_null_;
+  summary.contains_nan = contains_nan_;
+  if (lower_bound_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto serialized_lower, lower_bound_->Serialize())
+    summary.lower_bound = std::move(serialized_lower);
+  }
+  if (upper_bound_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto serialized_upper, upper_bound_->Serialize())
+    summary.upper_bound = std::move(serialized_upper);
+  }

Review Comment:
   ```suggestion
     if (lower_bound_) {
       ICEBERG_ASSIGN_OR_RAISE(summary.lower_bound, lower_bound_->Serialize())
     }
     if (upper_bound_) {
       ICEBERG_ASSIGN_OR_RAISE(summary.upper_bound, upper_bound_->Serialize())
     }
   ```



##########
src/iceberg/partition_summary_internal.cc:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/partition_summary_internal.h"
+
+#include <memory>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/result.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Status PartitionFieldStats::Update(const Literal& value) {
+  if (type_->type_id() != value.type()->type_id()) {
+    return InvalidArgument("value is not compatible with type");
+  }
+
+  if (value.IsNull()) {
+    contains_null_ = true;
+    return {};
+  }
+
+  if (value.IsNaN()) {
+    contains_nan_ = true;
+    return {};
+  }
+
+  if (!lower_bound_ || value < *lower_bound_) {
+    lower_bound_ = value;
+  }
+  if (!upper_bound_ || value > *upper_bound_) {
+    upper_bound_ = value;
+  }
+  return {};
+}
+
+Result<PartitionFieldSummary> PartitionFieldStats::Finish() const {
+  PartitionFieldSummary summary;
+  summary.contains_null = contains_null_;
+  summary.contains_nan = contains_nan_;
+  if (lower_bound_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto serialized_lower, lower_bound_->Serialize())
+    summary.lower_bound = std::move(serialized_lower);
+  }
+  if (upper_bound_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto serialized_upper, upper_bound_->Serialize())
+    summary.upper_bound = std::move(serialized_upper);
+  }
+  return summary;
+}
+
+PartitionSummary::PartitionSummary(const StructType& partition_type) {
+  std::vector<PartitionFieldStats> field_stats;

Review Comment:
   nit: call reserve on field_stats



##########
src/iceberg/manifest_adapter.cc:
##########
@@ -161,6 +164,27 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
   }
 }
 
+Result<ManifestFile> ManifestEntryAdapter::ToManifestFile() const {
+  ManifestFile manifest_file;
+  manifest_file.partition_spec_id = partition_spec_->spec_id();
+  manifest_file.content = content_;
+  // sequence_number and min_sequence_number with kInvalidSequenceNumber will 
be
+  // replace with real sequence number in `ManifestListWriter`.
+  manifest_file.sequence_number = TableMetadata::kInvalidSequenceNumber;
+  manifest_file.min_sequence_number =
+      min_sequence_number_.value_or(TableMetadata::kInvalidSequenceNumber);
+  manifest_file.existing_files_count = existing_files_count_;
+  manifest_file.added_files_count = add_files_count_;
+  manifest_file.existing_files_count = existing_files_count_;
+  manifest_file.deleted_files_count = delete_files_count_;
+  manifest_file.added_rows_count = add_rows_count_;
+  manifest_file.existing_rows_count = existing_rows_count_;
+  manifest_file.deleted_rows_count = delete_rows_count_;
+  ICEBERG_ASSIGN_OR_RAISE(auto partition_summary, 
partition_summary_->Summaries());
+  manifest_file.partitions = std::move(partition_summary);

Review Comment:
   ```suggestion
     ICEBERG_ASSIGN_OR_RAISE(manifest_file.partitions, 
partition_summary_->Summaries());
   ```



##########
src/iceberg/manifest_adapter.cc:
##########
@@ -161,6 +164,27 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
   }
 }
 
+Result<ManifestFile> ManifestEntryAdapter::ToManifestFile() const {

Review Comment:
   It looks strange that both `ManifestEntryAdapter` and `ManifestWriter` 
support `ToManifestFile()` but the one returned by `ManifestEntryAdapter` is 
partial. Should we remove the function here and maintain all entry stats in the 
ManifestWriter to make it look less strange?



##########
src/iceberg/manifest_writer.cc:
##########
@@ -50,11 +147,28 @@ Status ManifestWriter::Close() {
     ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
     ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
   }
-  return writer_->Close();
+  ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+  closed_ = true;
+  return {};
 }
 
 ManifestContent ManifestWriter::content() const { return adapter_->content(); }
 
+Result<Metrics> ManifestWriter::metrics() const { return writer_->metrics(); }
+
+Result<ManifestFile> ManifestWriter::ToManifestFile() const {
+  if (!closed_) [[unlikely]] {
+    return Invalid("Cannot get ManifestFile before closing the writer.");
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto manifest_file, adapter_->ToManifestFile());
+  manifest_file.manifest_path = manifest_location_;
+  manifest_file.manifest_length = writer_->length().value_or(0);

Review Comment:
   Shouldn't we return an error if `writer_->length()` returns an error?



##########
src/iceberg/metrics.h:
##########
@@ -32,12 +32,12 @@ namespace iceberg {
 /// \brief Iceberg file format metrics
 struct ICEBERG_EXPORT Metrics {
   int64_t row_count = 0;

Review Comment:
   The reason is that now we return `Result<Metrics>` and we need to make it 
explicit that row_count is unavailable. In the Java impl it is also a `Long` 
type.



##########
src/iceberg/partition_summary_internal.cc:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/partition_summary_internal.h"

Review Comment:
   Fair enough. Just note that only the header file is required to have the 
suffix. We can remove the suffix from the .cc file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to