This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 8295d50e feat: impl metrics config (#488)
8295d50e is described below

commit 8295d50e11385141d418d34bcdf4ef79083d4fa6
Author: wzhuo <[email protected]>
AuthorDate: Mon Jan 12 11:49:36 2026 +0800

    feat: impl metrics config (#488)
---
 src/iceberg/metrics_config.cc           | 203 +++++++++++++++++++++++++-
 src/iceberg/metrics_config.h            |  61 +++++++-
 src/iceberg/sort_order.cc               |  14 ++
 src/iceberg/sort_order.h                |   4 +
 src/iceberg/test/metrics_config_test.cc | 251 +++++++++++++++++++++++++++++---
 src/iceberg/util/string_util.h          |   7 +
 src/iceberg/util/type_util.cc           |  29 ++++
 src/iceberg/util/type_util.h            |  15 ++
 8 files changed, 558 insertions(+), 26 deletions(-)

diff --git a/src/iceberg/metrics_config.cc b/src/iceberg/metrics_config.cc
index f78cadf2..8e2c7262 100644
--- a/src/iceberg/metrics_config.cc
+++ b/src/iceberg/metrics_config.cc
@@ -19,15 +19,201 @@
 
 #include "iceberg/metrics_config.h"
 
+#include <charconv>
 #include <string>
 #include <unordered_map>
 
 #include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/sort_order.h"
+#include "iceberg/table.h"
 #include "iceberg/table_properties.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/type_util.h"
 
 namespace iceberg {
 
+namespace {
+
+constexpr std::string_view kNoneName = "none";
+constexpr std::string_view kCountsName = "counts";
+constexpr std::string_view kFullName = "full";
+constexpr std::string_view kTruncatePrefix = "truncate(";
+constexpr int32_t kDefaultTruncateLength = 16;
+constexpr MetricsMode kDefaultMetricsMode = {.kind = 
MetricsMode::Kind::kTruncate,
+                                             .length = kDefaultTruncateLength};
+
+MetricsMode SortedColumnDefaultMode(MetricsMode default_mode) {
+  if (default_mode.kind == MetricsMode::Kind::kNone ||
+      default_mode.kind == MetricsMode::Kind::kCounts) {
+    return kDefaultMetricsMode;
+  } else {
+    return default_mode;
+  }
+}
+
+int32_t MaxInferredColumns(const TableProperties& properties) {
+  int32_t max_inferred_columns =
+      properties.Get(TableProperties::kMetricsMaxInferredColumnDefaults);
+  if (max_inferred_columns < 0) {
+    // fallback to default
+    return TableProperties::kMetricsMaxInferredColumnDefaults.value();
+  }
+  return max_inferred_columns;
+}
+
+Result<MetricsMode> ParseMode(std::string_view mode, MetricsMode fallback) {
+  return MetricsMode::FromString(mode).value_or(fallback);
+}
+
+}  // namespace
+
+MetricsMode MetricsMode::None() { return {.kind = Kind::kNone}; }
+
+MetricsMode MetricsMode::Counts() { return {.kind = Kind::kCounts}; }
+
+MetricsMode MetricsMode::Full() { return {.kind = Kind::kFull}; }
+
+Result<MetricsMode> MetricsMode::FromString(std::string_view mode) {
+  if (StringUtils::EqualsIgnoreCase(mode, kNoneName)) {
+    return MetricsMode::None();
+  } else if (StringUtils::EqualsIgnoreCase(mode, kCountsName)) {
+    return MetricsMode::Counts();
+  } else if (StringUtils::EqualsIgnoreCase(mode, kFullName)) {
+    return MetricsMode::Full();
+  }
+
+  if (StringUtils::StartsWithIgnoreCase(mode, kTruncatePrefix) && 
mode.ends_with(")")) {
+    int32_t length;
+    auto [ptr, ec] = std::from_chars(mode.data() + 9 /* "truncate(" length */,
+                                     mode.data() + mode.size() - 1, length);
+    if (ec != std::errc{}) {
+      return InvalidArgument("Invalid truncate mode: {}", mode);
+    }
+    if (length == kDefaultTruncateLength) {
+      return kDefaultMetricsMode;
+    }
+    ICEBERG_PRECHECK(length > 0, "Truncate length should be positive.");
+    return MetricsMode{.kind = Kind::kTruncate, .length = length};
+  }
+  return InvalidArgument("Invalid metrics mode: {}", mode);
+}
+
+MetricsConfig::MetricsConfig(ColumnModeMap column_modes, MetricsMode 
default_mode)
+    : column_modes_(std::move(column_modes)), default_mode_(default_mode) {}
+
+const std::shared_ptr<MetricsConfig>& MetricsConfig::Default() {
+  static const std::shared_ptr<MetricsConfig> kDefaultConfig(
+      new MetricsConfig(/*column_modes=*/{}, kDefaultMetricsMode));
+  return kDefaultConfig;
+}
+
+Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(const Table& table) 
{
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, table.schema());
+  auto sort_order = table.sort_order();
+  return MakeInternal(table.properties(), *schema,
+                      *sort_order.value_or(SortOrder::Unsorted()));
+}
+
+Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
+    const TableProperties& props, const Schema& schema, const SortOrder& 
order) {
+  ColumnModeMap column_modes;
+
+  MetricsMode default_mode = kDefaultMetricsMode;
+  if 
(props.configs().contains(TableProperties::kDefaultWriteMetricsMode.key())) {
+    std::string configured_metrics_mode =
+        props.Get(TableProperties::kDefaultWriteMetricsMode);
+    ICEBERG_ASSIGN_OR_RAISE(default_mode,
+                            ParseMode(configured_metrics_mode, 
kDefaultMetricsMode));
+  } else {
+    int32_t max_inferred_columns = MaxInferredColumns(props);
+    GetProjectedIdsVisitor visitor(/*include_struct_ids=*/true);
+    ICEBERG_RETURN_UNEXPECTED(visitor.Visit(schema));
+    auto projected_columns = static_cast<int32_t>(visitor.Finish().size());
+    if (max_inferred_columns < projected_columns) {
+      ICEBERG_ASSIGN_OR_RAISE(auto limit_field_ids,
+                              LimitFieldIds(schema, max_inferred_columns));
+      for (auto id : limit_field_ids) {
+        ICEBERG_ASSIGN_OR_RAISE(auto column_name, 
schema.FindColumnNameById(id));
+        ICEBERG_CHECK(column_name.has_value(), "Field id {} not found in 
schema", id);
+        column_modes[std::string(column_name.value())] = kDefaultMetricsMode;
+      }
+      // All other columns don't use metrics
+      default_mode = MetricsMode::None();
+    }
+  }
+
+  // First set sorted column with sorted column default (can be overridden by 
user)
+  auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode);
+  auto sorted_columns = SortOrder::OrderPreservingSortedColumns(schema, order);
+  for (const auto& sorted_column : sorted_columns) {
+    column_modes[std::string(sorted_column)] = sorted_col_default_mode;
+  }
+
+  // Handle user overrides of defaults
+  for (const auto& prop : props.configs()) {
+    if (prop.first.starts_with(TableProperties::kMetricModeColumnConfPrefix)) {
+      std::string column_alias =
+          
prop.first.substr(TableProperties::kMetricModeColumnConfPrefix.size());
+      ICEBERG_ASSIGN_OR_RAISE(auto mode, ParseMode(prop.second, default_mode));
+      column_modes[std::move(column_alias)] = mode;
+    }
+  }
+
+  return std::shared_ptr<MetricsConfig>(
+      new MetricsConfig(std::move(column_modes), default_mode));
+}
+
+Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& 
schema,
+                                                                 int32_t 
limit) {
+  class Visitor {
+   public:
+    explicit Visitor(int32_t limit) : limit_(limit) {}
+
+    Status Visit(const Type& type) {
+      if (type.is_nested()) {
+        return VisitNested(internal::checked_cast<const NestedType&>(type));
+      } else {
+        return VisitPrimitive(internal::checked_cast<const 
PrimitiveType&>(type));
+      }
+    }
+
+    Status VisitNested(const NestedType& type) {
+      for (const auto& field : type.fields()) {
+        if (!ShouldContinue()) {
+          break;
+        }
+        // TODO(zhuo.wang): variant type should also be handled here
+        if (field.type()->is_primitive()) {
+          ids_.insert(field.field_id());
+        }
+      }
+
+      for (const auto& field : type.fields()) {
+        if (ShouldContinue()) {
+          ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+        }
+      }
+      return {};
+    }
+
+    Status VisitPrimitive(const PrimitiveType& type) { return {}; }
+
+    std::unordered_set<int32_t> Finish() const { return ids_; }
+
+   private:
+    bool ShouldContinue() { return ids_.size() < limit_; }
+
+   private:
+    std::unordered_set<int32_t> ids_;
+    int32_t limit_;
+  };
+
+  Visitor visitor(limit);
+  ICEBERG_RETURN_UNEXPECTED(visitor.Visit(schema));
+  return visitor.Finish();
+}
+
 Status MetricsConfig::VerifyReferencedColumns(
     const std::unordered_map<std::string, std::string>& updates, const Schema& 
schema) {
   for (const auto& [key, value] : updates) {
@@ -37,14 +223,19 @@ Status MetricsConfig::VerifyReferencedColumns(
     auto field_name =
         
std::string_view(key).substr(TableProperties::kMetricModeColumnConfPrefix.size());
     ICEBERG_ASSIGN_OR_RAISE(auto field, schema.FindFieldByName(field_name));
-    if (!field.has_value()) {
-      return ValidationFailed(
-          "Invalid metrics config, could not find column {} from table prop {} 
in "
-          "schema {}",
-          field_name, key, schema.ToString());
-    }
+    ICEBERG_CHECK(field.has_value(),
+                  "Invalid metrics config, could not find column {} from table 
prop {} "
+                  "in schema {}",
+                  field_name, key, schema.ToString());
   }
   return {};
 }
 
+MetricsMode MetricsConfig::ColumnMode(std::string_view column_name) const {
+  if (auto it = column_modes_.find(column_name); it != column_modes_.end()) {
+    return it->second;
+  }
+  return default_mode_;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/metrics_config.h b/src/iceberg/metrics_config.h
index c42539d6..7a49e906 100644
--- a/src/iceberg/metrics_config.h
+++ b/src/iceberg/metrics_config.h
@@ -22,24 +22,83 @@
 /// \file iceberg/metrics_config.h
 /// \brief Metrics configuration for Iceberg tables
 
+#include <memory>
 #include <string>
+#include <string_view>
 #include <unordered_map>
+#include <unordered_set>
+#include <variant>
 
 #include "iceberg/iceberg_export.h"
 #include "iceberg/result.h"
 #include "iceberg/type_fwd.h"
+#include "iceberg/util/string_util.h"
 
 namespace iceberg {
 
-/// \brief Configuration utilities for table metrics
+struct ICEBERG_EXPORT MetricsMode {
+ public:
+  enum class Kind : uint8_t {
+    kNone,
+    kCounts,
+    kTruncate,
+    kFull,
+  };
+
+  static Result<MetricsMode> FromString(std::string_view mode);
+  static MetricsMode None();
+  static MetricsMode Counts();
+  static MetricsMode Full();
+
+  Kind kind;
+  std::variant<std::monostate, int32_t> length;
+};
+
+/// \brief Configuration for collecting column metrics for an Iceberg table.
 class ICEBERG_EXPORT MetricsConfig {
  public:
+  /// \brief Get the default metrics config.
+  static const std::shared_ptr<MetricsConfig>& Default();
+
+  /// \brief Creates a metrics config from a table.
+  static Result<std::shared_ptr<MetricsConfig>> Make(const Table& table);
+
+  /// \brief Get `limit` num of primitive field ids from schema
+  static Result<std::unordered_set<int32_t>> LimitFieldIds(const Schema& 
schema,
+                                                           int32_t limit);
+
   /// \brief Verify that all referenced columns are valid
   /// \param updates The updates to verify
   /// \param schema The schema to verify against
   /// \return OK if all referenced columns are valid
   static Status VerifyReferencedColumns(
       const std::unordered_map<std::string, std::string>& updates, const 
Schema& schema);
+
+  /// \brief Get the metrics mode for a specific column
+  /// \param column_name The full name of the column
+  /// \return The metrics mode for the column
+  MetricsMode ColumnMode(std::string_view column_name) const;
+
+ private:
+  using ColumnModeMap =
+      std::unordered_map<std::string, MetricsMode, StringHash, StringEqual>;
+
+  MetricsConfig(ColumnModeMap column_modes, MetricsMode default_mode);
+
+  /// \brief Generate a MetricsConfig for all columns based on overrides, 
schema, and sort
+  /// order.
+  ///
+  /// \param props will be read for metrics overrides 
(write.metadata.metrics.column.*)
+  /// and default(write.metadata.metrics.default)
+  /// \param schema table schema
+  /// \param order sort order columns, will be promoted to truncate(16)
+  /// \return metrics configuration
+  static Result<std::shared_ptr<MetricsConfig>> MakeInternal(const 
TableProperties& props,
+                                                             const Schema& 
schema,
+                                                             const SortOrder& 
order);
+
+  ColumnModeMap column_modes_;
+  MetricsMode default_mode_;
 };
 
 }  // namespace iceberg
diff --git a/src/iceberg/sort_order.cc b/src/iceberg/sort_order.cc
index fca138a6..b317efb9 100644
--- a/src/iceberg/sort_order.cc
+++ b/src/iceberg/sort_order.cc
@@ -132,4 +132,18 @@ Result<std::unique_ptr<SortOrder>> SortOrder::Make(int32_t 
sort_id,
   return std::unique_ptr<SortOrder>(new SortOrder(sort_id, std::move(fields)));
 }
 
+std::unordered_set<std::string_view> SortOrder::OrderPreservingSortedColumns(
+    const Schema& schema, const SortOrder& order) {
+  return order.fields() | std::views::filter([&schema](const SortField& field) 
{
+           return field.transform()->PreservesOrder();
+         }) |
+         std::views::transform([&schema](const SortField& field) {
+           return schema.FindColumnNameById(field.source_id())
+               .value_or(std::nullopt)
+               .value_or("");
+         }) |
+         std::views::filter([](std::string_view name) { return !name.empty(); 
}) |
+         std::ranges::to<std::unordered_set<std::string_view>>();
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/sort_order.h b/src/iceberg/sort_order.h
index 1e7285d3..7c9b799f 100644
--- a/src/iceberg/sort_order.h
+++ b/src/iceberg/sort_order.h
@@ -22,6 +22,7 @@
 #include <cstdint>
 #include <memory>
 #include <span>
+#include <unordered_set>
 #include <vector>
 
 #include "iceberg/iceberg_export.h"
@@ -91,6 +92,9 @@ class ICEBERG_EXPORT SortOrder : public util::Formattable {
   static Result<std::unique_ptr<SortOrder>> Make(int32_t sort_id,
                                                  std::vector<SortField> 
fields);
 
+  static std::unordered_set<std::string_view> OrderPreservingSortedColumns(
+      const Schema& schema, const SortOrder& order);
+
  private:
   /// \brief Constructs a SortOrder instance.
   /// \param order_id The sort order id.
diff --git a/src/iceberg/test/metrics_config_test.cc 
b/src/iceberg/test/metrics_config_test.cc
index e6a8e0f5..b162e661 100644
--- a/src/iceberg/test/metrics_config_test.cc
+++ b/src/iceberg/test/metrics_config_test.cc
@@ -19,39 +19,252 @@
 
 #include "iceberg/metrics_config.h"
 
-#include <memory>
-#include <unordered_map>
-
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 #include "iceberg/result.h"
 #include "iceberg/schema.h"
 #include "iceberg/schema_field.h"
+#include "iceberg/sort_order.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
 #include "iceberg/table_properties.h"
 #include "iceberg/test/matchers.h"
+#include "iceberg/test/mock_catalog.h"
+#include "iceberg/test/mock_io.h"
+#include "iceberg/transform.h"
 
 namespace iceberg {
 
-class MetricsConfigTest : public ::testing::Test {
- protected:
-  void SetUp() override {
-    SchemaField field1(1, "col1", std::make_shared<LongType>(), false);
-    SchemaField field2(2, "col2", std::make_shared<StringType>(), true);
-    SchemaField field3(3, "col3", std::make_shared<DoubleType>(), false);
-    schema_ =
-        std::make_unique<Schema>(std::vector<SchemaField>{field1, field2, 
field3}, 100);
+TEST(MetricsConfigTest, MetricsMode) {
+  EXPECT_EQ(MetricsMode::Kind::kNone, MetricsMode::None().kind);
+  EXPECT_EQ(MetricsMode::Kind::kCounts, MetricsMode::Counts().kind);
+  EXPECT_EQ(MetricsMode::Kind::kFull, MetricsMode::Full().kind);
+
+  EXPECT_EQ(MetricsMode::Kind::kNone, 
MetricsMode::FromString("none").value().kind);
+  EXPECT_EQ(MetricsMode::Kind::kCounts, 
MetricsMode::FromString("counts").value().kind);
+  EXPECT_EQ(MetricsMode::Kind::kFull, 
MetricsMode::FromString("full").value().kind);
+  EXPECT_EQ(MetricsMode::Kind::kTruncate,
+            MetricsMode::FromString("truncate(32)").value().kind);
+
+  auto result = MetricsMode::FromString("truncate(abc)");
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage("Invalid truncate mode"));
+
+  result = MetricsMode::FromString("truncate(-1)");
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage("Truncate length should be positive"));
+
+  result = MetricsMode::FromString("invalid");
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage("Invalid metrics mode"));
+}
+
+TEST(MetricsConfigTest, ForTable) {
+  auto io = std::make_shared<MockFileIO>();
+  auto catalog = std::make_shared<MockCatalog>();
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int64()),
+                               SchemaField::MakeOptional(2, "name", string()),
+                               SchemaField::MakeOptional(3, "addr", string())},
+      1);
+  TableIdentifier ident{.ns = Namespace{.levels = {"db"}}, .name = "t"};
+
+  {
+    // Default
+    auto metadata = std::make_shared<TableMetadata>(
+        TableMetadata{.format_version = 2, .schemas = {schema}, 
.current_schema_id = 1});
+    ICEBERG_UNWRAP_OR_FAIL(
+        auto table, Table::Make(ident, metadata, "s3://bucket/meta.json", io, 
catalog));
+
+    ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(*table));
+    auto mode = config->ColumnMode("id");
+    EXPECT_EQ(MetricsMode::Kind::kTruncate, mode.kind);
+    EXPECT_EQ(16, std::get<int32_t>(mode.length));
+
+    mode = config->ColumnMode("name");
+    EXPECT_EQ(MetricsMode::Kind::kTruncate, mode.kind);
+    EXPECT_EQ(16, std::get<int32_t>(mode.length));
+    mode = config->ColumnMode("addr");
+    EXPECT_EQ(MetricsMode::Kind::kTruncate, mode.kind);
+    EXPECT_EQ(16, std::get<int32_t>(mode.length));
+  }
+
+  {
+    // Custom metrics mode by set default metrics mode properties
+    auto metadata = std::make_shared<TableMetadata>(
+        TableMetadata{.format_version = 2,
+                      .schemas = {schema},
+                      .current_schema_id = 1,
+                      .properties = TableProperties::FromMap(
+                          {{TableProperties::kDefaultWriteMetricsMode.key(), 
"full"}})});
+    ICEBERG_UNWRAP_OR_FAIL(
+        auto table, Table::Make(ident, metadata, "s3://bucket/meta.json", io, 
catalog));
+
+    ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(*table));
+    auto mode = config->ColumnMode("id");
+    EXPECT_EQ(MetricsMode::Kind::kFull, mode.kind);
+
+    mode = config->ColumnMode("name");
+    EXPECT_EQ(MetricsMode::Kind::kFull, mode.kind);
+
+    mode = config->ColumnMode("addr");
+    EXPECT_EQ(MetricsMode::Kind::kFull, mode.kind);
+  }
+
+  {
+    // Custom metrics mode by set column's metrics mode
+    ICEBERG_UNWRAP_OR_FAIL(
+        std::shared_ptr<SortOrder> sort_order,
+        SortOrder::Make(*schema, /*sort_id=*/1,
+                        std::vector<SortField>(
+                            {SortField(/*source_id=*/1, Transform::Identity(),
+                                       SortDirection::kAscending, 
NullOrder::kLast)})));
+
+    auto metadata = std::make_shared<TableMetadata>(TableMetadata{
+        .format_version = 2,
+        .schemas = {schema},
+        .current_schema_id = 1,
+        .properties = TableProperties::FromMap(
+            {{TableProperties::kDefaultWriteMetricsMode.key(), "none"},
+             {TableProperties::kMetricsMaxInferredColumnDefaults.key(), "2"},
+             {std::string(TableProperties::kMetricModeColumnConfPrefix) + 
"name",
+              "full"}}),
+        .sort_orders = {sort_order},
+        .default_sort_order_id = 1,
+    });
+
+    ICEBERG_UNWRAP_OR_FAIL(
+        auto table, Table::Make(ident, metadata, "s3://bucket/meta.json", io, 
catalog));
+
+    ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(*table));
+    auto mode = config->ColumnMode("id");
+    EXPECT_EQ(MetricsMode::Kind::kTruncate, mode.kind);
+    EXPECT_EQ(16, std::get<int32_t>(mode.length));
+
+    mode = config->ColumnMode("name");
+    EXPECT_EQ(MetricsMode::Kind::kFull, mode.kind);
+
+    mode = config->ColumnMode("addr");
+    EXPECT_EQ(MetricsMode::Kind::kNone, mode.kind);
+  }
+}
+
+TEST(MetricsConfigTest, LimitFieldIds) {
+  {
+    // Nested struct type
+    // Create nested struct type for level1_struct_a
+    auto level2_struct_a_type = struct_(std::vector<SchemaField>{
+        SchemaField::MakeRequired(31, "level3_primitive_s", string())});
+
+    auto level1_struct_a_type = struct_(std::vector<SchemaField>{
+        SchemaField::MakeOptional(21, "level2_primitive_i", int32()),
+        SchemaField::MakeOptional(22, "level2_struct_a", level2_struct_a_type),
+        SchemaField::MakeRequired(23, "level2_primitive_b", boolean())});
+
+    // Create nested struct type for level1_struct_b
+    auto level2_struct_b_type = struct_(std::vector<SchemaField>{
+        SchemaField::MakeRequired(32, "level3_primitive_s", string())});
+
+    auto level1_struct_b_type = struct_(std::vector<SchemaField>{
+        SchemaField::MakeRequired(24, "level2_primitive_i", int32()),
+        SchemaField::MakeRequired(25, "level2_struct_b", 
level2_struct_b_type)});
+    // Create the main schema
+    Schema schema(
+        std::vector<SchemaField>{
+            SchemaField::MakeRequired(11, "level1_struct_a", 
level1_struct_a_type),
+            SchemaField::MakeRequired(12, "level1_struct_b", 
level1_struct_b_type),
+            SchemaField::MakeRequired(13, "level1_primitive_i", int32())},
+        100);
+
+    auto result1 = MetricsConfig::LimitFieldIds(schema, 1);
+    EXPECT_EQ(result1, (std::unordered_set<int32_t>{13}))
+        << "Should only include top level primitive field";
+
+    auto result2 = MetricsConfig::LimitFieldIds(schema, 2);
+    EXPECT_EQ(result2, (std::unordered_set<int32_t>{13, 21}))
+        << "Should include level 2 primitive field before nested struct";
+
+    auto result3 = MetricsConfig::LimitFieldIds(schema, 3);
+    EXPECT_EQ(result3, (std::unordered_set<int32_t>{13, 21, 23}))
+        << "Should include all of level 2 primitive fields of struct a before 
nested "
+           "struct";
+
+    auto result4 = MetricsConfig::LimitFieldIds(schema, 4);
+    EXPECT_EQ(result4, (std::unordered_set<int32_t>{13, 21, 23, 31}))
+        << "Should include all eligible fields in struct a";
+
+    auto result5 = MetricsConfig::LimitFieldIds(schema, 5);
+    EXPECT_EQ(result5, (std::unordered_set<int32_t>{13, 21, 23, 31, 24}))
+        << "Should include first primitive field in struct b";
+
+    auto result6 = MetricsConfig::LimitFieldIds(schema, 6);
+    EXPECT_EQ(result6, (std::unordered_set<int32_t>{13, 21, 23, 31, 24, 32}))
+        << "Should include all primitive fields";
+
+    auto result7 = MetricsConfig::LimitFieldIds(schema, 7);
+    EXPECT_EQ(result7, (std::unordered_set<int32_t>{13, 21, 23, 31, 24, 32}))
+        << "Should return all primitive fields when limit is higher";
+  }
+
+  {
+    // Nested map
+    auto map_type = map(SchemaField::MakeRequired(2, "key", int32()),
+                        SchemaField::MakeRequired(3, "value", int32()));
+
+    Schema schema(std::vector<SchemaField>{SchemaField::MakeRequired(1, "map", 
map_type),
+                                           SchemaField::MakeRequired(4, "top", 
int32())},
+                  100);
+
+    auto result1 = MetricsConfig::LimitFieldIds(schema, 1);
+    EXPECT_EQ(result1, (std::unordered_set<int32_t>{4}));
+
+    auto result2 = MetricsConfig::LimitFieldIds(schema, 2);
+    EXPECT_EQ(result2, (std::unordered_set<int32_t>{4, 2}));
+
+    auto result3 = MetricsConfig::LimitFieldIds(schema, 3);
+    EXPECT_EQ(result3, (std::unordered_set<int32_t>{4, 2, 3}));
+
+    auto result4 = MetricsConfig::LimitFieldIds(schema, 4);
+    EXPECT_EQ(result4, (std::unordered_set<int32_t>{4, 2, 3}));
+  }
+
+  {
+    // Nested list of maps
+    auto map_type = map(SchemaField::MakeRequired(3, "key", int32()),
+                        SchemaField::MakeRequired(4, "value", int32()));
+    auto list_type = list(SchemaField::MakeRequired(2, "element", map_type));
+
+    Schema schema(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "array_of_maps", 
list_type),
+                                 SchemaField::MakeRequired(5, "top", int32())},
+        100);
+
+    auto result1 = MetricsConfig::LimitFieldIds(schema, 1);
+    EXPECT_EQ(result1, (std::unordered_set<int32_t>{5}));
+
+    auto result2 = MetricsConfig::LimitFieldIds(schema, 2);
+    EXPECT_EQ(result2, (std::unordered_set<int32_t>{5, 3}));
+
+    auto result3 = MetricsConfig::LimitFieldIds(schema, 3);
+    EXPECT_EQ(result3, (std::unordered_set<int32_t>{5, 3, 4}));
+
+    auto result4 = MetricsConfig::LimitFieldIds(schema, 4);
+    EXPECT_EQ(result4, (std::unordered_set<int32_t>{5, 3, 4}));
   }
+}
 
-  std::unique_ptr<Schema> schema_;
-};
+TEST(MetricsConfigTest, ValidateColumnReferences) {
+  SchemaField field1 = SchemaField::MakeRequired(1, "col1", int64());
+  SchemaField field2 = SchemaField::MakeOptional(2, "col2", string());
+  SchemaField field3 = SchemaField::MakeRequired(3, "col3", float64());
+  Schema schema(std::vector<SchemaField>{field1, field2, field3}, 100);
 
-TEST_F(MetricsConfigTest, ValidateColumnReferences) {
   {
     // Empty updates should be valid
     std::unordered_map<std::string, std::string> updates;
 
-    auto result = MetricsConfig::VerifyReferencedColumns(updates, *schema_);
+    auto result = MetricsConfig::VerifyReferencedColumns(updates, schema);
     EXPECT_THAT(result, IsOk()) << "Validation should pass for empty updates";
   }
 
@@ -61,7 +274,7 @@ TEST_F(MetricsConfigTest, ValidateColumnReferences) {
     updates["write.format.default"] = "parquet";
     updates["write.target-file-size-bytes"] = "524288000";
 
-    auto result = MetricsConfig::VerifyReferencedColumns(updates, *schema_);
+    auto result = MetricsConfig::VerifyReferencedColumns(updates, schema);
     EXPECT_THAT(result, IsOk())
         << "Validation should pass when no column references exist";
   }
@@ -74,7 +287,7 @@ TEST_F(MetricsConfigTest, ValidateColumnReferences) {
     updates[std::string(TableProperties::kMetricModeColumnConfPrefix) + 
"col2"] = "full";
     updates["some.other.property"] = "value";
 
-    auto result = MetricsConfig::VerifyReferencedColumns(updates, *schema_);
+    auto result = MetricsConfig::VerifyReferencedColumns(updates, schema);
     EXPECT_THAT(result, IsOk()) << "Validation should pass for valid column 
references";
   }
 
@@ -84,7 +297,7 @@ TEST_F(MetricsConfigTest, ValidateColumnReferences) {
     updates[std::string(TableProperties::kMetricModeColumnConfPrefix) + 
"nonexistent"] =
         "counts";
 
-    auto result = MetricsConfig::VerifyReferencedColumns(updates, *schema_);
+    auto result = MetricsConfig::VerifyReferencedColumns(updates, schema);
     EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed))
         << "Validation should fail for invalid column references";
   }
@@ -97,7 +310,7 @@ TEST_F(MetricsConfigTest, ValidateColumnReferences) {
     updates[std::string(TableProperties::kMetricModeColumnConfPrefix) + 
"nonexistent"] =
         "full";
 
-    auto result = MetricsConfig::VerifyReferencedColumns(updates, *schema_);
+    auto result = MetricsConfig::VerifyReferencedColumns(updates, schema);
     EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed))
         << "Validation should fail when any column reference is invalid";
   }
diff --git a/src/iceberg/util/string_util.h b/src/iceberg/util/string_util.h
index 8aa209c9..0c9e89bc 100644
--- a/src/iceberg/util/string_util.h
+++ b/src/iceberg/util/string_util.h
@@ -44,6 +44,13 @@ class ICEBERG_EXPORT StringUtils {
         lhs, rhs, [](char lc, char rc) { return std::tolower(lc) == 
std::tolower(rc); });
   }
 
+  static bool StartsWithIgnoreCase(std::string_view str, std::string_view 
prefix) {
+    if (str.size() < prefix.size()) {
+      return false;
+    }
+    return EqualsIgnoreCase(str.substr(0, prefix.size()), prefix);
+  }
+
   /// \brief Count the number of code points in a UTF-8 string.
   static size_t CodePointCount(std::string_view str) {
     size_t count = 0;
diff --git a/src/iceberg/util/type_util.cc b/src/iceberg/util/type_util.cc
index a6cfd645..8f6d74b3 100644
--- a/src/iceberg/util/type_util.cc
+++ b/src/iceberg/util/type_util.cc
@@ -271,6 +271,35 @@ Result<std::shared_ptr<Type>> PruneColumnVisitor::Visit(
       MakeField(value_field, std::move(value_type)));
 }
 
+GetProjectedIdsVisitor::GetProjectedIdsVisitor(bool include_struct_ids)
+    : include_struct_ids_(include_struct_ids) {}
+
+Status GetProjectedIdsVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    return VisitNested(internal::checked_cast<const NestedType&>(type));
+  } else {
+    return VisitPrimitive(internal::checked_cast<const PrimitiveType&>(type));
+  }
+}
+
+Status GetProjectedIdsVisitor::VisitNested(const NestedType& type) {
+  for (auto& field : type.fields()) {
+    ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+  }
+  for (auto& field : type.fields()) {
+    // TODO(zhuo.wang) or is_variant
+    if ((include_struct_ids_ && field.type()->type_id() == TypeId::kStruct) ||
+        field.type()->is_primitive()) {
+      ids_.insert(field.field_id());
+    }
+  }
+  return {};
+}
+
+Status GetProjectedIdsVisitor::VisitPrimitive(const PrimitiveType& type) { 
return {}; }
+
+std::unordered_set<int32_t> GetProjectedIdsVisitor::Finish() const { return 
ids_; }
+
 std::unordered_map<int32_t, int32_t> IndexParents(const StructType& 
root_struct) {
   std::unordered_map<int32_t, int32_t> id_to_parent;
   std::stack<int32_t> parent_id_stack;
diff --git a/src/iceberg/util/type_util.h b/src/iceberg/util/type_util.h
index 959bdb9f..c2a219b1 100644
--- a/src/iceberg/util/type_util.h
+++ b/src/iceberg/util/type_util.h
@@ -122,6 +122,21 @@ class PruneColumnVisitor {
   const bool select_full_types_;
 };
 
+/// \brief Visitor for getting projected field IDs.
+class GetProjectedIdsVisitor {
+ public:
+  explicit GetProjectedIdsVisitor(bool include_struct_ids = false);
+
+  Status Visit(const Type& type);
+  Status VisitNested(const NestedType& type);
+  Status VisitPrimitive(const PrimitiveType& type);
+  std::unordered_set<int32_t> Finish() const;
+
+ private:
+  const bool include_struct_ids_;
+  std::unordered_set<int32_t> ids_;
+};
+
 /// \brief Index parent field IDs for all fields in a struct hierarchy.
 /// \param root_struct The root struct type to analyze
 /// \return A map from field ID to its parent struct field ID

Reply via email to