wgtmac commented on code in PR #180:
URL: https://github.com/apache/iceberg-cpp/pull/180#discussion_r2297492923


##########
test/schema_test.cc:
##########
@@ -24,6 +24,7 @@
 
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
+#include <iceberg/result.h>

Review Comment:
   ```suggestion
   ```



##########
src/iceberg/schema.h:
##########
@@ -54,13 +55,43 @@ class ICEBERG_EXPORT Schema : public StructType {
 
   [[nodiscard]] std::string ToString() const override;
 
+  ///\brief Find thd SchemaField By field name

Review Comment:
   ```suggestion
     /// \brief Find the SchemaField by field name
   ```



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +89,179 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);

Review Comment:
   ```suggestion
     auto it = lowercase_name_to_id_.find(StringUtils::ToLower(name));
   ```
   
   nit: you can include `iceberg/util/string_utils.h` to make this a one-liner.



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +89,179 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);
+  if (it == lowercase_name_to_id_.end()) return std::nullopt;
+  return FindFieldById(it->second);
+}
+
+Status Schema::InitIdToFieldMap() const {
+  if (!id_to_field_.empty()) {
+    return {};
+  }
+  IdToFieldVisitor visitor(id_to_field_);
+  ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*this, &visitor));
+  return {};
+}
+
+Status Schema::InitNameToIdMap() const {
+  if (!name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(name_to_id_, /*case_sensitive=*/true);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Status Schema::InitLowerCaseNameToIdMap() const {
+  if (!lowercase_name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(lowercase_name_to_id_, /*case_sensitive=*/false);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldById(
+    int32_t field_id) const {
+  ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+  auto it = id_to_field_.find(field_id);
+  if (it == id_to_field_.end()) {
+    return std::nullopt;
+  }
+  return it->second;
+}
+
+IdToFieldVisitor::IdToFieldVisitor(
+    std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field)
+    : id_to_field_(id_to_field) {}
+
+Status IdToFieldVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    ICEBERG_RETURN_UNEXPECTED(VisitNestedType(type));
+  }
+  return {};
+}
+
+Status IdToFieldVisitor::VisitNestedType(const Type& type) {
+  const auto& nested = iceberg::internal::checked_cast<const 
NestedType&>(type);
+  const auto& fields = nested.fields();
+  for (const auto& field : fields) {
+    id_to_field_.emplace(field.field_id(), std::cref(field));
+    ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+  }
+  return {};
+}
+
+NameToIdVisitor::NameToIdVisitor(
+    std::unordered_map<std::string, int32_t>& name_to_id, bool case_sensitive,
+    std::function<std::string(std::string_view)> quoting_func)
+    : name_to_id_(name_to_id),
+      case_sensitive_(case_sensitive),
+      quoting_func_(std::move(quoting_func)) {}
+
+NameToIdVisitor::~NameToIdVisitor() { Merge(); }

Review Comment:
   This is too hacky. Let's explicitly call the proposed `Finish` function.



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +89,179 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);
+  if (it == lowercase_name_to_id_.end()) return std::nullopt;
+  return FindFieldById(it->second);
+}
+
+Status Schema::InitIdToFieldMap() const {
+  if (!id_to_field_.empty()) {
+    return {};
+  }
+  IdToFieldVisitor visitor(id_to_field_);
+  ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*this, &visitor));
+  return {};
+}
+
+Status Schema::InitNameToIdMap() const {
+  if (!name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(name_to_id_, /*case_sensitive=*/true);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Status Schema::InitLowerCaseNameToIdMap() const {
+  if (!lowercase_name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(lowercase_name_to_id_, /*case_sensitive=*/false);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldById(
+    int32_t field_id) const {
+  ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+  auto it = id_to_field_.find(field_id);
+  if (it == id_to_field_.end()) {
+    return std::nullopt;
+  }
+  return it->second;
+}
+
+IdToFieldVisitor::IdToFieldVisitor(
+    std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field)
+    : id_to_field_(id_to_field) {}
+
+Status IdToFieldVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    ICEBERG_RETURN_UNEXPECTED(VisitNestedType(type));
+  }
+  return {};
+}
+
+Status IdToFieldVisitor::VisitNestedType(const Type& type) {
+  const auto& nested = iceberg::internal::checked_cast<const 
NestedType&>(type);
+  const auto& fields = nested.fields();
+  for (const auto& field : fields) {
+    id_to_field_.emplace(field.field_id(), std::cref(field));

Review Comment:
   nit: check the return value of `emplace` in case of a duplicate `field_id`.



##########
src/iceberg/schema.cc:
##########
@@ -19,13 +19,58 @@
 
 #include "iceberg/schema.h"
 
+#include <algorithm>
 #include <format>
+#include <functional>
 
 #include "iceberg/type.h"
 #include "iceberg/util/formatter.h"  // IWYU pragma: keep
+#include "iceberg/util/macros.h"
+#include "iceberg/util/visit_type.h"
 
 namespace iceberg {
 
+class IdToFieldVisitor {
+ public:
+  explicit IdToFieldVisitor(
+      std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>&
+          id_to_field);
+  Status Visit(const Type& type);
+  Status VisitNestedType(const Type& type);
+
+ private:
+  std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field_;
+};
+
+class NameToIdVisitor {
+ public:
+  explicit NameToIdVisitor(
+      std::unordered_map<std::string, int32_t>& name_to_id, bool 
case_sensitive_ = true,
+      std::function<std::string(std::string_view)> quoting_func_ =
+          [](std::string_view s) { return std::string(s); });

Review Comment:
   ```suggestion
         std::unordered_map<std::string, int32_t>& name_to_id, bool 
case_sensitive = true,
         std::function<std::string(std::string_view)> quoting_func = {});
   ```
   
   If you provide a default implementation, then the `if (!quoting_func_)` branch
   will never be hit.
   
   Please also pay attention to the naming style: parameter names should not carry
   the trailing underscore used for data members.



##########
src/iceberg/schema.cc:
##########
@@ -19,13 +19,58 @@
 
 #include "iceberg/schema.h"
 
+#include <algorithm>
 #include <format>
+#include <functional>
 
 #include "iceberg/type.h"
 #include "iceberg/util/formatter.h"  // IWYU pragma: keep
+#include "iceberg/util/macros.h"
+#include "iceberg/util/visit_type.h"
 
 namespace iceberg {
 
+class IdToFieldVisitor {
+ public:
+  explicit IdToFieldVisitor(
+      std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>&
+          id_to_field);
+  Status Visit(const Type& type);
+  Status VisitNestedType(const Type& type);
+
+ private:
+  std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field_;
+};
+
+class NameToIdVisitor {
+ public:
+  explicit NameToIdVisitor(
+      std::unordered_map<std::string, int32_t>& name_to_id, bool 
case_sensitive_ = true,
+      std::function<std::string(std::string_view)> quoting_func_ =
+          [](std::string_view s) { return std::string(s); });
+  ~NameToIdVisitor();
+
+  Status Visit(const ListType& type, const std::string& path,
+               const std::string& short_path);
+  Status Visit(const MapType& type, const std::string& path,
+               const std::string& short_path);
+  Status Visit(const StructType& type, const std::string& path,
+               const std::string& short_path);
+  Status Visit(const PrimitiveType& type, const std::string& path,
+               const std::string& short_path);
+
+ private:
+  std::string BuildPath(std::string_view prefix, std::string_view field_name,
+                        bool case_sensitive);
+  void Merge();

Review Comment:
   ```suggestion
   
     void Finish();
   
    private:
     std::string BuildPath(std::string_view prefix, std::string_view field_name,
                           bool case_sensitive);
   ```



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +89,179 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);
+  if (it == lowercase_name_to_id_.end()) return std::nullopt;
+  return FindFieldById(it->second);
+}
+
+Status Schema::InitIdToFieldMap() const {
+  if (!id_to_field_.empty()) {
+    return {};
+  }
+  IdToFieldVisitor visitor(id_to_field_);
+  ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*this, &visitor));
+  return {};
+}
+
+Status Schema::InitNameToIdMap() const {
+  if (!name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(name_to_id_, /*case_sensitive=*/true);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Status Schema::InitLowerCaseNameToIdMap() const {
+  if (!lowercase_name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(lowercase_name_to_id_, /*case_sensitive=*/false);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldById(
+    int32_t field_id) const {
+  ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+  auto it = id_to_field_.find(field_id);
+  if (it == id_to_field_.end()) {
+    return std::nullopt;
+  }
+  return it->second;
+}
+
+IdToFieldVisitor::IdToFieldVisitor(
+    std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field)
+    : id_to_field_(id_to_field) {}
+
+Status IdToFieldVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    ICEBERG_RETURN_UNEXPECTED(VisitNestedType(type));
+  }
+  return {};
+}
+
+Status IdToFieldVisitor::VisitNestedType(const Type& type) {
+  const auto& nested = iceberg::internal::checked_cast<const 
NestedType&>(type);
+  const auto& fields = nested.fields();
+  for (const auto& field : fields) {
+    id_to_field_.emplace(field.field_id(), std::cref(field));
+    ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+  }
+  return {};
+}
+
+NameToIdVisitor::NameToIdVisitor(
+    std::unordered_map<std::string, int32_t>& name_to_id, bool case_sensitive,
+    std::function<std::string(std::string_view)> quoting_func)
+    : name_to_id_(name_to_id),
+      case_sensitive_(case_sensitive),
+      quoting_func_(std::move(quoting_func)) {}
+
+NameToIdVisitor::~NameToIdVisitor() { Merge(); }
+
+Status NameToIdVisitor::Visit(const ListType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& field = type.fields()[0];
+  std::string new_path = BuildPath(path, field.name(), case_sensitive_);
+  std::string new_short_path;
+  if (field.type()->type_id() == TypeId::kStruct) {
+    new_short_path = short_path;
+  } else {
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+  }
+  auto it = name_to_id_.emplace(new_path, field.field_id());
+  if (!it.second) {
+    return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+  }
+  shortname_to_id_.emplace(new_short_path, field.field_id());
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const MapType& type, const std::string& path,
+                              const std::string& short_path) {
+  std::string new_path, new_short_path;
+  const auto& fields = type.fields();
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    if (field.name() == MapType::kValueName &&
+        field.type()->type_id() == TypeId::kStruct) {
+      new_short_path = short_path;
+    } else {
+      new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    }
+    auto it = name_to_id_.emplace(new_path, field.field_id());
+    if (!it.second) {
+      return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+    }
+    shortname_to_id_.emplace(new_short_path, field.field_id());
+    ICEBERG_RETURN_UNEXPECTED(
+        VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  }
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const StructType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& fields = type.fields();
+  std::string new_path, new_short_path;
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    auto it = name_to_id_.emplace(new_path, field.field_id());
+    if (!it.second) {
+      return NotSupported("Duplicate path in name_to_id_: {}", new_path);

Review Comment:
   ditto



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +89,179 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);
+  if (it == lowercase_name_to_id_.end()) return std::nullopt;
+  return FindFieldById(it->second);
+}
+
+Status Schema::InitIdToFieldMap() const {
+  if (!id_to_field_.empty()) {
+    return {};
+  }
+  IdToFieldVisitor visitor(id_to_field_);
+  ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*this, &visitor));
+  return {};
+}
+
+Status Schema::InitNameToIdMap() const {
+  if (!name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(name_to_id_, /*case_sensitive=*/true);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Status Schema::InitLowerCaseNameToIdMap() const {
+  if (!lowercase_name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(lowercase_name_to_id_, /*case_sensitive=*/false);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldById(
+    int32_t field_id) const {
+  ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+  auto it = id_to_field_.find(field_id);
+  if (it == id_to_field_.end()) {
+    return std::nullopt;
+  }
+  return it->second;
+}
+
+IdToFieldVisitor::IdToFieldVisitor(
+    std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field)
+    : id_to_field_(id_to_field) {}
+
+Status IdToFieldVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    ICEBERG_RETURN_UNEXPECTED(VisitNestedType(type));
+  }
+  return {};
+}
+
+Status IdToFieldVisitor::VisitNestedType(const Type& type) {
+  const auto& nested = iceberg::internal::checked_cast<const 
NestedType&>(type);
+  const auto& fields = nested.fields();
+  for (const auto& field : fields) {
+    id_to_field_.emplace(field.field_id(), std::cref(field));
+    ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+  }
+  return {};
+}
+
+NameToIdVisitor::NameToIdVisitor(
+    std::unordered_map<std::string, int32_t>& name_to_id, bool case_sensitive,
+    std::function<std::string(std::string_view)> quoting_func)
+    : name_to_id_(name_to_id),
+      case_sensitive_(case_sensitive),
+      quoting_func_(std::move(quoting_func)) {}
+
+NameToIdVisitor::~NameToIdVisitor() { Merge(); }
+
+Status NameToIdVisitor::Visit(const ListType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& field = type.fields()[0];
+  std::string new_path = BuildPath(path, field.name(), case_sensitive_);
+  std::string new_short_path;
+  if (field.type()->type_id() == TypeId::kStruct) {
+    new_short_path = short_path;
+  } else {
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+  }
+  auto it = name_to_id_.emplace(new_path, field.field_id());
+  if (!it.second) {
+    return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+  }
+  shortname_to_id_.emplace(new_short_path, field.field_id());
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const MapType& type, const std::string& path,
+                              const std::string& short_path) {
+  std::string new_path, new_short_path;
+  const auto& fields = type.fields();
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    if (field.name() == MapType::kValueName &&
+        field.type()->type_id() == TypeId::kStruct) {
+      new_short_path = short_path;
+    } else {
+      new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    }
+    auto it = name_to_id_.emplace(new_path, field.field_id());
+    if (!it.second) {
+      return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+    }
+    shortname_to_id_.emplace(new_short_path, field.field_id());
+    ICEBERG_RETURN_UNEXPECTED(
+        VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  }
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const StructType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& fields = type.fields();
+  std::string new_path, new_short_path;
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    auto it = name_to_id_.emplace(new_path, field.field_id());
+    if (!it.second) {
+      return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+    }
+    shortname_to_id_.emplace(new_short_path, field.field_id());
+    ICEBERG_RETURN_UNEXPECTED(
+        VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  }
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const PrimitiveType& type, const std::string& 
path,
+                              const std::string& short_path) {
+  return {};
+}
+
+std::string NameToIdVisitor::BuildPath(std::string_view prefix,
+                                       std::string_view field_name, bool 
case_sensitive) {
+  std::string quoted_name;
+  if (!quoting_func_) {
+    quoted_name = std::string(field_name);
+  } else {
+    quoted_name = quoting_func_(field_name);
+  }
+  if (case_sensitive) {
+    return prefix.empty() ? quoted_name : std::string(prefix) + "." + 
quoted_name;
+  }
+  std::string lower_name = quoted_name;
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  return prefix.empty() ? lower_name : std::string(prefix) + "." + lower_name;
+}
+
+void NameToIdVisitor::Merge() {
+  for (const auto& it : shortname_to_id_) {
+    if (name_to_id_.find(it.first) == name_to_id_.end()) {
+      name_to_id_[it.first] = it.second;
+    }
+  }

Review Comment:
   ```suggestion
     for (auto&& [path, id] : short_name_to_id_) {
       name_to_id_.try_emplace(path, id);
     }
   ```



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +89,179 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);
+  if (it == lowercase_name_to_id_.end()) return std::nullopt;
+  return FindFieldById(it->second);
+}
+
+Status Schema::InitIdToFieldMap() const {
+  if (!id_to_field_.empty()) {
+    return {};
+  }
+  IdToFieldVisitor visitor(id_to_field_);
+  ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*this, &visitor));
+  return {};
+}
+
+Status Schema::InitNameToIdMap() const {
+  if (!name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(name_to_id_, /*case_sensitive=*/true);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Status Schema::InitLowerCaseNameToIdMap() const {
+  if (!lowercase_name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(lowercase_name_to_id_, /*case_sensitive=*/false);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldById(
+    int32_t field_id) const {
+  ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+  auto it = id_to_field_.find(field_id);
+  if (it == id_to_field_.end()) {
+    return std::nullopt;
+  }
+  return it->second;
+}
+
+IdToFieldVisitor::IdToFieldVisitor(
+    std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field)
+    : id_to_field_(id_to_field) {}
+
+Status IdToFieldVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    ICEBERG_RETURN_UNEXPECTED(VisitNestedType(type));
+  }
+  return {};
+}
+
+Status IdToFieldVisitor::VisitNestedType(const Type& type) {
+  const auto& nested = iceberg::internal::checked_cast<const 
NestedType&>(type);
+  const auto& fields = nested.fields();
+  for (const auto& field : fields) {
+    id_to_field_.emplace(field.field_id(), std::cref(field));
+    ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+  }
+  return {};
+}
+
+NameToIdVisitor::NameToIdVisitor(
+    std::unordered_map<std::string, int32_t>& name_to_id, bool case_sensitive,
+    std::function<std::string(std::string_view)> quoting_func)
+    : name_to_id_(name_to_id),
+      case_sensitive_(case_sensitive),
+      quoting_func_(std::move(quoting_func)) {}
+
+NameToIdVisitor::~NameToIdVisitor() { Merge(); }
+
+Status NameToIdVisitor::Visit(const ListType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& field = type.fields()[0];
+  std::string new_path = BuildPath(path, field.name(), case_sensitive_);
+  std::string new_short_path;
+  if (field.type()->type_id() == TypeId::kStruct) {
+    new_short_path = short_path;
+  } else {
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+  }
+  auto it = name_to_id_.emplace(new_path, field.field_id());
+  if (!it.second) {
+    return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+  }
+  shortname_to_id_.emplace(new_short_path, field.field_id());
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const MapType& type, const std::string& path,
+                              const std::string& short_path) {
+  std::string new_path, new_short_path;
+  const auto& fields = type.fields();
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    if (field.name() == MapType::kValueName &&
+        field.type()->type_id() == TypeId::kStruct) {
+      new_short_path = short_path;
+    } else {
+      new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    }
+    auto it = name_to_id_.emplace(new_path, field.field_id());
+    if (!it.second) {
+      return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+    }
+    shortname_to_id_.emplace(new_short_path, field.field_id());
+    ICEBERG_RETURN_UNEXPECTED(
+        VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  }
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const StructType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& fields = type.fields();
+  std::string new_path, new_short_path;
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    auto it = name_to_id_.emplace(new_path, field.field_id());
+    if (!it.second) {
+      return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+    }
+    shortname_to_id_.emplace(new_short_path, field.field_id());
+    ICEBERG_RETURN_UNEXPECTED(
+        VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  }
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const PrimitiveType& type, const std::string& 
path,
+                              const std::string& short_path) {
+  return {};
+}
+
+std::string NameToIdVisitor::BuildPath(std::string_view prefix,
+                                       std::string_view field_name, bool 
case_sensitive) {
+  std::string quoted_name;
+  if (!quoting_func_) {
+    quoted_name = std::string(field_name);
+  } else {
+    quoted_name = quoting_func_(field_name);
+  }
+  if (case_sensitive) {
+    return prefix.empty() ? quoted_name : std::string(prefix) + "." + 
quoted_name;
+  }
+  std::string lower_name = quoted_name;
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  return prefix.empty() ? lower_name : std::string(prefix) + "." + lower_name;
+}
+
+void NameToIdVisitor::Merge() {

Review Comment:
   ```suggestion
   void NameToIdVisitor::Finish() {
   ```



##########
test/schema_test.cc:
##########
@@ -81,3 +82,420 @@ TEST(SchemaTest, Equality) {
   ASSERT_EQ(schema1, schema5);
   ASSERT_EQ(schema5, schema1);
 }
+
+class NestedTypeTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    field1_ = std::make_unique<iceberg::SchemaField>(1, "Foo", 
iceberg::int32(), true);
+    field2_ = std::make_unique<iceberg::SchemaField>(2, "Bar", 
iceberg::string(), true);
+    field3_ = std::make_unique<iceberg::SchemaField>(3, "Foobar", 
iceberg::int32(), true);
+
+    iceberg::StructType structtype = iceberg::StructType(
+        std::vector<iceberg::SchemaField>{*field1_, *field2_, *field3_});
+
+    auto listype = iceberg::ListType(iceberg::SchemaField::MakeRequired(
+        4, "element", std::make_shared<iceberg::StructType>(structtype)));
+
+    auto maptype =
+        iceberg::MapType(iceberg::SchemaField::MakeRequired(5, "key", 
iceberg::int32()),
+                         iceberg::SchemaField::MakeRequired(
+                             6, "value", 
std::make_shared<iceberg::ListType>(listype)));
+
+    field4_ = std::make_unique<iceberg::SchemaField>(
+        4, "element", std::make_shared<iceberg::StructType>(structtype), 
false);
+    field5_ = std::make_unique<iceberg::SchemaField>(5, "key", 
iceberg::int32(), false);
+    field6_ = std::make_unique<iceberg::SchemaField>(
+        6, "value", std::make_shared<iceberg::ListType>(listype), false);
+    field7_ = std::make_unique<iceberg::SchemaField>(
+        7, "Value", std::make_shared<iceberg::MapType>(maptype), false);
+
+    schema_ =
+        
std::make_shared<iceberg::Schema>(std::vector<iceberg::SchemaField>{*field7_}, 
1);
+  }
+
+  std::shared_ptr<iceberg::Schema> schema_;
+  std::unique_ptr<iceberg::SchemaField> field1_;
+  std::unique_ptr<iceberg::SchemaField> field2_;
+  std::unique_ptr<iceberg::SchemaField> field3_;
+  std::unique_ptr<iceberg::SchemaField> field4_;
+  std::unique_ptr<iceberg::SchemaField> field5_;
+  std::unique_ptr<iceberg::SchemaField> field6_;
+  std::unique_ptr<iceberg::SchemaField> field7_;
+};
+
+TEST_F(NestedTypeTest, TestFindById) {
+  ASSERT_THAT(schema_->FindFieldById(7), ::testing::Optional(*field7_));
+  ASSERT_THAT(schema_->FindFieldById(6), ::testing::Optional(*field6_));
+  ASSERT_THAT(schema_->FindFieldById(5), ::testing::Optional(*field5_));
+  ASSERT_THAT(schema_->FindFieldById(4), ::testing::Optional(*field4_));
+  ASSERT_THAT(schema_->FindFieldById(3), ::testing::Optional(*field3_));
+  ASSERT_THAT(schema_->FindFieldById(2), ::testing::Optional(*field2_));
+  ASSERT_THAT(schema_->FindFieldById(1), ::testing::Optional(*field1_));
+
+  ASSERT_THAT(schema_->FindFieldById(10), ::testing::Optional(std::nullopt));
+}
+
+TEST_F(NestedTypeTest, TestFindByName) {
+  ASSERT_THAT(schema_->FindFieldByName("Value"), 
::testing::Optional(*field7_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value"), 
::testing::Optional(*field6_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.key"), 
::testing::Optional(*field5_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.element"),
+              ::testing::Optional(*field4_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.element.Foobar"),
+              ::testing::Optional(*field3_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.element.Bar"),
+              ::testing::Optional(*field2_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.element.Foo"),
+              ::testing::Optional(*field1_));
+
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.element.FoO"),
+              ::testing::Optional(std::nullopt));
+}
+
+TEST_F(NestedTypeTest, TestFindByNameCaseInsensitive) {
+  ASSERT_THAT(schema_->FindFieldByName("vALue", false), 
::testing::Optional(*field7_));
+  ASSERT_THAT(schema_->FindFieldByName("vALue.VALUE", false),
+              ::testing::Optional(*field6_));
+  ASSERT_THAT(schema_->FindFieldByName("valUe.kEy", false),
+              ::testing::Optional(*field5_));
+  ASSERT_THAT(schema_->FindFieldByName("vaLue.vAlue.elEment", false),
+              ::testing::Optional(*field4_));
+  ASSERT_THAT(schema_->FindFieldByName("vaLue.vAlue.eLement.fOObar", false),
+              ::testing::Optional(*field3_));
+  ASSERT_THAT(schema_->FindFieldByName("valUe.vaLUe.elemEnt.Bar", false),
+              ::testing::Optional(*field2_));
+  ASSERT_THAT(schema_->FindFieldByName("valUe.valUe.ELEMENT.FOO", false),
+              ::testing::Optional(*field1_));
+  ASSERT_THAT(schema_->FindFieldByName("valUe.valUe.ELEMENT.FO", false),
+              ::testing::Optional(std::nullopt));
+}
+
+TEST_F(NestedTypeTest, TestFindByShortNameCaseInsensitive) {
+  ASSERT_THAT(schema_->FindFieldByName("vaLue.value.FOO", false),
+              ::testing::Optional(*field1_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.Bar", false),
+              ::testing::Optional(*field2_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.FooBAR", false),
+              ::testing::Optional(*field3_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.FooBAR.a", false),
+              ::testing::Optional(std::nullopt));
+}
+
+class NestType2Test : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    field1_ = std::make_unique<iceberg::SchemaField>(1, "Foo", 
iceberg::int32(), true);
+    field2_ = std::make_unique<iceberg::SchemaField>(2, "Bar", 
iceberg::string(), true);
+    field3_ = std::make_unique<iceberg::SchemaField>(3, "Foobar", 
iceberg::int32(), true);
+
+    iceberg::StructType structtype = iceberg::StructType({*field1_, *field2_, 
*field3_});
+
+    field4_ = std::make_unique<iceberg::SchemaField>(
+        4, "element", std::make_shared<iceberg::StructType>(structtype), 
false);
+
+    auto listype = iceberg::ListType(*field4_);
+
+    iceberg::StructType structtype2 = iceberg::StructType(
+        {iceberg::SchemaField::MakeRequired(5, "First_child", 
iceberg::int32()),
+         iceberg::SchemaField::MakeRequired(
+             6, "Second_child", 
std::make_shared<iceberg::ListType>(listype))});
+
+    auto maptype = iceberg::MapType(
+        iceberg::SchemaField::MakeRequired(7, "key", iceberg::int32()),
+        iceberg::SchemaField::MakeRequired(
+            8, "value", std::make_shared<iceberg::StructType>(structtype2)));
+
+    field5_ =
+        std::make_unique<iceberg::SchemaField>(5, "First_child", 
iceberg::int32(), false);
+    field6_ = std::make_unique<iceberg::SchemaField>(
+        6, "Second_child", std::make_shared<iceberg::ListType>(listype), 
false);
+    field7_ = std::make_unique<iceberg::SchemaField>(7, "key", 
iceberg::int32(), false);
+    field8_ = std::make_unique<iceberg::SchemaField>(
+        8, "value", std::make_shared<iceberg::StructType>(structtype2), false);
+    field9_ = std::make_unique<iceberg::SchemaField>(
+        9, "Map", std::make_shared<iceberg::MapType>(maptype), false);
+
+    schema_ =
+        
std::make_shared<iceberg::Schema>(std::vector<iceberg::SchemaField>{*field9_}, 
1);

Review Comment:
   ```suggestion
       field1_ = std::make_unique<iceberg::SchemaField>(1, "Foo", 
iceberg::int32(), true);
       field2_ = std::make_unique<iceberg::SchemaField>(2, "Bar", 
iceberg::string(), true);
       field3_ = std::make_unique<iceberg::SchemaField>(3, "Foobar", 
iceberg::int32(), true);
       auto struct_type = std::make_shared<iceberg::StructType>(
           std::vector<iceberg::SchemaField>{*field1_, *field2_, *field3_});
   
       field4_ = std::make_unique<iceberg::SchemaField>(4, "element", 
struct_type, false);
       auto list_type = std::make_shared<iceberg::ListType>(*field4_);
   
       field5_ =
           std::make_unique<iceberg::SchemaField>(5, "First_child", 
iceberg::int32(), false);
       field6_ = std::make_unique<iceberg::SchemaField>(6, "Second_child", 
list_type, false);
       auto struct_type2 = std::make_shared<iceberg::StructType>(
           std::vector<iceberg::SchemaField>{*field5_, *field6_});
   
       field7_ = std::make_unique<iceberg::SchemaField>(7, "key", 
iceberg::int32(), false);
       field8_ = std::make_unique<iceberg::SchemaField>(8, "value", 
struct_type2, false);
       auto map_type = std::make_shared<iceberg::MapType>(*field7_, *field8_);
   
       field9_ = std::make_unique<iceberg::SchemaField>(9, "Map", map_type, 
false);
       schema_ =
           
std::make_shared<iceberg::Schema>(std::vector<iceberg::SchemaField>{*field9_}, 
1);
   ```



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +88,182 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);
+  if (it == lowercase_name_to_id_.end()) return std::nullopt;
+  return FindFieldById(it->second);
+}
+
+Status Schema::InitIdToFieldMap() const {
+  if (!id_to_field_.empty()) {
+    return {};
+  }
+  IdToFieldVisitor visitor(id_to_field_);
+  ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*this, &visitor));
+  return {};
+}
+
+Status Schema::InitNameToIdMap() const {
+  if (!name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(name_to_id_, /*case_sensitive=*/true);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Status Schema::InitLowerCaseNameToIdMap() const {
+  if (!lowercase_name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(lowercase_name_to_id_, /*case_sensitive=*/false);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldById(
+    int32_t field_id) const {
+  ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+  auto it = id_to_field_.find(field_id);
+  if (it == id_to_field_.end()) {
+    return std::nullopt;
+  }
+  return it->second;
+}
+
+IdToFieldVisitor::IdToFieldVisitor(
+    std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field)
+    : id_to_field_(id_to_field) {}
+
+Status IdToFieldVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    ICEBERG_RETURN_UNEXPECTED(VisitNestedType(type));
+  }
+  return {};
+}
+
+Status IdToFieldVisitor::VisitNestedType(const Type& type) {
+  const auto& nested = iceberg::internal::checked_cast<const 
NestedType&>(type);
+  const auto& fields = nested.fields();
+  for (const auto& field : fields) {
+    auto [it, inserted] = id_to_field_.emplace(field.field_id(), 
std::cref(field));
+    ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+  }
+  return {};
+}
+
+NameToIdVisitor::NameToIdVisitor(
+    std::unordered_map<std::string, int32_t>& name_to_id, bool case_sensitive,
+    std::function<std::string(std::string_view)> quoting_func)
+    : name_to_id_(name_to_id),
+      case_sensitive_(case_sensitive),
+      quoting_func_(std::move(quoting_func)) {}
+
+NameToIdVisitor::~NameToIdVisitor() { Merge(); }
+
+Status NameToIdVisitor::Visit(const ListType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& field = type.fields()[0];
+  std::string new_path = BuildPath(path, field.name(), case_sensitive_);
+  std::string new_short_path;
+  if (field.type()->type_id() == TypeId::kStruct) {
+    new_short_path = short_path;
+  } else {
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+  }
+  auto [it, inserted] = name_to_id_.emplace(new_path, field.field_id());
+  if (!inserted) {
+    std::string msg = "Duplicate path in name_to_id_: " + new_path;
+    return NotSupported("{}", msg);
+  }
+  shortname_to_id_.emplace(new_short_path, field.field_id());
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const MapType& type, const std::string& path,
+                              const std::string& short_path) {
+  std::string new_path, new_short_path;
+  const auto& fields = type.fields();
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    if (field.name() == MapType::kValueName &&
+        field.type()->type_id() == TypeId::kStruct) {
+      new_short_path = short_path;
+    } else {
+      new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    }
+    auto [it, inserted] = name_to_id_.emplace(new_path, field.field_id());
+    if (!inserted) {
+      std::string msg = "Duplicate path in name_to_id_: " + new_path;
+      return NotSupported("{}", msg);
+    }
+    shortname_to_id_.emplace(new_short_path, field.field_id());
+    ICEBERG_RETURN_UNEXPECTED(
+        VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  }
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const StructType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& fields = type.fields();
+  std::string new_path, new_short_path;
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    auto [it, inserted] = name_to_id_.emplace(new_path, field.field_id());
+    if (!inserted) {
+      std::string msg = "Duplicate path in name_to_id_: " + it->first;
+      return NotSupported("{}", msg);
+    }
+    shortname_to_id_.emplace(new_short_path, field.field_id());
+    ICEBERG_RETURN_UNEXPECTED(
+        VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  }
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const PrimitiveType& type, const std::string& 
path,
+                              const std::string& short_path) {
+  return {};
+}
+
+std::string NameToIdVisitor::BuildPath(std::string_view prefix,
+                                       std::string_view field_name, bool 
case_sensitive) {
+  std::string quoted_name;
+  if (!quoting_func_) {
+    quoted_name = std::string(field_name);
+  } else {
+    quoted_name = quoting_func_(field_name);
+  }
+  if (case_sensitive) {
+    return prefix.empty() ? quoted_name : std::string(prefix) + "." + 
quoted_name;
+  }
+  std::string lower_name = quoted_name;
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);

Review Comment:
   nit: `iceberg/util/string_utils.h` already provides `StringUtils::ToLower`; consider using it here instead of the manual `std::ranges::transform` call.



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +89,179 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);
+  if (it == lowercase_name_to_id_.end()) return std::nullopt;
+  return FindFieldById(it->second);
+}
+
+Status Schema::InitIdToFieldMap() const {
+  if (!id_to_field_.empty()) {
+    return {};
+  }
+  IdToFieldVisitor visitor(id_to_field_);
+  ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*this, &visitor));
+  return {};
+}
+
+Status Schema::InitNameToIdMap() const {
+  if (!name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(name_to_id_, /*case_sensitive=*/true);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Status Schema::InitLowerCaseNameToIdMap() const {
+  if (!lowercase_name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(lowercase_name_to_id_, /*case_sensitive=*/false);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldById(
+    int32_t field_id) const {
+  ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+  auto it = id_to_field_.find(field_id);
+  if (it == id_to_field_.end()) {
+    return std::nullopt;
+  }
+  return it->second;
+}
+
+IdToFieldVisitor::IdToFieldVisitor(
+    std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field)
+    : id_to_field_(id_to_field) {}
+
+Status IdToFieldVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    ICEBERG_RETURN_UNEXPECTED(VisitNestedType(type));
+  }
+  return {};
+}
+
+Status IdToFieldVisitor::VisitNestedType(const Type& type) {
+  const auto& nested = iceberg::internal::checked_cast<const 
NestedType&>(type);
+  const auto& fields = nested.fields();
+  for (const auto& field : fields) {
+    id_to_field_.emplace(field.field_id(), std::cref(field));
+    ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+  }
+  return {};
+}
+
+NameToIdVisitor::NameToIdVisitor(
+    std::unordered_map<std::string, int32_t>& name_to_id, bool case_sensitive,
+    std::function<std::string(std::string_view)> quoting_func)
+    : name_to_id_(name_to_id),
+      case_sensitive_(case_sensitive),
+      quoting_func_(std::move(quoting_func)) {}
+
+NameToIdVisitor::~NameToIdVisitor() { Merge(); }
+
+Status NameToIdVisitor::Visit(const ListType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& field = type.fields()[0];
+  std::string new_path = BuildPath(path, field.name(), case_sensitive_);
+  std::string new_short_path;
+  if (field.type()->type_id() == TypeId::kStruct) {
+    new_short_path = short_path;
+  } else {
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+  }
+  auto it = name_to_id_.emplace(new_path, field.field_id());
+  if (!it.second) {
+    return NotSupported("Duplicate path in name_to_id_: {}", new_path);

Review Comment:
   ```suggestion
       return InvalidSchema("Duplicate path found: {}, prev id: {}, curr id: 
{}", it->first, it->second, field.field_id());
   ```



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +89,179 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);
+  if (it == lowercase_name_to_id_.end()) return std::nullopt;
+  return FindFieldById(it->second);
+}
+
+Status Schema::InitIdToFieldMap() const {
+  if (!id_to_field_.empty()) {
+    return {};
+  }
+  IdToFieldVisitor visitor(id_to_field_);
+  ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*this, &visitor));
+  return {};
+}
+
+Status Schema::InitNameToIdMap() const {
+  if (!name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(name_to_id_, /*case_sensitive=*/true);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Status Schema::InitLowerCaseNameToIdMap() const {
+  if (!lowercase_name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(lowercase_name_to_id_, /*case_sensitive=*/false);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldById(
+    int32_t field_id) const {
+  ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+  auto it = id_to_field_.find(field_id);
+  if (it == id_to_field_.end()) {
+    return std::nullopt;
+  }
+  return it->second;
+}
+
+IdToFieldVisitor::IdToFieldVisitor(
+    std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field)
+    : id_to_field_(id_to_field) {}
+
+Status IdToFieldVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    ICEBERG_RETURN_UNEXPECTED(VisitNestedType(type));
+  }
+  return {};
+}
+
+Status IdToFieldVisitor::VisitNestedType(const Type& type) {
+  const auto& nested = iceberg::internal::checked_cast<const 
NestedType&>(type);
+  const auto& fields = nested.fields();
+  for (const auto& field : fields) {
+    id_to_field_.emplace(field.field_id(), std::cref(field));
+    ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+  }
+  return {};
+}
+
+NameToIdVisitor::NameToIdVisitor(
+    std::unordered_map<std::string, int32_t>& name_to_id, bool case_sensitive,
+    std::function<std::string(std::string_view)> quoting_func)
+    : name_to_id_(name_to_id),
+      case_sensitive_(case_sensitive),
+      quoting_func_(std::move(quoting_func)) {}
+
+NameToIdVisitor::~NameToIdVisitor() { Merge(); }
+
+Status NameToIdVisitor::Visit(const ListType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& field = type.fields()[0];
+  std::string new_path = BuildPath(path, field.name(), case_sensitive_);
+  std::string new_short_path;
+  if (field.type()->type_id() == TypeId::kStruct) {
+    new_short_path = short_path;
+  } else {
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+  }
+  auto it = name_to_id_.emplace(new_path, field.field_id());
+  if (!it.second) {
+    return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+  }
+  shortname_to_id_.emplace(new_short_path, field.field_id());

Review Comment:
   ```suggestion
     shortname_to_id_.try_emplace(new_short_path, field.field_id());
   ```



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +89,179 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);
+  if (it == lowercase_name_to_id_.end()) return std::nullopt;
+  return FindFieldById(it->second);
+}
+
+Status Schema::InitIdToFieldMap() const {
+  if (!id_to_field_.empty()) {
+    return {};
+  }
+  IdToFieldVisitor visitor(id_to_field_);
+  ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*this, &visitor));
+  return {};
+}
+
+Status Schema::InitNameToIdMap() const {
+  if (!name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(name_to_id_, /*case_sensitive=*/true);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Status Schema::InitLowerCaseNameToIdMap() const {
+  if (!lowercase_name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(lowercase_name_to_id_, /*case_sensitive=*/false);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldById(
+    int32_t field_id) const {
+  ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+  auto it = id_to_field_.find(field_id);
+  if (it == id_to_field_.end()) {
+    return std::nullopt;
+  }
+  return it->second;
+}
+
+IdToFieldVisitor::IdToFieldVisitor(
+    std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field)
+    : id_to_field_(id_to_field) {}
+
+Status IdToFieldVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    ICEBERG_RETURN_UNEXPECTED(VisitNestedType(type));
+  }
+  return {};
+}
+
+Status IdToFieldVisitor::VisitNestedType(const Type& type) {
+  const auto& nested = iceberg::internal::checked_cast<const 
NestedType&>(type);
+  const auto& fields = nested.fields();
+  for (const auto& field : fields) {
+    id_to_field_.emplace(field.field_id(), std::cref(field));
+    ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+  }
+  return {};
+}
+
+NameToIdVisitor::NameToIdVisitor(
+    std::unordered_map<std::string, int32_t>& name_to_id, bool case_sensitive,
+    std::function<std::string(std::string_view)> quoting_func)
+    : name_to_id_(name_to_id),
+      case_sensitive_(case_sensitive),
+      quoting_func_(std::move(quoting_func)) {}
+
+NameToIdVisitor::~NameToIdVisitor() { Merge(); }
+
+Status NameToIdVisitor::Visit(const ListType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& field = type.fields()[0];
+  std::string new_path = BuildPath(path, field.name(), case_sensitive_);
+  std::string new_short_path;
+  if (field.type()->type_id() == TypeId::kStruct) {
+    new_short_path = short_path;
+  } else {
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+  }
+  auto it = name_to_id_.emplace(new_path, field.field_id());
+  if (!it.second) {
+    return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+  }
+  shortname_to_id_.emplace(new_short_path, field.field_id());
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const MapType& type, const std::string& path,
+                              const std::string& short_path) {
+  std::string new_path, new_short_path;
+  const auto& fields = type.fields();
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    if (field.name() == MapType::kValueName &&
+        field.type()->type_id() == TypeId::kStruct) {
+      new_short_path = short_path;
+    } else {
+      new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    }
+    auto it = name_to_id_.emplace(new_path, field.field_id());
+    if (!it.second) {
+      return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+    }
+    shortname_to_id_.emplace(new_short_path, field.field_id());

Review Comment:
   ```suggestion
       shortname_to_id_.try_emplace(new_short_path, field.field_id());
   ```



##########
src/iceberg/schema.cc:
##########
@@ -19,13 +19,58 @@
 
 #include "iceberg/schema.h"
 
+#include <algorithm>
 #include <format>
+#include <functional>
 
 #include "iceberg/type.h"
 #include "iceberg/util/formatter.h"  // IWYU pragma: keep
+#include "iceberg/util/macros.h"
+#include "iceberg/util/visit_type.h"
 
 namespace iceberg {
 
+class IdToFieldVisitor {
+ public:
+  explicit IdToFieldVisitor(
+      std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>&
+          id_to_field);
+  Status Visit(const Type& type);
+  Status VisitNestedType(const Type& type);
+
+ private:
+  std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field_;
+};
+
+class NameToIdVisitor {
+ public:
+  explicit NameToIdVisitor(
+      std::unordered_map<std::string, int32_t>& name_to_id, bool 
case_sensitive_ = true,
+      std::function<std::string(std::string_view)> quoting_func_ =
+          [](std::string_view s) { return std::string(s); });
+  ~NameToIdVisitor();
+
+  Status Visit(const ListType& type, const std::string& path,
+               const std::string& short_path);
+  Status Visit(const MapType& type, const std::string& path,
+               const std::string& short_path);
+  Status Visit(const StructType& type, const std::string& path,
+               const std::string& short_path);
+  Status Visit(const PrimitiveType& type, const std::string& path,
+               const std::string& short_path);
+
+ private:
+  std::string BuildPath(std::string_view prefix, std::string_view field_name,
+                        bool case_sensitive);
+  void Merge();
+
+ private:
+  bool case_sensitive_;
+  std::unordered_map<std::string, int32_t>& name_to_id_;
+  std::unordered_map<std::string, int32_t> shortname_to_id_;

Review Comment:
   ```suggestion
     std::unordered_map<std::string, int32_t> short_name_to_id_;
   ```



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +89,179 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);
+  if (it == lowercase_name_to_id_.end()) return std::nullopt;
+  return FindFieldById(it->second);
+}
+
+Status Schema::InitIdToFieldMap() const {
+  if (!id_to_field_.empty()) {
+    return {};
+  }
+  IdToFieldVisitor visitor(id_to_field_);
+  ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*this, &visitor));
+  return {};
+}
+
+Status Schema::InitNameToIdMap() const {
+  if (!name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(name_to_id_, /*case_sensitive=*/true);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Status Schema::InitLowerCaseNameToIdMap() const {
+  if (!lowercase_name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(lowercase_name_to_id_, /*case_sensitive=*/false);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldById(
+    int32_t field_id) const {
+  ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+  auto it = id_to_field_.find(field_id);
+  if (it == id_to_field_.end()) {
+    return std::nullopt;
+  }
+  return it->second;
+}
+
+IdToFieldVisitor::IdToFieldVisitor(
+    std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field)
+    : id_to_field_(id_to_field) {}
+
+Status IdToFieldVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    ICEBERG_RETURN_UNEXPECTED(VisitNestedType(type));
+  }
+  return {};
+}
+
+Status IdToFieldVisitor::VisitNestedType(const Type& type) {
+  const auto& nested = iceberg::internal::checked_cast<const 
NestedType&>(type);
+  const auto& fields = nested.fields();
+  for (const auto& field : fields) {
+    id_to_field_.emplace(field.field_id(), std::cref(field));
+    ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+  }
+  return {};
+}
+
+NameToIdVisitor::NameToIdVisitor(
+    std::unordered_map<std::string, int32_t>& name_to_id, bool case_sensitive,
+    std::function<std::string(std::string_view)> quoting_func)
+    : name_to_id_(name_to_id),
+      case_sensitive_(case_sensitive),
+      quoting_func_(std::move(quoting_func)) {}
+
+NameToIdVisitor::~NameToIdVisitor() { Merge(); }
+
+Status NameToIdVisitor::Visit(const ListType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& field = type.fields()[0];
+  std::string new_path = BuildPath(path, field.name(), case_sensitive_);
+  std::string new_short_path;
+  if (field.type()->type_id() == TypeId::kStruct) {
+    new_short_path = short_path;
+  } else {
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+  }
+  auto it = name_to_id_.emplace(new_path, field.field_id());
+  if (!it.second) {
+    return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+  }
+  shortname_to_id_.emplace(new_short_path, field.field_id());
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const MapType& type, const std::string& path,
+                              const std::string& short_path) {
+  std::string new_path, new_short_path;
+  const auto& fields = type.fields();
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    if (field.name() == MapType::kValueName &&
+        field.type()->type_id() == TypeId::kStruct) {
+      new_short_path = short_path;
+    } else {
+      new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    }
+    auto it = name_to_id_.emplace(new_path, field.field_id());
+    if (!it.second) {
+      return NotSupported("Duplicate path in name_to_id_: {}", new_path);

Review Comment:
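   Same as the earlier comment on the `ListType` overload: return `InvalidSchema` and include the duplicate path together with the previous and current field ids, instead of `NotSupported`.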



##########
src/iceberg/schema.cc:
##########
@@ -44,4 +89,179 @@ bool Schema::Equals(const Schema& other) const {
   return schema_id_ == other.schema_id_ && fields_ == other.fields_;
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldByName(
+    std::string_view name, bool case_sensitive) const {
+  if (case_sensitive) {
+    ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+    auto it = name_to_id_.find(std::string(name));
+    if (it == name_to_id_.end()) return std::nullopt;
+    return FindFieldById(it->second);
+  }
+  ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+  std::string lower_name(name);
+  std::ranges::transform(lower_name, lower_name.begin(), ::tolower);
+  auto it = lowercase_name_to_id_.find(lower_name);
+  if (it == lowercase_name_to_id_.end()) return std::nullopt;
+  return FindFieldById(it->second);
+}
+
+Status Schema::InitIdToFieldMap() const {
+  if (!id_to_field_.empty()) {
+    return {};
+  }
+  IdToFieldVisitor visitor(id_to_field_);
+  ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*this, &visitor));
+  return {};
+}
+
+Status Schema::InitNameToIdMap() const {
+  if (!name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(name_to_id_, /*case_sensitive=*/true);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Status Schema::InitLowerCaseNameToIdMap() const {
+  if (!lowercase_name_to_id_.empty()) {
+    return {};
+  }
+  NameToIdVisitor visitor(lowercase_name_to_id_, /*case_sensitive=*/false);
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*this, &visitor, /*path=*/"", /*short_path=*/""));
+  return {};
+}
+
+Result<std::optional<std::reference_wrapper<const SchemaField>>> 
Schema::FindFieldById(
+    int32_t field_id) const {
+  ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+  auto it = id_to_field_.find(field_id);
+  if (it == id_to_field_.end()) {
+    return std::nullopt;
+  }
+  return it->second;
+}
+
+IdToFieldVisitor::IdToFieldVisitor(
+    std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& 
id_to_field)
+    : id_to_field_(id_to_field) {}
+
+Status IdToFieldVisitor::Visit(const Type& type) {
+  if (type.is_nested()) {
+    ICEBERG_RETURN_UNEXPECTED(VisitNestedType(type));
+  }
+  return {};
+}
+
+Status IdToFieldVisitor::VisitNestedType(const Type& type) {
+  const auto& nested = iceberg::internal::checked_cast<const 
NestedType&>(type);
+  const auto& fields = nested.fields();
+  for (const auto& field : fields) {
+    id_to_field_.emplace(field.field_id(), std::cref(field));
+    ICEBERG_RETURN_UNEXPECTED(Visit(*field.type()));
+  }
+  return {};
+}
+
+NameToIdVisitor::NameToIdVisitor(
+    std::unordered_map<std::string, int32_t>& name_to_id, bool case_sensitive,
+    std::function<std::string(std::string_view)> quoting_func)
+    : name_to_id_(name_to_id),
+      case_sensitive_(case_sensitive),
+      quoting_func_(std::move(quoting_func)) {}
+
+NameToIdVisitor::~NameToIdVisitor() { Merge(); }
+
+Status NameToIdVisitor::Visit(const ListType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& field = type.fields()[0];
+  std::string new_path = BuildPath(path, field.name(), case_sensitive_);
+  std::string new_short_path;
+  if (field.type()->type_id() == TypeId::kStruct) {
+    new_short_path = short_path;
+  } else {
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+  }
+  auto it = name_to_id_.emplace(new_path, field.field_id());
+  if (!it.second) {
+    return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+  }
+  shortname_to_id_.emplace(new_short_path, field.field_id());
+  ICEBERG_RETURN_UNEXPECTED(
+      VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const MapType& type, const std::string& path,
+                              const std::string& short_path) {
+  std::string new_path, new_short_path;
+  const auto& fields = type.fields();
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    if (field.name() == MapType::kValueName &&
+        field.type()->type_id() == TypeId::kStruct) {
+      new_short_path = short_path;
+    } else {
+      new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    }
+    auto it = name_to_id_.emplace(new_path, field.field_id());
+    if (!it.second) {
+      return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+    }
+    shortname_to_id_.emplace(new_short_path, field.field_id());
+    ICEBERG_RETURN_UNEXPECTED(
+        VisitTypeInline(*field.type(), this, new_path, new_short_path));
+  }
+  return {};
+}
+
+Status NameToIdVisitor::Visit(const StructType& type, const std::string& path,
+                              const std::string& short_path) {
+  const auto& fields = type.fields();
+  std::string new_path, new_short_path;
+  for (const auto& field : fields) {
+    new_path = BuildPath(path, field.name(), case_sensitive_);
+    new_short_path = BuildPath(short_path, field.name(), case_sensitive_);
+    auto it = name_to_id_.emplace(new_path, field.field_id());
+    if (!it.second) {
+      return NotSupported("Duplicate path in name_to_id_: {}", new_path);
+    }
+    shortname_to_id_.emplace(new_short_path, field.field_id());

Review Comment:
   ditto



##########
test/schema_test.cc:
##########
@@ -81,3 +82,420 @@ TEST(SchemaTest, Equality) {
   ASSERT_EQ(schema1, schema5);
   ASSERT_EQ(schema5, schema1);
 }
+
+class NestedTypeTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    field1_ = std::make_unique<iceberg::SchemaField>(1, "Foo", 
iceberg::int32(), true);
+    field2_ = std::make_unique<iceberg::SchemaField>(2, "Bar", 
iceberg::string(), true);
+    field3_ = std::make_unique<iceberg::SchemaField>(3, "Foobar", 
iceberg::int32(), true);
+
+    iceberg::StructType structtype = iceberg::StructType(
+        std::vector<iceberg::SchemaField>{*field1_, *field2_, *field3_});
+
+    auto listype = iceberg::ListType(iceberg::SchemaField::MakeRequired(
+        4, "element", std::make_shared<iceberg::StructType>(structtype)));
+
+    auto maptype =
+        iceberg::MapType(iceberg::SchemaField::MakeRequired(5, "key", 
iceberg::int32()),
+                         iceberg::SchemaField::MakeRequired(
+                             6, "value", 
std::make_shared<iceberg::ListType>(listype)));
+
+    field4_ = std::make_unique<iceberg::SchemaField>(
+        4, "element", std::make_shared<iceberg::StructType>(structtype), 
false);
+    field5_ = std::make_unique<iceberg::SchemaField>(5, "key", 
iceberg::int32(), false);
+    field6_ = std::make_unique<iceberg::SchemaField>(
+        6, "value", std::make_shared<iceberg::ListType>(listype), false);
+    field7_ = std::make_unique<iceberg::SchemaField>(
+        7, "Value", std::make_shared<iceberg::MapType>(maptype), false);
+
+    schema_ =
+        
std::make_shared<iceberg::Schema>(std::vector<iceberg::SchemaField>{*field7_}, 
1);
+  }
+
+  std::shared_ptr<iceberg::Schema> schema_;
+  std::unique_ptr<iceberg::SchemaField> field1_;
+  std::unique_ptr<iceberg::SchemaField> field2_;
+  std::unique_ptr<iceberg::SchemaField> field3_;
+  std::unique_ptr<iceberg::SchemaField> field4_;
+  std::unique_ptr<iceberg::SchemaField> field5_;
+  std::unique_ptr<iceberg::SchemaField> field6_;
+  std::unique_ptr<iceberg::SchemaField> field7_;
+};
+
+TEST_F(NestedTypeTest, TestFindById) {
+  ASSERT_THAT(schema_->FindFieldById(7), ::testing::Optional(*field7_));
+  ASSERT_THAT(schema_->FindFieldById(6), ::testing::Optional(*field6_));
+  ASSERT_THAT(schema_->FindFieldById(5), ::testing::Optional(*field5_));
+  ASSERT_THAT(schema_->FindFieldById(4), ::testing::Optional(*field4_));
+  ASSERT_THAT(schema_->FindFieldById(3), ::testing::Optional(*field3_));
+  ASSERT_THAT(schema_->FindFieldById(2), ::testing::Optional(*field2_));
+  ASSERT_THAT(schema_->FindFieldById(1), ::testing::Optional(*field1_));
+
+  ASSERT_THAT(schema_->FindFieldById(10), ::testing::Optional(std::nullopt));
+}
+
+TEST_F(NestedTypeTest, TestFindByName) {
+  ASSERT_THAT(schema_->FindFieldByName("Value"), 
::testing::Optional(*field7_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value"), 
::testing::Optional(*field6_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.key"), 
::testing::Optional(*field5_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.element"),
+              ::testing::Optional(*field4_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.element.Foobar"),
+              ::testing::Optional(*field3_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.element.Bar"),
+              ::testing::Optional(*field2_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.element.Foo"),
+              ::testing::Optional(*field1_));
+
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.element.FoO"),
+              ::testing::Optional(std::nullopt));
+}
+
+TEST_F(NestedTypeTest, TestFindByNameCaseInsensitive) {
+  ASSERT_THAT(schema_->FindFieldByName("vALue", false), 
::testing::Optional(*field7_));
+  ASSERT_THAT(schema_->FindFieldByName("vALue.VALUE", false),
+              ::testing::Optional(*field6_));
+  ASSERT_THAT(schema_->FindFieldByName("valUe.kEy", false),
+              ::testing::Optional(*field5_));
+  ASSERT_THAT(schema_->FindFieldByName("vaLue.vAlue.elEment", false),
+              ::testing::Optional(*field4_));
+  ASSERT_THAT(schema_->FindFieldByName("vaLue.vAlue.eLement.fOObar", false),
+              ::testing::Optional(*field3_));
+  ASSERT_THAT(schema_->FindFieldByName("valUe.vaLUe.elemEnt.Bar", false),
+              ::testing::Optional(*field2_));
+  ASSERT_THAT(schema_->FindFieldByName("valUe.valUe.ELEMENT.FOO", false),
+              ::testing::Optional(*field1_));
+  ASSERT_THAT(schema_->FindFieldByName("valUe.valUe.ELEMENT.FO", false),
+              ::testing::Optional(std::nullopt));
+}
+
+TEST_F(NestedTypeTest, TestFindByShortNameCaseInsensitive) {
+  ASSERT_THAT(schema_->FindFieldByName("vaLue.value.FOO", false),
+              ::testing::Optional(*field1_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.Bar", false),
+              ::testing::Optional(*field2_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.FooBAR", false),
+              ::testing::Optional(*field3_));
+  ASSERT_THAT(schema_->FindFieldByName("Value.value.FooBAR.a", false),
+              ::testing::Optional(std::nullopt));
+}
+
+class NestType2Test : public ::testing::Test {

Review Comment:
   BTW, the name `NestType2Test` does not help readers understand what the test
   is about. Please use a better name.



##########
src/iceberg/schema.cc:
##########
@@ -19,12 +19,46 @@
 
 #include "iceberg/schema.h"
 
+#include <algorithm>
 #include <format>
 
 #include "iceberg/type.h"
 #include "iceberg/util/formatter.h"  // IWYU pragma: keep
+#include "iceberg/util/macros.h"
+#include "iceberg/util/visit_type.h"
 
 namespace iceberg {
+class IdToFieldVisitor {

Review Comment:
   I think it's fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to