wgtmac commented on code in PR #651:
URL: https://github.com/apache/iceberg-cpp/pull/651#discussion_r3287140131
##########
src/iceberg/parquet/parquet_writer.cc:
##########
@@ -152,15 +157,34 @@ class ParquetWriter::Impl {
std::vector<int64_t> split_offsets() const { return split_offsets_; }
+ Result<Metrics> metrics() {
+ if (writer_) {
+ return Invalid("Cannot return metrics for unclosed writer");
+ }
+ if (!metadata_) {
+ return Metrics();
+ }
+ return ParquetMetrics::GetMetrics(*schema_, *parquet_schema_,
*metrics_config_,
+ *metadata_, {});
Review Comment:
This drops the write-side field metrics. Java passes `model.metrics()` into
`ParquetMetrics.metrics(...)`, which is where float/double NaN counts come
from. With `{}` here, `nan_value_counts` stays empty even when the file has
NaNs, and the tests currently skip that assertion. We should either
collect/pass `FieldMetrics` here or leave NaN metrics unsupported explicitly.
##########
src/iceberg/parquet/parquet_metrics.cc:
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/parquet/parquet_metrics.h"
+
+#include <limits>
+#include <optional>
+#include <ranges>
+#include <string>
+#include <unordered_map>
+
+#include <parquet/column_reader.h>
+#include <parquet/schema.h>
+#include <parquet/statistics.h>
+#include <parquet/types.h>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/conversions.h"
+#include "iceberg/util/truncate_util.h"
+#include "iceberg/util/visit_type.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+/// \brief Get the Iceberg field ID from a Parquet column descriptor by reading the field ID stored on its schema node.
+/// \return The field ID, or nullopt if the schema node is null, is not primitive, or carries a negative field ID.
+std::optional<int32_t> GetFieldId(const ::parquet::ColumnDescriptor& column) {
+ const auto& node = column.schema_node();
+ if (node == nullptr || !node->is_primitive()) {  // no primitive schema node to read an ID from
+ return std::nullopt;
+ }
+ if (node->field_id() < 0) {  // a negative ID is treated here as "no field ID set"
+ return std::nullopt;
+ }
+ return node->field_id();
+}
+
+/// \brief Find the index of the first Parquet column whose field ID (per GetFieldId) equals field_id; yields nullopt when no column matches.
+std::optional<int32_t> FindColumnIndex(const ::parquet::SchemaDescriptor&
parquet_schema,
+ int32_t field_id) {
+ auto columns = std::views::iota(0, parquet_schema.num_columns());  // candidate indices 0 .. num_columns() - 1
+ auto it = std::ranges::find_if(columns, [&](int i) {
+ auto column_field_id = GetFieldId(*parquet_schema.Column(i));
+ return column_field_id.has_value() && column_field_id.value() == field_id;  // columns without a field ID never match
+ });
+ return it != columns.end() ? std::optional(*it) : std::nullopt;  // end() means the scan found no matching column
+}
+
+/// \brief Collect counts (value count and null count) from footer statistics, summed over every row group.
+/// \param field_id The Iceberg field ID; copied into the returned FieldMetrics.
+/// \param metadata The Parquet file metadata whose row groups are scanned.
+/// \param column_idx The column index in the Parquet schema, used to select the column chunk in each row group.
+/// \return FieldMetrics holding field_id, value_count and null_value_count, or nullopt if, for any row group, stats are not
available.
+std::optional<FieldMetrics> CollectCounts(int32_t field_id,
+ const ::parquet::FileMetaData&
metadata,
+ int32_t column_idx) {
+ int64_t value_count = 0;
+ int64_t null_count = 0;
+
+ for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
+ auto row_group = metadata.RowGroup(rg);
+ auto column_chunk = row_group->ColumnChunk(column_idx);
+ auto stats = column_chunk->statistics();
+ if (stats == nullptr || !stats->HasNullCount()) {  // one chunk without statistics or a null count abandons the whole result
+ return std::nullopt;
+ }
+
+ null_count += stats->null_count();  // null count comes from the chunk statistics
+ value_count += column_chunk->num_values();  // value count comes from the column chunk metadata
+ }
+
+ return FieldMetrics{
+ .field_id = field_id, .value_count = value_count, .null_value_count =
null_count};
+}
+
+/// \brief Collect bounds (lower and upper) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param iceberg_type The Iceberg primitive type for deserializing values.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \param truncate_length The length to truncate strings/binary values.
+/// \return FieldMetrics with counts and bounds, or nullopt if stats are not
available.
+Result<std::optional<FieldMetrics>> CollectBounds(
+ int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
+ const ::parquet::FileMetaData& metadata, int32_t column_idx,
+ int32_t truncate_length) {
+ int64_t null_count = 0;
+ int64_t value_count = 0;
+ std::optional<Literal> lower_bound;
+ std::optional<Literal> upper_bound;
+
+ for (int32_t rg = 0; rg < metadata.num_row_groups(); ++rg) {
+ auto row_group = metadata.RowGroup(rg);
+ auto column_chunk = row_group->ColumnChunk(column_idx);
+ auto stats = column_chunk->statistics();
+ if (stats == nullptr || !stats->HasNullCount()) {
+ return std::nullopt;
+ }
+
+ null_count += stats->null_count();
+ value_count += column_chunk->num_values();
+
+ if (stats->HasMinMax()) {
+ auto min_bytes = stats->EncodeMin();
+ auto min_span = std::span<const uint8_t>(
+ reinterpret_cast<const uint8_t*>(min_bytes.data()),
min_bytes.size());
+ ICEBERG_ASSIGN_OR_RAISE(auto min_value,
+ Conversions::FromBytes(iceberg_type, min_span));
+ if (!lower_bound.has_value() || min_value < lower_bound.value()) {
+ lower_bound = std::move(min_value);
+ }
+
+ auto max_bytes = stats->EncodeMax();
+ auto max_span = std::span<const uint8_t>(
+ reinterpret_cast<const uint8_t*>(max_bytes.data()),
max_bytes.size());
+ ICEBERG_ASSIGN_OR_RAISE(auto max_value,
+ Conversions::FromBytes(iceberg_type, max_span));
+ if (!upper_bound.has_value() || max_value > upper_bound.value()) {
+ upper_bound = std::move(max_value);
+ }
+ }
+ }
+
+ if (!lower_bound.has_value() || !upper_bound.has_value() ||
lower_bound->IsNaN() ||
+ upper_bound->IsNaN()) {
+ return FieldMetrics{
+ .field_id = field_id,
+ .value_count = value_count,
+ .null_value_count = null_count,
+ };
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto truncated_lower,
+ TruncateUtils::TruncateLowerBound(
+ *iceberg_type, lower_bound.value(),
truncate_length));
+ ICEBERG_ASSIGN_OR_RAISE(auto truncated_upper,
Review Comment:
This turns a missing representable upper bound into a hard metrics failure.
For `truncate(N)`, Java returns null from `BinaryUtil.truncateBinaryMax` /
`UnicodeUtil.truncateStringMax` when values like `0xff...` cannot produce a
safe upper bound, and `ParquetMetrics` just omits the upper bound. Here
`TruncateUpperBound` bubbles an `InvalidArgument` out of `writer->metrics()`,
so a valid file can be written but fail while building DataFile metrics. Can we
treat this case as no upper bound instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]