wgtmac commented on code in PR #420:
URL: https://github.com/apache/iceberg-cpp/pull/420#discussion_r2634368713


##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {}  // namespace
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", snapshot_id);
+  }
+  return AncestorsOf(table, start);
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->snapshot_id == ancestor_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id);
+}
+
+Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                              int64_t 
ancestor_parent_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->parent_snapshot_id.has_value() &&
+        snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return AncestorsOf(table, current);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& 
table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  return ToIds(ancestors);
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestor(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorAfter(
+    const Table& table, TimePointMs timestamp_ms) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  if (!current) {
+    // there are no snapshots or ancestors
+    return std::nullopt;
+  }
+
+  std::shared_ptr<Snapshot> last_snapshot = nullptr;
+  auto ancestors = AncestorsOf(table, current);
+  for (const auto& snapshot : ancestors) {
+    auto snapshot_timestamp_ms = snapshot->timestamp_ms;
+    if (snapshot_timestamp_ms < timestamp_ms) {
+      return last_snapshot ? std::make_optional(last_snapshot) : std::nullopt;
+    } else if (snapshot_timestamp_ms == timestamp_ms) {
+      return snapshot;
+    }
+    last_snapshot = snapshot;
+  }
+
+  if (last_snapshot && !last_snapshot->parent_snapshot_id.has_value()) {
+    // this is the first snapshot in the table, return it
+    return last_snapshot;
+  }
+
+  return ValidationFailed("Cannot find snapshot older than {}",
+                          FormatTimestamp(timestamp_ms));
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::SnapshotIdsBetween(const Table& 
table,
+                                                              int64_t 
from_snapshot_id,
+                                                              int64_t 
to_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto to_snapshot, 
table.SnapshotById(to_snapshot_id));
+  if (!to_snapshot) {
+    return InvalidArgument("Cannot find snapshot: {}", to_snapshot_id);
+  }
+
+  // Create a lookup function that returns null when snapshot_id equals 
from_snapshot_id
+  // This effectively stops traversal at from_snapshot_id (exclusive)
+  auto lookup = [&table,
+                 from_snapshot_id](int64_t id) -> 
Result<std::shared_ptr<Snapshot>> {
+    if (id == from_snapshot_id) {
+      return nullptr;
+    }
+    return table.SnapshotById(id);
+  };
+
+  auto ancestors = AncestorsOf(to_snapshot, lookup);
+  return ToIds(ancestors);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::AncestorIdsBetween(
+    const Table& table, int64_t latest_snapshot_id,
+    const std::optional<int64_t>& oldest_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors, AncestorsBetween(table, latest_snapshot_id, 
oldest_snapshot_id));
+  return ToIds(ancestors);
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsBetween(
+    const Table& table, int64_t latest_snapshot_id,
+    const std::optional<int64_t>& oldest_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(latest_snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", latest_snapshot_id);
+  }
+
+  if (oldest_snapshot_id.has_value()) {
+    if (latest_snapshot_id == oldest_snapshot_id.value()) {
+      return std::vector<std::shared_ptr<Snapshot>>();
+    }
+
+    auto lookup = [&table, oldest_snapshot_id = oldest_snapshot_id.value()](
+                      int64_t id) -> Result<std::shared_ptr<Snapshot>> {
+      if (id == oldest_snapshot_id) {
+        return nullptr;
+      }
+      return table.SnapshotById(id);
+    };
+    return AncestorsOf(start, lookup);
+  } else {
+    return AncestorsOf(table, start);
+  }
+}
+
+std::vector<std::shared_ptr<Snapshot>> SnapshotUtil::AncestorsOf(
+    const Table& table, const std::shared_ptr<Snapshot>& snapshot) {
+  std::vector<std::shared_ptr<Snapshot>> result;
+  if (!snapshot) {
+    return result;
+  }
+
+  std::shared_ptr<Snapshot> current = snapshot;
+  while (current) {
+    result.push_back(current);
+    if (!current->parent_snapshot_id.has_value()) {
+      break;
+    }
+    auto parent_result = 
table.SnapshotById(current->parent_snapshot_id.value());
+    if (!parent_result.has_value()) {

Review Comment:
   We might need to check whether the error is `NotFound` and return the original 
error for other kinds of errors.



##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {}  // namespace
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", snapshot_id);

Review Comment:
   ```suggestion
       return NotFound("Cannot find snapshot: {}", snapshot_id);
   ```



##########
src/iceberg/util/snapshot_util_internal.h:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+/// \brief Utility functions for working with snapshots
+class ICEBERG_EXPORT SnapshotUtil {
+ public:
+  /// \brief Returns a vector of ancestors of the given snapshot.
+  ///
+  /// \param table The table
+  /// \param snapshot_id The snapshot ID to start from
+  /// \return A vector of ancestor snapshots
+  static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(const 
Table& table,
+                                                                    int64_t 
snapshot_id);
+
+  /// \brief Returns whether ancestor_snapshot_id is an ancestor of 
snapshot_id.
+  ///
+  /// \param table The table to check
+  /// \param snapshot_id The snapshot ID to check
+  /// \param ancestor_snapshot_id The ancestor snapshot ID to check for
+  /// \return true if ancestor_snapshot_id is an ancestor of snapshot_id
+  static Result<bool> IsAncestorOf(const Table& table, int64_t snapshot_id,
+                                   int64_t ancestor_snapshot_id);
+
+  /// \brief Returns whether ancestor_snapshot_id is an ancestor of the 
table's current
+  /// state.
+  ///
+  /// \param table The table to check
+  /// \param ancestor_snapshot_id The ancestor snapshot ID to check for
+  /// \return true if ancestor_snapshot_id is an ancestor of the current 
snapshot
+  static Result<bool> IsAncestorOf(const Table& table, int64_t 
ancestor_snapshot_id);
+
+  /// \brief Returns whether some ancestor of snapshot_id has parentId matches
+  /// ancestor_parent_snapshot_id.
+  ///
+  /// \param table The table to check
+  /// \param snapshot_id The snapshot ID to check
+  /// \param ancestor_parent_snapshot_id The ancestor parent snapshot ID to 
check for
+  /// \return true if any ancestor has the given parent ID
+  static Result<bool> IsParentAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                         int64_t ancestor_parent_snapshot_id);
+
+  /// \brief Returns a vector that traverses the table's snapshots from the 
current to the
+  /// last known ancestor.
+  ///
+  /// \param table The table
+  /// \return A vector from the table's current snapshot to its last known 
ancestor
+  static Result<std::vector<std::shared_ptr<Snapshot>>> CurrentAncestors(
+      const Table& table);
+
+  /// \brief Return the snapshot IDs for the ancestors of the current table 
state.
+  ///
+  /// Ancestor IDs are ordered by commit time, descending. The first ID is the 
current
+  /// snapshot, followed by its parent, and so on.
+  ///
+  /// \param table The table
+  /// \return A vector of snapshot IDs of the known ancestor snapshots, 
including the
+  /// current ID
+  static Result<std::vector<int64_t>> CurrentAncestorIds(const Table& table);
+
+  /// \brief Traverses the history of the table's current snapshot and finds 
the oldest
+  /// Snapshot.
+  ///
+  /// \param table The table
+  /// \return The oldest snapshot, or nullopt if there is no current snapshot

Review Comment:
   It would be good to document that none of the returned `std::shared_ptr<Snapshot>` 
values can be nullptr.



##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {}  // namespace
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", snapshot_id);
+  }
+  return AncestorsOf(table, start);
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->snapshot_id == ancestor_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id);
+}
+
+Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                              int64_t 
ancestor_parent_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->parent_snapshot_id.has_value() &&
+        snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return AncestorsOf(table, current);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& 
table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  return ToIds(ancestors);
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestor(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorAfter(
+    const Table& table, TimePointMs timestamp_ms) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  if (!current) {
+    // there are no snapshots or ancestors
+    return std::nullopt;
+  }
+
+  std::shared_ptr<Snapshot> last_snapshot = nullptr;
+  auto ancestors = AncestorsOf(table, current);
+  for (const auto& snapshot : ancestors) {
+    auto snapshot_timestamp_ms = snapshot->timestamp_ms;
+    if (snapshot_timestamp_ms < timestamp_ms) {
+      return last_snapshot ? std::make_optional(last_snapshot) : std::nullopt;
+    } else if (snapshot_timestamp_ms == timestamp_ms) {
+      return snapshot;
+    }
+    last_snapshot = snapshot;
+  }
+
+  if (last_snapshot && !last_snapshot->parent_snapshot_id.has_value()) {
+    // this is the first snapshot in the table, return it
+    return last_snapshot;
+  }
+
+  return ValidationFailed("Cannot find snapshot older than {}",

Review Comment:
   ```suggestion
     return NotFound("Cannot find snapshot older than {}",
   ```



##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {}  // namespace
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", snapshot_id);
+  }
+  return AncestorsOf(table, start);
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->snapshot_id == ancestor_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id);
+}
+
+Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                              int64_t 
ancestor_parent_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->parent_snapshot_id.has_value() &&
+        snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return AncestorsOf(table, current);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& 
table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  return ToIds(ancestors);
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestor(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorAfter(
+    const Table& table, TimePointMs timestamp_ms) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  if (!current) {

Review Comment:
   `table.current_snapshot()` should never return a nullptr, so a null value here 
should be reported as an error instead of returning `std::nullopt`.



##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {}  // namespace
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", snapshot_id);
+  }
+  return AncestorsOf(table, start);
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {

Review Comment:
   nit: use `std::ranges` (e.g. `std::ranges::any_of`) for this kind of processing.



##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {}  // namespace
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", snapshot_id);
+  }
+  return AncestorsOf(table, start);
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->snapshot_id == ancestor_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id);
+}
+
+Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                              int64_t 
ancestor_parent_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->parent_snapshot_id.has_value() &&
+        snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return AncestorsOf(table, current);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& 
table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  return ToIds(ancestors);
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestor(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorAfter(
+    const Table& table, TimePointMs timestamp_ms) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  if (!current) {
+    // there are no snapshots or ancestors
+    return std::nullopt;
+  }
+
+  std::shared_ptr<Snapshot> last_snapshot = nullptr;
+  auto ancestors = AncestorsOf(table, current);
+  for (const auto& snapshot : ancestors) {
+    auto snapshot_timestamp_ms = snapshot->timestamp_ms;
+    if (snapshot_timestamp_ms < timestamp_ms) {
+      return last_snapshot ? std::make_optional(last_snapshot) : std::nullopt;
+    } else if (snapshot_timestamp_ms == timestamp_ms) {
+      return snapshot;
+    }
+    last_snapshot = snapshot;
+  }
+
+  if (last_snapshot && !last_snapshot->parent_snapshot_id.has_value()) {
+    // this is the first snapshot in the table, return it
+    return last_snapshot;
+  }
+
+  return ValidationFailed("Cannot find snapshot older than {}",
+                          FormatTimestamp(timestamp_ms));
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::SnapshotIdsBetween(const Table& 
table,
+                                                              int64_t 
from_snapshot_id,
+                                                              int64_t 
to_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto to_snapshot, 
table.SnapshotById(to_snapshot_id));
+  if (!to_snapshot) {
+    return InvalidArgument("Cannot find snapshot: {}", to_snapshot_id);
+  }
+
+  // Create a lookup function that returns null when snapshot_id equals 
from_snapshot_id
+  // This effectively stops traversal at from_snapshot_id (exclusive)
+  auto lookup = [&table,
+                 from_snapshot_id](int64_t id) -> 
Result<std::shared_ptr<Snapshot>> {
+    if (id == from_snapshot_id) {
+      return nullptr;
+    }
+    return table.SnapshotById(id);
+  };
+
+  auto ancestors = AncestorsOf(to_snapshot, lookup);
+  return ToIds(ancestors);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::AncestorIdsBetween(
+    const Table& table, int64_t latest_snapshot_id,
+    const std::optional<int64_t>& oldest_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors, AncestorsBetween(table, latest_snapshot_id, 
oldest_snapshot_id));
+  return ToIds(ancestors);
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsBetween(
+    const Table& table, int64_t latest_snapshot_id,
+    const std::optional<int64_t>& oldest_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(latest_snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", latest_snapshot_id);
+  }
+
+  if (oldest_snapshot_id.has_value()) {
+    if (latest_snapshot_id == oldest_snapshot_id.value()) {
+      return std::vector<std::shared_ptr<Snapshot>>();
+    }
+
+    auto lookup = [&table, oldest_snapshot_id = oldest_snapshot_id.value()](
+                      int64_t id) -> Result<std::shared_ptr<Snapshot>> {
+      if (id == oldest_snapshot_id) {
+        return nullptr;
+      }
+      return table.SnapshotById(id);
+    };
+    return AncestorsOf(start, lookup);
+  } else {
+    return AncestorsOf(table, start);
+  }
+}
+
+std::vector<std::shared_ptr<Snapshot>> SnapshotUtil::AncestorsOf(
+    const Table& table, const std::shared_ptr<Snapshot>& snapshot) {
+  std::vector<std::shared_ptr<Snapshot>> result;
+  if (!snapshot) {
+    return result;
+  }
+
+  std::shared_ptr<Snapshot> current = snapshot;
+  while (current) {
+    result.push_back(current);
+    if (!current->parent_snapshot_id.has_value()) {
+      break;
+    }
+    auto parent_result = 
table.SnapshotById(current->parent_snapshot_id.value());
+    if (!parent_result.has_value()) {
+      // Parent snapshot not found (e.g., expired), stop traversal
+      break;
+    }
+    current = parent_result.value();

Review Comment:
   ```suggestion
       current = std::move(parent_result.value());
   ```



##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {}  // namespace
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", snapshot_id);
+  }
+  return AncestorsOf(table, start);
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->snapshot_id == ancestor_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id);
+}
+
+Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                              int64_t 
ancestor_parent_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->parent_snapshot_id.has_value() &&
+        snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return AncestorsOf(table, current);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& 
table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  return ToIds(ancestors);
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestor(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorAfter(
+    const Table& table, TimePointMs timestamp_ms) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  if (!current) {
+    // there are no snapshots or ancestors
+    return std::nullopt;
+  }
+
+  std::shared_ptr<Snapshot> last_snapshot = nullptr;
+  auto ancestors = AncestorsOf(table, current);
+  for (const auto& snapshot : ancestors) {
+    auto snapshot_timestamp_ms = snapshot->timestamp_ms;
+    if (snapshot_timestamp_ms < timestamp_ms) {
+      return last_snapshot ? std::make_optional(last_snapshot) : std::nullopt;
+    } else if (snapshot_timestamp_ms == timestamp_ms) {
+      return snapshot;
+    }
+    last_snapshot = snapshot;
+  }
+
+  if (last_snapshot && !last_snapshot->parent_snapshot_id.has_value()) {
+    // this is the first snapshot in the table, return it
+    return last_snapshot;
+  }
+
+  return ValidationFailed("Cannot find snapshot older than {}",

Review Comment:
   I'm undecided which is better here: `ValidationFailed`, `NotFound`, or `Invalid`.



##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {}  // namespace
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", snapshot_id);
+  }
+  return AncestorsOf(table, start);
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->snapshot_id == ancestor_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id);
+}
+
+Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                              int64_t 
ancestor_parent_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->parent_snapshot_id.has_value() &&
+        snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return AncestorsOf(table, current);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& 
table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  return ToIds(ancestors);
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestor(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorAfter(
+    const Table& table, TimePointMs timestamp_ms) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  if (!current) {
+    // there are no snapshots or ancestors
+    return std::nullopt;
+  }
+
+  std::shared_ptr<Snapshot> last_snapshot = nullptr;

Review Comment:
   What about using `std::optional<std::shared_ptr<Snapshot>>` so we don't need
   the check at line 111?



##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {}  // namespace
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", snapshot_id);
+  }
+  return AncestorsOf(table, start);
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->snapshot_id == ancestor_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id);
+}
+
+Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                              int64_t 
ancestor_parent_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->parent_snapshot_id.has_value() &&
+        snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return AncestorsOf(table, current);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& 
table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  return ToIds(ancestors);
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestor(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorAfter(
+    const Table& table, TimePointMs timestamp_ms) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  if (!current) {
+    // there are no snapshots or ancestors
+    return std::nullopt;
+  }
+
+  std::shared_ptr<Snapshot> last_snapshot = nullptr;
+  auto ancestors = AncestorsOf(table, current);
+  for (const auto& snapshot : ancestors) {
+    auto snapshot_timestamp_ms = snapshot->timestamp_ms;
+    if (snapshot_timestamp_ms < timestamp_ms) {
+      return last_snapshot ? std::make_optional(last_snapshot) : std::nullopt;
+    } else if (snapshot_timestamp_ms == timestamp_ms) {
+      return snapshot;
+    }
+    last_snapshot = snapshot;
+  }
+
+  if (last_snapshot && !last_snapshot->parent_snapshot_id.has_value()) {
+    // this is the first snapshot in the table, return it
+    return last_snapshot;
+  }
+
+  return ValidationFailed("Cannot find snapshot older than {}",
+                          FormatTimestamp(timestamp_ms));
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::SnapshotIdsBetween(const Table& 
table,
+                                                              int64_t 
from_snapshot_id,
+                                                              int64_t 
to_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto to_snapshot, 
table.SnapshotById(to_snapshot_id));
+  if (!to_snapshot) {
+    return InvalidArgument("Cannot find snapshot: {}", to_snapshot_id);

Review Comment:
   `InvalidArgument` should be used for input-argument errors. This is an invalid
   state, so perhaps return `Invalid` instead?



##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {}  // namespace
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", snapshot_id);

Review Comment:
   BTW, perhaps this should be an `ICEBERG_DCHECK`.



##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {}  // namespace
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  if (!start) {
+    return InvalidArgument("Cannot find snapshot: {}", snapshot_id);
+  }
+  return AncestorsOf(table, start);
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->snapshot_id == ancestor_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id);
+}
+
+Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                              int64_t 
ancestor_parent_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  for (const auto& snapshot : ancestors) {
+    if (snapshot->parent_snapshot_id.has_value() &&
+        snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return AncestorsOf(table, current);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& 
table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  return ToIds(ancestors);
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestor(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
+  if (ancestors.empty()) {
+    return std::nullopt;
+  }
+  return ancestors.back();
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> 
SnapshotUtil::OldestAncestorAfter(
+    const Table& table, TimePointMs timestamp_ms) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());

Review Comment:
   Actually, it is not an error if the current snapshot is not found. We can return
   `std::nullopt` when the result error is `NotFound`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to