This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a29c6cebfd Core: Add last updated timestamp and snapshotId for Partitions table (#7581)
a29c6cebfd is described below

commit a29c6cebfd39b8c628cf901e7c37cf18a2fd064a
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Fri Jun 23 14:32:13 2023 -0700

    Core: Add last updated timestamp and snapshotId for Partitions table (#7581)
---
 .../java/org/apache/iceberg/PartitionsTable.java   |  78 +++++---
 .../apache/iceberg/MetadataTableScanTestBase.java  |  12 +-
 .../org/apache/iceberg/TestMetadataTableScans.java | 165 +++++++++--------
 ...stMetadataTableScansWithPartitionEvolution.java |  32 ++--
 .../spark/source/TestIcebergSourceTablesBase.java  | 164 ++++++++++++++++-
 .../spark/source/TestIcebergSourceTablesBase.java  | 176 +++++++++++++++++-
 .../spark/source/TestIcebergSourceTablesBase.java  | 176 +++++++++++++++++-
 .../spark/source/TestIcebergSourceTablesBase.java  | 198 +++++++++++++++++++--
 8 files changed, 855 insertions(+), 146 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index b46239352d..f072a7343f 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -72,7 +72,17 @@ public class PartitionsTable extends BaseMetadataTable {
                 8,
                 "equality_delete_file_count",
                 Types.IntegerType.get(),
-                "Count of equality delete files"));
+                "Count of equality delete files"),
+            Types.NestedField.optional(
+                9,
+                "last_updated_ms",
+                Types.TimestampType.withZone(),
+                "Commit time of snapshot that last updated this partition"),
+            Types.NestedField.optional(
+                10,
+                "last_updated_snapshot_id",
+                Types.LongType.get(),
+                "Id of snapshot that last updated this partition"));
     this.unpartitionedTable = 
Partitioning.partitionType(table).fields().isEmpty();
   }
 
@@ -90,7 +100,9 @@ public class PartitionsTable extends BaseMetadataTable {
           "position_delete_record_count",
           "position_delete_file_count",
           "equality_delete_record_count",
-          "equality_delete_file_count");
+          "equality_delete_file_count",
+          "last_updated_ms",
+          "last_updated_snapshot_id");
     }
     return schema;
   }
@@ -116,7 +128,9 @@ public class PartitionsTable extends BaseMetadataTable {
                   root.posDeleteRecordCount,
                   root.posDeleteFileCount,
                   root.eqDeleteRecordCount,
-                  root.eqDeleteFileCount));
+                  root.eqDeleteFileCount,
+                  root.lastUpdatedMs,
+                  root.lastUpdatedSnapshotId));
     } else {
       return StaticDataTask.of(
           
io().newInputFile(table().operations().current().metadataFileLocation()),
@@ -136,19 +150,22 @@ public class PartitionsTable extends BaseMetadataTable {
         partition.posDeleteRecordCount,
         partition.posDeleteFileCount,
         partition.eqDeleteRecordCount,
-        partition.eqDeleteFileCount);
+        partition.eqDeleteFileCount,
+        partition.lastUpdatedMs,
+        partition.lastUpdatedSnapshotId);
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
     Types.StructType partitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap(partitionType);
-
-    try (CloseableIterable<ContentFile<?>> files = planFiles(scan)) {
-      for (ContentFile<?> file : files) {
+    try (CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries = 
planEntries(scan)) {
+      for (ManifestEntry<? extends ContentFile<?>> entry : entries) {
+        Snapshot snapshot = table.snapshot(entry.snapshotId());
+        ContentFile<?> file = entry.file();
         StructLike partition =
             PartitionUtil.coercePartition(
                 partitionType, table.specs().get(file.specId()), 
file.partition());
-        partitions.get(partition).update(file);
+        partitions.get(partition).update(file, snapshot);
       }
     } catch (IOException e) {
       throw new UncheckedIOException(e);
@@ -158,25 +175,32 @@ public class PartitionsTable extends BaseMetadataTable {
   }
 
   @VisibleForTesting
-  static CloseableIterable<ContentFile<?>> planFiles(StaticTableScan scan) {
+  static CloseableIterable<ManifestEntry<?>> planEntries(StaticTableScan scan) 
{
     Table table = scan.table();
 
     CloseableIterable<ManifestFile> filteredManifests =
         filteredManifests(scan, table, 
scan.snapshot().allManifests(table.io()));
 
-    Iterable<CloseableIterable<ContentFile<?>>> tasks =
-        CloseableIterable.transform(
-            filteredManifests,
-            manifest ->
-                CloseableIterable.transform(
-                    ManifestFiles.open(manifest, table.io(), table.specs())
-                        .caseSensitive(scan.isCaseSensitive())
-                        .select(scanColumns(manifest.content())), // don't 
select stats columns
-                    t -> (ContentFile<?>) t));
+    Iterable<CloseableIterable<ManifestEntry<?>>> tasks =
+        CloseableIterable.transform(filteredManifests, manifest -> 
readEntries(manifest, scan));
 
     return new ParallelIterable<>(tasks, scan.planExecutor());
   }
 
+  private static CloseableIterable<ManifestEntry<?>> readEntries(
+      ManifestFile manifest, StaticTableScan scan) {
+    Table table = scan.table();
+    return CloseableIterable.transform(
+        ManifestFiles.open(manifest, table.io(), table.specs())
+            .caseSensitive(scan.isCaseSensitive())
+            .select(scanColumns(manifest.content())) // don't select stats 
columns
+            .entries(),
+        t ->
+            (ManifestEntry<? extends ContentFile<?>>)
+                // defensive copy of manifest entry without stats columns
+                t.copyWithoutStats());
+  }
+
   private static List<String> scanColumns(ManifestContent content) {
     switch (content) {
       case DATA:
@@ -249,19 +273,29 @@ public class PartitionsTable extends BaseMetadataTable {
     private int posDeleteFileCount;
     private long eqDeleteRecordCount;
     private int eqDeleteFileCount;
+    private Long lastUpdatedMs;
+    private Long lastUpdatedSnapshotId;
 
     Partition(StructLike key, Types.StructType keyType) {
       this.partitionData = toPartitionData(key, keyType);
       this.specId = 0;
-      this.dataRecordCount = 0;
+      this.dataRecordCount = 0L;
       this.dataFileCount = 0;
-      this.posDeleteRecordCount = 0;
+      this.posDeleteRecordCount = 0L;
       this.posDeleteFileCount = 0;
-      this.eqDeleteRecordCount = 0;
+      this.eqDeleteRecordCount = 0L;
       this.eqDeleteFileCount = 0;
     }
 
-    void update(ContentFile<?> file) {
+    void update(ContentFile<?> file, Snapshot snapshot) {
+      if (snapshot != null) {
+        long snapshotCommitTime = snapshot.timestampMillis() * 1000;
+        if (this.lastUpdatedMs == null || snapshotCommitTime > 
this.lastUpdatedMs) {
+          this.lastUpdatedMs = snapshotCommitTime;
+          this.lastUpdatedSnapshotId = snapshot.snapshotId();
+        }
+      }
+
       switch (file.content()) {
         case DATA:
           this.dataRecordCount += file.recordCount();
diff --git a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
index 4c8f25c45b..b5ef31a50c 100644
--- a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
@@ -81,18 +81,20 @@ public abstract class MetadataTableScanTestBase extends 
TableTestBase {
   }
 
   protected void validateSingleFieldPartition(
-      CloseableIterable<ContentFile<?>> files, int partitionValue) {
+      CloseableIterable<ManifestEntry<?>> files, int partitionValue) {
     validatePartition(files, 0, partitionValue);
   }
 
   protected void validatePartition(
-      CloseableIterable<ContentFile<?>> files, int position, int 
partitionValue) {
+      CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries,
+      int position,
+      int partitionValue) {
     Assert.assertTrue(
         "File scan tasks do not include correct file",
-        StreamSupport.stream(files.spliterator(), false)
+        StreamSupport.stream(entries.spliterator(), false)
             .anyMatch(
-                file -> {
-                  StructLike partition = file.partition();
+                entry -> {
+                  StructLike partition = entry.file().partition();
                   if (position >= partition.size()) {
                     return false;
                   }
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 9f3ee5c9c3..2e34f2e6da 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -317,18 +317,18 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     TableScan scanNoFilter = 
partitionsTable.newScan().select("partition.data_bucket");
     Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
 
-    CloseableIterable<ContentFile<?>> files =
-        PartitionsTable.planFiles((StaticTableScan) scanNoFilter);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scanNoFilter);
     if (formatVersion == 2) {
-      Assert.assertEquals(8, Iterators.size(files.iterator()));
+      Assert.assertEquals(8, Iterators.size(entries.iterator()));
     } else {
-      Assert.assertEquals(4, Iterators.size(files.iterator()));
+      Assert.assertEquals(4, Iterators.size(entries.iterator()));
     }
 
-    validateSingleFieldPartition(files, 0);
-    validateSingleFieldPartition(files, 1);
-    validateSingleFieldPartition(files, 2);
-    validateSingleFieldPartition(files, 3);
+    validateSingleFieldPartition(entries, 0);
+    validateSingleFieldPartition(entries, 1);
+    validateSingleFieldPartition(entries, 2);
+    validateSingleFieldPartition(entries, 3);
   }
 
   @Test
@@ -342,18 +342,18 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
 
     TableScan scanWithProjection = 
partitionsTable.newScan().select("file_count");
     Assert.assertEquals(expected, scanWithProjection.schema().asStruct());
-    CloseableIterable<ContentFile<?>> files =
-        PartitionsTable.planFiles((StaticTableScan) scanWithProjection);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scanWithProjection);
     if (formatVersion == 2) {
-      Assert.assertEquals(8, Iterators.size(files.iterator()));
+      Assert.assertEquals(8, Iterators.size(entries.iterator()));
     } else {
-      Assert.assertEquals(4, Iterators.size(files.iterator()));
+      Assert.assertEquals(4, Iterators.size(entries.iterator()));
     }
 
-    validateSingleFieldPartition(files, 0);
-    validateSingleFieldPartition(files, 1);
-    validateSingleFieldPartition(files, 2);
-    validateSingleFieldPartition(files, 3);
+    validateSingleFieldPartition(entries, 0);
+    validateSingleFieldPartition(entries, 1);
+    validateSingleFieldPartition(entries, 2);
+    validateSingleFieldPartition(entries, 3);
   }
 
   @Test
@@ -361,14 +361,15 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     table.newFastAppend().appendFile(FILE_WITH_STATS).commit();
 
     Table partitionsTable = new PartitionsTable(table);
-    CloseableIterable<ContentFile<?>> tasksAndEq =
-        PartitionsTable.planFiles((StaticTableScan) partitionsTable.newScan());
-    for (ContentFile<?> file : tasksAndEq) {
-      Assert.assertNull(file.columnSizes());
-      Assert.assertNull(file.valueCounts());
-      Assert.assertNull(file.nullValueCounts());
-      Assert.assertNull(file.lowerBounds());
-      Assert.assertNull(file.upperBounds());
+    CloseableIterable<ManifestEntry<?>> tasksAndEq =
+        PartitionsTable.planEntries((StaticTableScan) 
partitionsTable.newScan());
+    for (ManifestEntry<? extends ContentFile<?>> task : tasksAndEq) {
+      Assert.assertNull(task.file().columnSizes());
+      Assert.assertNull(task.file().valueCounts());
+      Assert.assertNull(task.file().nullValueCounts());
+      Assert.assertNull(task.file().nanValueCounts());
+      Assert.assertNull(task.file().lowerBounds());
+      Assert.assertNull(task.file().upperBounds());
     }
   }
 
@@ -383,15 +384,15 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
             Expressions.equal("partition.data_bucket", 0),
             Expressions.greaterThan("record_count", 0));
     TableScan scanAndEq = partitionsTable.newScan().filter(andEquals);
-    CloseableIterable<ContentFile<?>> files =
-        PartitionsTable.planFiles((StaticTableScan) scanAndEq);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scanAndEq);
     if (formatVersion == 2) {
-      Assert.assertEquals(2, Iterators.size(files.iterator()));
+      Assert.assertEquals(2, Iterators.size(entries.iterator()));
     } else {
-      Assert.assertEquals(1, Iterators.size(files.iterator()));
+      Assert.assertEquals(1, Iterators.size(entries.iterator()));
     }
 
-    validateSingleFieldPartition(files, 0);
+    validateSingleFieldPartition(entries, 0);
   }
 
   @Test
@@ -405,16 +406,16 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
             Expressions.lessThan("partition.data_bucket", 2),
             Expressions.greaterThan("record_count", 0));
     TableScan scanLtAnd = partitionsTable.newScan().filter(ltAnd);
-    CloseableIterable<ContentFile<?>> files =
-        PartitionsTable.planFiles((StaticTableScan) scanLtAnd);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scanLtAnd);
     if (formatVersion == 2) {
-      Assert.assertEquals(4, Iterators.size(files.iterator()));
+      Assert.assertEquals(4, Iterators.size(entries.iterator()));
     } else {
-      Assert.assertEquals(2, Iterators.size(files.iterator()));
+      Assert.assertEquals(2, Iterators.size(entries.iterator()));
     }
 
-    validateSingleFieldPartition(files, 0);
-    validateSingleFieldPartition(files, 1);
+    validateSingleFieldPartition(entries, 0);
+    validateSingleFieldPartition(entries, 1);
   }
 
   @Test
@@ -429,17 +430,18 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
             Expressions.greaterThan("record_count", 0));
     TableScan scanOr = partitionsTable.newScan().filter(or);
 
-    CloseableIterable<ContentFile<?>> files = 
PartitionsTable.planFiles((StaticTableScan) scanOr);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scanOr);
     if (formatVersion == 2) {
-      Assert.assertEquals(8, Iterators.size(files.iterator()));
+      Assert.assertEquals(8, Iterators.size(entries.iterator()));
     } else {
-      Assert.assertEquals(4, Iterators.size(files.iterator()));
+      Assert.assertEquals(4, Iterators.size(entries.iterator()));
     }
 
-    validateSingleFieldPartition(files, 0);
-    validateSingleFieldPartition(files, 1);
-    validateSingleFieldPartition(files, 2);
-    validateSingleFieldPartition(files, 3);
+    validateSingleFieldPartition(entries, 0);
+    validateSingleFieldPartition(entries, 1);
+    validateSingleFieldPartition(entries, 2);
+    validateSingleFieldPartition(entries, 3);
   }
 
   @Test
@@ -449,15 +451,16 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
 
     Expression not = 
Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
     TableScan scanNot = partitionsTable.newScan().filter(not);
-    CloseableIterable<ContentFile<?>> files = 
PartitionsTable.planFiles((StaticTableScan) scanNot);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scanNot);
     if (formatVersion == 2) {
-      Assert.assertEquals(4, Iterators.size(files.iterator()));
+      Assert.assertEquals(4, Iterators.size(entries.iterator()));
     } else {
-      Assert.assertEquals(2, Iterators.size(files.iterator()));
+      Assert.assertEquals(2, Iterators.size(entries.iterator()));
     }
 
-    validateSingleFieldPartition(files, 2);
-    validateSingleFieldPartition(files, 3);
+    validateSingleFieldPartition(entries, 2);
+    validateSingleFieldPartition(entries, 3);
   }
 
   @Test
@@ -468,15 +471,16 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
 
     Expression set = Expressions.in("partition.data_bucket", 2, 3);
     TableScan scanSet = partitionsTable.newScan().filter(set);
-    CloseableIterable<ContentFile<?>> files = 
PartitionsTable.planFiles((StaticTableScan) scanSet);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scanSet);
     if (formatVersion == 2) {
-      Assert.assertEquals(4, Iterators.size(files.iterator()));
+      Assert.assertEquals(4, Iterators.size(entries.iterator()));
     } else {
-      Assert.assertEquals(2, Iterators.size(files.iterator()));
+      Assert.assertEquals(2, Iterators.size(entries.iterator()));
     }
 
-    validateSingleFieldPartition(files, 2);
-    validateSingleFieldPartition(files, 3);
+    validateSingleFieldPartition(entries, 2);
+    validateSingleFieldPartition(entries, 3);
   }
 
   @Test
@@ -487,18 +491,18 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
 
     Expression unary = Expressions.notNull("partition.data_bucket");
     TableScan scanUnary = partitionsTable.newScan().filter(unary);
-    CloseableIterable<ContentFile<?>> files =
-        PartitionsTable.planFiles((StaticTableScan) scanUnary);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scanUnary);
     if (formatVersion == 2) {
-      Assert.assertEquals(8, Iterators.size(files.iterator()));
+      Assert.assertEquals(8, Iterators.size(entries.iterator()));
     } else {
-      Assert.assertEquals(4, Iterators.size(files.iterator()));
+      Assert.assertEquals(4, Iterators.size(entries.iterator()));
     }
 
-    validateSingleFieldPartition(files, 0);
-    validateSingleFieldPartition(files, 1);
-    validateSingleFieldPartition(files, 2);
-    validateSingleFieldPartition(files, 3);
+    validateSingleFieldPartition(entries, 0);
+    validateSingleFieldPartition(entries, 1);
+    validateSingleFieldPartition(entries, 2);
+    validateSingleFieldPartition(entries, 3);
   }
 
   @Test
@@ -788,13 +792,14 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
         Expressions.and(
             Expressions.equal("partition.id", 10), 
Expressions.greaterThan("record_count", 0));
     TableScan scan = metadataTable.newScan().filter(filter);
-    CloseableIterable<ContentFile<?>> files = 
PartitionsTable.planFiles((StaticTableScan) scan);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scan);
     if (formatVersion == 2) {
       // Four data files and delete files of old spec, one new data file of 
new spec
-      Assert.assertEquals(9, Iterables.size(files));
+      Assert.assertEquals(9, Iterables.size(entries));
     } else {
       // Four data files of old spec, one new data file of new spec
-      Assert.assertEquals(5, Iterables.size(files));
+      Assert.assertEquals(5, Iterables.size(entries));
     }
 
     filter =
@@ -802,15 +807,15 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
             Expressions.equal("partition.data_bucket", 0),
             Expressions.greaterThan("record_count", 0));
     scan = metadataTable.newScan().filter(filter);
-    files = PartitionsTable.planFiles((StaticTableScan) scan);
+    entries = PartitionsTable.planEntries((StaticTableScan) scan);
 
     if (formatVersion == 2) {
       // 1 original data file and delete file written by old spec, plus 1 new 
data file written by
       // new spec
-      Assert.assertEquals(3, Iterables.size(files));
+      Assert.assertEquals(3, Iterables.size(entries));
     } else {
       // 1 original data file written by old spec, plus 1 new data file 
written by new spec
-      Assert.assertEquals(2, Iterables.size(files));
+      Assert.assertEquals(2, Iterables.size(entries));
     }
   }
 
@@ -852,14 +857,15 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
         Expressions.and(
             Expressions.equal("partition.id", 10), 
Expressions.greaterThan("record_count", 0));
     TableScan scan = metadataTable.newScan().filter(filter);
-    CloseableIterable<ContentFile<?>> files = 
PartitionsTable.planFiles((StaticTableScan) scan);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scan);
 
     if (formatVersion == 2) {
       // Four data and delete files of original spec, one data file written by 
new spec
-      Assert.assertEquals(9, Iterables.size(files));
+      Assert.assertEquals(9, Iterables.size(entries));
     } else {
       // Four data files of original spec, one data file written by new spec
-      Assert.assertEquals(5, Iterables.size(files));
+      Assert.assertEquals(5, Iterables.size(entries));
     }
 
     // Filter for a dropped partition spec field.  Correct behavior is that 
only old partitions are
@@ -869,11 +875,11 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
             Expressions.equal("partition.data_bucket", 0),
             Expressions.greaterThan("record_count", 0));
     scan = metadataTable.newScan().filter(filter);
-    files = PartitionsTable.planFiles((StaticTableScan) scan);
+    entries = PartitionsTable.planEntries((StaticTableScan) scan);
 
     if (formatVersion == 1) {
       // 1 original data file written by old spec
-      Assert.assertEquals(1, Iterables.size(files));
+      Assert.assertEquals(1, Iterables.size(entries));
     } else {
       // 1 original data and 1 delete files written by old spec, plus both of 
new data file/delete
       // file written by new spec
@@ -888,7 +894,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
       // schema.
       // The Partition table final schema is a union of fields of all specs, 
including dropped
       // fields.
-      Assert.assertEquals(4, Iterables.size(files));
+      Assert.assertEquals(4, Iterables.size(entries));
     }
   }
 
@@ -939,10 +945,10 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
             Expressions.equal("partition.partition", 0),
             Expressions.greaterThan("record_count", 0));
     TableScan scanAndEq = partitionsTable.newScan().filter(andEquals);
-    CloseableIterable<ContentFile<?>> files =
-        PartitionsTable.planFiles((StaticTableScan) scanAndEq);
-    Assert.assertEquals(1, Iterators.size(files.iterator()));
-    validateSingleFieldPartition(files, 0);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scanAndEq);
+    Assert.assertEquals(1, Iterators.size(entries.iterator()));
+    validateSingleFieldPartition(entries, 0);
   }
 
   @Test
@@ -1010,11 +1016,12 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
                           true); // daemon threads will be terminated abruptly 
when the JVM exits
                       return thread;
                     }));
-    CloseableIterable<ContentFile<?>> files = 
PartitionsTable.planFiles((StaticTableScan) scan);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scan);
     if (formatVersion == 2) {
-      Assert.assertEquals(8, Iterators.size(files.iterator()));
+      Assert.assertEquals(8, Iterators.size(entries.iterator()));
     } else {
-      Assert.assertEquals(4, Iterators.size(files.iterator()));
+      Assert.assertEquals(4, Iterators.size(entries.iterator()));
     }
 
     Assert.assertTrue("Thread should be created in provided pool", 
planThreadsIndex.get() > 0);
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
index b6e1ecae4f..92fa080dfe 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
@@ -140,7 +140,7 @@ public class TestMetadataTableScansWithPartitionEvolution 
extends MetadataTableS
   }
 
   @Test
-  public void testPartitionsTableScanWithAddPartitionOnNestedField() throws 
IOException {
+  public void testPartitionsTableScanWithAddPartitionOnNestedField() {
     Table partitionsTable = new PartitionsTable(table);
     Types.StructType idPartition =
         new Schema(
@@ -154,15 +154,15 @@ public class TestMetadataTableScansWithPartitionEvolution 
extends MetadataTableS
 
     TableScan scanNoFilter = partitionsTable.newScan().select("partition");
     Assert.assertEquals(idPartition, scanNoFilter.schema().asStruct());
-    CloseableIterable<ContentFile<?>> files =
-        PartitionsTable.planFiles((StaticTableScan) scanNoFilter);
-    Assert.assertEquals(4, Iterators.size(files.iterator()));
-    validatePartition(files, 0, 0);
-    validatePartition(files, 0, 1);
-    validatePartition(files, 0, 2);
-    validatePartition(files, 0, 3);
-    validatePartition(files, 1, 2);
-    validatePartition(files, 1, 3);
+    CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) scanNoFilter);
+    Assert.assertEquals(4, Iterators.size(entries.iterator()));
+    validatePartition(entries, 0, 0);
+    validatePartition(entries, 0, 1);
+    validatePartition(entries, 0, 2);
+    validatePartition(entries, 0, 3);
+    validatePartition(entries, 1, 2);
+    validatePartition(entries, 1, 3);
   }
 
   @Test
@@ -241,16 +241,16 @@ public class TestMetadataTableScansWithPartitionEvolution 
extends MetadataTableS
     // must contain the partition column even when the current spec is 
non-partitioned.
     
Assertions.assertThat(partitionsTable.schema().findField("partition")).isNotNull();
 
-    try (CloseableIterable<ContentFile<?>> files =
-        PartitionsTable.planFiles((StaticTableScan) 
partitionsTable.newScan())) {
+    try (CloseableIterable<ManifestEntry<?>> entries =
+        PartitionsTable.planEntries((StaticTableScan) 
partitionsTable.newScan())) {
       // four partitioned data files and one non-partitioned data file.
-      Assertions.assertThat(files).hasSize(5);
+      Assertions.assertThat(entries).hasSize(5);
 
       // check for null partition value.
-      Assertions.assertThat(StreamSupport.stream(files.spliterator(), false))
+      Assertions.assertThat(StreamSupport.stream(entries.spliterator(), false))
           .anyMatch(
-              file -> {
-                StructLike partition = file.partition();
+              entry -> {
+                StructLike partition = entry.file().partition();
                 return Objects.equals(null, partition.get(0, Object.class));
               });
     }
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 2dda346890..ff8bf0e7be 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -1217,7 +1218,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
   @Test
   public void testUnpartitionedPartitionsTable() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"unpartitioned_partitions_test");
-    createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+    Table table = createTable(tableIdentifier, SCHEMA, 
PartitionSpec.unpartitioned());
 
     Dataset<Row> df =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -1251,7 +1252,17 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
                 8,
                 "equality_delete_file_count",
                 Types.IntegerType.get(),
-                "Count of equality delete files"));
+                "Count of equality delete files"),
+            optional(
+                9,
+                "last_updated_ms",
+                Types.TimestampType.withZone(),
+                "Commit time of snapshot that last updated this partition"),
+            optional(
+                10,
+                "last_updated_snapshot_id",
+                Types.LongType.get(),
+                "Id of snapshot that last updated this partition"));
 
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
 
@@ -1264,6 +1275,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         new 
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), 
"partitions"));
     GenericData.Record expectedRow =
         builder
+            .set("last_updated_ms", table.currentSnapshot().timestampMillis() 
* 1000)
+            .set("last_updated_snapshot_id", 
table.currentSnapshot().snapshotId())
             .set("record_count", 1L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
@@ -1309,6 +1322,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
+    table.refresh();
+    long secondCommitId = table.currentSnapshot().snapshotId();
+
     List<Row> actual =
         spark
             .read()
@@ -1334,6 +1350,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(firstCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", firstCommitId)
             .build());
     expected.add(
         builder
@@ -1345,6 +1363,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(secondCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", secondCommitId)
             .build());
 
     Assert.assertEquals("Partitions table should have two rows", 2, 
expected.size());
@@ -1387,11 +1407,149 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
             .load(loadLocation(tableIdentifier, "partitions"))
             .filter("partition.id < 2 or record_count=1")
             .collectAsList();
-    Assert.assertEquals("Actual results should have one row", 2, 
nonFiltered.size());
+    Assert.assertEquals("Actual results should have two row", 2, 
nonFiltered.size());
+    for (int i = 0; i < 2; i += 1) {
+      TestHelpers.assertEqualsSafe(
+          partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
+    }
+  }
+
+  @Test
+  public void testPartitionsTableLastUpdatedSnapshot() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"partitions_test");
+    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+    Table partitionsTable = loadTable(tableIdentifier, "partitions");
+    Dataset<Row> df1 =
+        spark.createDataFrame(
+            Lists.newArrayList(new SimpleRecord(1, "1"), new SimpleRecord(2, 
"2")),
+            SimpleRecord.class);
+    Dataset<Row> df2 =
+        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "20")), 
SimpleRecord.class);
+
+    df1.select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    long firstCommitId = table.currentSnapshot().snapshotId();
+
+    // add a second file
+    df2.select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    long secondCommitId = table.currentSnapshot().snapshotId();
+
+    // check if rewrite manifest does not override metadata about data file's 
creating snapshot
+    RewriteManifests.Result rewriteManifestResult =
+        SparkActions.get().rewriteManifests(table).execute();
+    Assert.assertEquals(
+        "rewrite replaced 2 manifests",
+        2,
+        Iterables.size(rewriteManifestResult.rewrittenManifests()));
+    Assert.assertEquals(
+        "rewrite added 1 manifests", 1, 
Iterables.size(rewriteManifestResult.addedManifests()));
+
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .orderBy("partition.id")
+            .collectAsList();
+
+    GenericRecordBuilder builder =
+        new 
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), 
"partitions"));
+    GenericRecordBuilder partitionBuilder =
+        new GenericRecordBuilder(
+            AvroSchemaUtil.convert(
+                partitionsTable.schema().findType("partition").asStructType(), 
"partition"));
+    List<GenericData.Record> expected = Lists.newArrayList();
+    expected.add(
+        builder
+            .set("partition", partitionBuilder.set("id", 1).build())
+            .set("record_count", 1L)
+            .set("file_count", 1)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(firstCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", firstCommitId)
+            .build());
+    expected.add(
+        builder
+            .set("partition", partitionBuilder.set("id", 2).build())
+            .set("record_count", 2L)
+            .set("file_count", 2)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(secondCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", secondCommitId)
+            .build());
+
+    Assert.assertEquals("Partitions table should have two rows", 2, 
expected.size());
+    Assert.assertEquals("Actual results should have two rows", 2, 
actual.size());
     for (int i = 0; i < 2; i += 1) {
       TestHelpers.assertEqualsSafe(
           partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
     }
+
+    // check predicate push down
+    List<Row> filtered =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .filter("partition.id < 2")
+            .collectAsList();
+    Assert.assertEquals("Actual results should have one row", 1, 
filtered.size());
+    TestHelpers.assertEqualsSafe(
+        partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));
+
+    // check for snapshot expiration
+    // if snapshot with firstCommitId is expired,
+    // we expect the partition of id=1 will no longer have last updated 
timestamp and snapshotId
+    
SparkActions.get().expireSnapshots(table).expireSnapshotId(firstCommitId).execute();
+    GenericData.Record newPartitionRecord =
+        builder
+            .set("partition", partitionBuilder.set("id", 1).build())
+            .set("record_count", 1L)
+            .set("file_count", 1)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", null)
+            .set("last_updated_snapshot_id", null)
+            .build();
+    expected.remove(0);
+    expected.add(0, newPartitionRecord);
+
+    List<Row> actualAfterSnapshotExpiration =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .collectAsList();
+    Assert.assertEquals(
+        "Actual results should have two row", 2, 
actualAfterSnapshotExpiration.size());
+    for (int i = 0; i < 2; i += 1) {
+      TestHelpers.assertEqualsSafe(
+          partitionsTable.schema().asStruct(),
+          expected.get(i),
+          actualAfterSnapshotExpiration.get(i));
+    }
   }
 
   @Test
diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index b89b78b23f..46ac0e96fe 100644
--- 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -1222,7 +1223,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
   @Test
   public void testUnpartitionedPartitionsTable() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"unpartitioned_partitions_test");
-    createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+    Table table = createTable(tableIdentifier, SCHEMA, 
PartitionSpec.unpartitioned());
 
     Dataset<Row> df =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -1256,7 +1257,17 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
                 8,
                 "equality_delete_file_count",
                 Types.IntegerType.get(),
-                "Count of equality delete files"));
+                "Count of equality delete files"),
+            optional(
+                9,
+                "last_updated_ms",
+                Types.TimestampType.withZone(),
+                "Commit time of snapshot that last updated this partition"),
+            optional(
+                10,
+                "last_updated_snapshot_id",
+                Types.LongType.get(),
+                "Id of snapshot that last updated this partition"));
 
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
 
@@ -1269,6 +1280,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         new 
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), 
"partitions"));
     GenericData.Record expectedRow =
         builder
+            .set("last_updated_ms", table.currentSnapshot().timestampMillis() 
* 1000)
+            .set("last_updated_snapshot_id", 
table.currentSnapshot().snapshotId())
             .set("record_count", 1L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
@@ -1314,6 +1327,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
+    table.refresh();
+    long secondCommitId = table.currentSnapshot().snapshotId();
+
     List<Row> actual =
         spark
             .read()
@@ -1339,6 +1355,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(firstCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", firstCommitId)
             .build());
     expected.add(
         builder
@@ -1350,6 +1368,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(secondCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", secondCommitId)
             .build());
 
     Assert.assertEquals("Partitions table should have two rows", 2, 
expected.size());
@@ -1392,13 +1412,151 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
             .load(loadLocation(tableIdentifier, "partitions"))
             .filter("partition.id < 2 or record_count=1")
             .collectAsList();
-    Assert.assertEquals("Actual results should have one row", 2, 
nonFiltered.size());
+    Assert.assertEquals("Actual results should have two row", 2, 
nonFiltered.size());
     for (int i = 0; i < 2; i += 1) {
       TestHelpers.assertEqualsSafe(
           partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
     }
   }
 
+  @Test
+  public void testPartitionsTableLastUpdatedSnapshot() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"partitions_test");
+    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+    Table partitionsTable = loadTable(tableIdentifier, "partitions");
+    Dataset<Row> df1 =
+        spark.createDataFrame(
+            Lists.newArrayList(new SimpleRecord(1, "1"), new SimpleRecord(2, 
"2")),
+            SimpleRecord.class);
+    Dataset<Row> df2 =
+        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "20")), 
SimpleRecord.class);
+
+    df1.select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    long firstCommitId = table.currentSnapshot().snapshotId();
+
+    // add a second file
+    df2.select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    long secondCommitId = table.currentSnapshot().snapshotId();
+
+    // check if rewrite manifest does not override metadata about data file's 
creating snapshot
+    RewriteManifests.Result rewriteManifestResult =
+        SparkActions.get().rewriteManifests(table).execute();
+    Assert.assertEquals(
+        "rewrite replaced 2 manifests",
+        2,
+        Iterables.size(rewriteManifestResult.rewrittenManifests()));
+    Assert.assertEquals(
+        "rewrite added 1 manifests", 1, 
Iterables.size(rewriteManifestResult.addedManifests()));
+
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .orderBy("partition.id")
+            .collectAsList();
+
+    GenericRecordBuilder builder =
+        new 
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), 
"partitions"));
+    GenericRecordBuilder partitionBuilder =
+        new GenericRecordBuilder(
+            AvroSchemaUtil.convert(
+                partitionsTable.schema().findType("partition").asStructType(), 
"partition"));
+    List<GenericData.Record> expected = Lists.newArrayList();
+    expected.add(
+        builder
+            .set("partition", partitionBuilder.set("id", 1).build())
+            .set("record_count", 1L)
+            .set("file_count", 1)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(firstCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", firstCommitId)
+            .build());
+    expected.add(
+        builder
+            .set("partition", partitionBuilder.set("id", 2).build())
+            .set("record_count", 2L)
+            .set("file_count", 2)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(secondCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", secondCommitId)
+            .build());
+
+    Assert.assertEquals("Partitions table should have two rows", 2, 
expected.size());
+    Assert.assertEquals("Actual results should have two rows", 2, 
actual.size());
+    for (int i = 0; i < 2; i += 1) {
+      TestHelpers.assertEqualsSafe(
+          partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
+    }
+
+    // check predicate push down
+    List<Row> filtered =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .filter("partition.id < 2")
+            .collectAsList();
+    Assert.assertEquals("Actual results should have one row", 1, 
filtered.size());
+    TestHelpers.assertEqualsSafe(
+        partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));
+
+    // check for snapshot expiration
+    // if snapshot with firstCommitId is expired,
+    // we expect the partition of id=1 will no longer have last updated 
timestamp and snapshotId
+    
SparkActions.get().expireSnapshots(table).expireSnapshotId(firstCommitId).execute();
+    GenericData.Record newPartitionRecord =
+        builder
+            .set("partition", partitionBuilder.set("id", 1).build())
+            .set("record_count", 1L)
+            .set("file_count", 1)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", null)
+            .set("last_updated_snapshot_id", null)
+            .build();
+    expected.remove(0);
+    expected.add(0, newPartitionRecord);
+
+    List<Row> actualAfterSnapshotExpiration =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .collectAsList();
+    Assert.assertEquals(
+        "Actual results should have two row", 2, 
actualAfterSnapshotExpiration.size());
+    for (int i = 0; i < 2; i += 1) {
+      TestHelpers.assertEqualsSafe(
+          partitionsTable.schema().asStruct(),
+          expected.get(i),
+          actualAfterSnapshotExpiration.get(i));
+    }
+  }
+
   @Test
   public void testPartitionsTableDeleteStats() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"partitions_test");
@@ -1416,6 +1574,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
+    long firstCommitId = table.currentSnapshot().snapshotId();
 
     // add a second file
     df2.select("id", "data")
@@ -1428,6 +1587,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
     DeleteFile deleteFile = writePosDeleteFile(table);
     table.newRowDelta().addDeletes(deleteFile).commit();
+    table.refresh();
+    long posDeleteCommitId = table.currentSnapshot().snapshotId();
 
     List<Row> actual =
         spark
@@ -1455,6 +1616,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(firstCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", firstCommitId)
             .build());
     expected.add(
         builder
@@ -1466,7 +1629,10 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", posDeleteCommitId)
             .build());
+
     for (int i = 0; i < 2; i += 1) {
       TestHelpers.assertEqualsSafe(
           partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
@@ -1475,6 +1641,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     // test equality delete
     DeleteFile eqDeleteFile = writeEqDeleteFile(table);
     table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    table.refresh();
+    long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
         spark
             .read()
@@ -1495,6 +1663,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
             .set("equality_delete_record_count", 1L) // should be incremented 
now
             .set("equality_delete_file_count", 1) // should be incremented now
             .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", eqDeleteCommitId)
             .build());
     for (int i = 0; i < 2; i += 1) {
       TestHelpers.assertEqualsSafe(
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 5c2327c6c5..36c0597883 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -48,6 +48,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -1227,7 +1228,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
   @Test
   public void testUnpartitionedPartitionsTable() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"unpartitioned_partitions_test");
-    createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+    Table table = createTable(tableIdentifier, SCHEMA, 
PartitionSpec.unpartitioned());
 
     Dataset<Row> df =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), 
SimpleRecord.class);
@@ -1261,7 +1262,17 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
                 8,
                 "equality_delete_file_count",
                 Types.IntegerType.get(),
-                "Count of equality delete files"));
+                "Count of equality delete files"),
+            optional(
+                9,
+                "last_updated_ms",
+                Types.TimestampType.withZone(),
+                "Commit time of snapshot that last updated this partition"),
+            optional(
+                10,
+                "last_updated_snapshot_id",
+                Types.LongType.get(),
+                "Id of snapshot that last updated this partition"));
 
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
 
@@ -1274,6 +1285,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         new 
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), 
"partitions"));
     GenericData.Record expectedRow =
         builder
+            .set("last_updated_ms", table.currentSnapshot().timestampMillis() 
* 1000)
+            .set("last_updated_snapshot_id", 
table.currentSnapshot().snapshotId())
             .set("record_count", 1L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
@@ -1319,6 +1332,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
+    table.refresh();
+    long secondCommitId = table.currentSnapshot().snapshotId();
+
     List<Row> actual =
         spark
             .read()
@@ -1344,6 +1360,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(firstCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", firstCommitId)
             .build());
     expected.add(
         builder
@@ -1355,6 +1373,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(secondCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", secondCommitId)
             .build());
 
     Assert.assertEquals("Partitions table should have two rows", 2, 
expected.size());
@@ -1397,13 +1417,151 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
             .load(loadLocation(tableIdentifier, "partitions"))
             .filter("partition.id < 2 or record_count=1")
             .collectAsList();
-    Assert.assertEquals("Actual results should have one row", 2, 
nonFiltered.size());
+    Assert.assertEquals("Actual results should have two row", 2, 
nonFiltered.size());
     for (int i = 0; i < 2; i += 1) {
       TestHelpers.assertEqualsSafe(
           partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
     }
   }
 
+  @Test
+  public void testPartitionsTableLastUpdatedSnapshot() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"partitions_test");
+    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+    Table partitionsTable = loadTable(tableIdentifier, "partitions");
+    Dataset<Row> df1 =
+        spark.createDataFrame(
+            Lists.newArrayList(new SimpleRecord(1, "1"), new SimpleRecord(2, 
"2")),
+            SimpleRecord.class);
+    Dataset<Row> df2 =
+        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "20")), 
SimpleRecord.class);
+
+    df1.select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    long firstCommitId = table.currentSnapshot().snapshotId();
+
+    // add a second file
+    df2.select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    long secondCommitId = table.currentSnapshot().snapshotId();
+
+    // check if rewrite manifest does not override metadata about data file's 
creating snapshot
+    RewriteManifests.Result rewriteManifestResult =
+        SparkActions.get().rewriteManifests(table).execute();
+    Assert.assertEquals(
+        "rewrite replaced 2 manifests",
+        2,
+        Iterables.size(rewriteManifestResult.rewrittenManifests()));
+    Assert.assertEquals(
+        "rewrite added 1 manifests", 1, 
Iterables.size(rewriteManifestResult.addedManifests()));
+
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .orderBy("partition.id")
+            .collectAsList();
+
+    GenericRecordBuilder builder =
+        new 
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), 
"partitions"));
+    GenericRecordBuilder partitionBuilder =
+        new GenericRecordBuilder(
+            AvroSchemaUtil.convert(
+                partitionsTable.schema().findType("partition").asStructType(), 
"partition"));
+    List<GenericData.Record> expected = Lists.newArrayList();
+    expected.add(
+        builder
+            .set("partition", partitionBuilder.set("id", 1).build())
+            .set("record_count", 1L)
+            .set("file_count", 1)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(firstCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", firstCommitId)
+            .build());
+    expected.add(
+        builder
+            .set("partition", partitionBuilder.set("id", 2).build())
+            .set("record_count", 2L)
+            .set("file_count", 2)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(secondCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", secondCommitId)
+            .build());
+
+    Assert.assertEquals("Partitions table should have two rows", 2, 
expected.size());
+    Assert.assertEquals("Actual results should have two rows", 2, 
actual.size());
+    for (int i = 0; i < 2; i += 1) {
+      TestHelpers.assertEqualsSafe(
+          partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
+    }
+
+    // check predicate push down
+    List<Row> filtered =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .filter("partition.id < 2")
+            .collectAsList();
+    Assert.assertEquals("Actual results should have one row", 1, 
filtered.size());
+    TestHelpers.assertEqualsSafe(
+        partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));
+
+    // check for snapshot expiration
+    // if snapshot with firstCommitId is expired,
+    // we expect the partition of id=1 will no longer have last updated 
timestamp and snapshotId
+    
SparkActions.get().expireSnapshots(table).expireSnapshotId(firstCommitId).execute();
+    GenericData.Record newPartitionRecord =
+        builder
+            .set("partition", partitionBuilder.set("id", 1).build())
+            .set("record_count", 1L)
+            .set("file_count", 1)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", null)
+            .set("last_updated_snapshot_id", null)
+            .build();
+    expected.remove(0);
+    expected.add(0, newPartitionRecord);
+
+    List<Row> actualAfterSnapshotExpiration =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .collectAsList();
+    Assert.assertEquals(
+        "Actual results should have two row", 2, 
actualAfterSnapshotExpiration.size());
+    for (int i = 0; i < 2; i += 1) {
+      TestHelpers.assertEqualsSafe(
+          partitionsTable.schema().asStruct(),
+          expected.get(i),
+          actualAfterSnapshotExpiration.get(i));
+    }
+  }
+
   @Test
   public void testPartitionsTableDeleteStats() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", 
"partitions_test");
@@ -1421,6 +1579,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
+    long firstCommitId = table.currentSnapshot().snapshotId();
 
     // add a second file
     df2.select("id", "data")
@@ -1433,6 +1592,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
     DeleteFile deleteFile = writePosDeleteFile(table);
     table.newRowDelta().addDeletes(deleteFile).commit();
+    table.refresh();
+    long posDeleteCommitId = table.currentSnapshot().snapshotId();
 
     List<Row> actual =
         spark
@@ -1460,6 +1621,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(firstCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", firstCommitId)
             .build());
     expected.add(
         builder
@@ -1471,7 +1634,10 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", 
table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", posDeleteCommitId)
             .build());
+
     for (int i = 0; i < 2; i += 1) {
       TestHelpers.assertEqualsSafe(
           partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
@@ -1480,6 +1646,8 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     // test equality delete
     DeleteFile eqDeleteFile = writeEqDeleteFile(table);
     table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    table.refresh();
+    long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
         spark
             .read()
@@ -1500,6 +1668,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("equality_delete_record_count", 1L) // should be incremented now
             .set("equality_delete_file_count", 1) // should be incremented now
             .set("spec_id", 0)
+            .set("last_updated_ms", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", eqDeleteCommitId)
             .build());
     for (int i = 0; i < 2; i += 1) {
       TestHelpers.assertEqualsSafe(
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index a6687a4ce7..303931e21b 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -204,7 +205,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
-  public void testEntriesTablePartitionedPrune() throws Exception {
+  public void testEntriesTablePartitionedPrune() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
 
@@ -233,7 +234,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
-  public void testEntriesTableDataFilePrune() throws Exception {
+  public void testEntriesTableDataFilePrune() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
     Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
 
@@ -266,7 +267,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
-  public void testEntriesTableDataFilePruneMulti() throws Exception {
+  public void testEntriesTableDataFilePruneMulti() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
     Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
 
@@ -304,7 +305,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
-  public void testFilesSelectMap() throws Exception {
+  public void testFilesSelectMap() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
     Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
 
@@ -644,7 +645,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
-  public void testAllMetadataTablesWithStagedCommits() throws Exception {
+  public void testAllMetadataTablesWithStagedCommits() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "stage_aggregate_table_test");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
 
@@ -691,8 +692,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     Assert.assertTrue(
         "Stage table should have some snapshots", table.snapshots().iterator().hasNext());
-    Assert.assertEquals(
-        "Stage table should have null currentSnapshot", null, table.currentSnapshot());
+    Assert.assertNull("Stage table should have null currentSnapshot", table.currentSnapshot());
     Assert.assertEquals("Actual results should have two rows", 2, actualAllData.size());
     Assert.assertEquals("Actual results should have two rows", 2, actualAllManifests.size());
     Assert.assertEquals("Actual results should have two rows", 2, actualAllEntries.size());
@@ -1212,8 +1212,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
                 snapshotManifest ->
                     manifestRecord(
                         manifestTable, snapshotManifest.first(), snapshotManifest.second()))
+            .sorted(Comparator.comparing(o -> o.get("path").toString()))
             .collect(Collectors.toList());
-    expected.sort(Comparator.comparing(o -> o.get("path").toString()));
 
     Assert.assertEquals("Manifests table should have 5 manifest rows", 5, actual.size());
     for (int i = 0; i < expected.size(); i += 1) {
@@ -1225,7 +1225,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   @Test
   public void testUnpartitionedPartitionsTable() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_partitions_test");
-    createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
 
     Dataset<Row> df =
         spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
@@ -1259,7 +1259,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
                 8,
                 "equality_delete_file_count",
                 Types.IntegerType.get(),
-                "Count of equality delete files"));
+                "Count of equality delete files"),
+            optional(
+                9,
+                "last_updated_ms",
+                Types.TimestampType.withZone(),
+                "Commit time of snapshot that last updated this partition"),
+            optional(
+                10,
+                "last_updated_snapshot_id",
+                Types.LongType.get(),
+                "Id of snapshot that last updated this partition"));
 
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
 
@@ -1272,6 +1282,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
     GenericData.Record expectedRow =
         builder
+            .set("last_updated_ms", table.currentSnapshot().timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", table.currentSnapshot().snapshotId())
             .set("record_count", 1L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
@@ -1317,6 +1329,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
+    table.refresh();
+    long secondCommitId = table.currentSnapshot().snapshotId();
+
     List<Row> actual =
         spark
             .read()
@@ -1342,6 +1357,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", table.snapshot(firstCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", firstCommitId)
             .build());
     expected.add(
         builder
@@ -1353,6 +1370,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", table.snapshot(secondCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", secondCommitId)
             .build());
 
     Assert.assertEquals("Partitions table should have two rows", 2, expected.size());
@@ -1395,13 +1414,151 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .load(loadLocation(tableIdentifier, "partitions"))
             .filter("partition.id < 2 or record_count=1")
             .collectAsList();
-    Assert.assertEquals("Actual results should have one row", 2, nonFiltered.size());
+    Assert.assertEquals("Actual results should have two row", 2, nonFiltered.size());
     for (int i = 0; i < 2; i += 1) {
       TestHelpers.assertEqualsSafe(
           partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
     }
   }
 
+  @Test
+  public void testPartitionsTableLastUpdatedSnapshot() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test");
+    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+    Table partitionsTable = loadTable(tableIdentifier, "partitions");
+    Dataset<Row> df1 =
+        spark.createDataFrame(
+            Lists.newArrayList(new SimpleRecord(1, "1"), new SimpleRecord(2, "2")),
+            SimpleRecord.class);
+    Dataset<Row> df2 =
+        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "20")), SimpleRecord.class);
+
+    df1.select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    long firstCommitId = table.currentSnapshot().snapshotId();
+
+    // add a second file
+    df2.select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    long secondCommitId = table.currentSnapshot().snapshotId();
+
+    // check if rewrite manifest does not override metadata about data file's creating snapshot
+    RewriteManifests.Result rewriteManifestResult =
+        SparkActions.get().rewriteManifests(table).execute();
+    Assert.assertEquals(
+        "rewrite replaced 2 manifests",
+        2,
+        Iterables.size(rewriteManifestResult.rewrittenManifests()));
+    Assert.assertEquals(
+        "rewrite added 1 manifests", 1, Iterables.size(rewriteManifestResult.addedManifests()));
+
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .orderBy("partition.id")
+            .collectAsList();
+
+    GenericRecordBuilder builder =
+        new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
+    GenericRecordBuilder partitionBuilder =
+        new GenericRecordBuilder(
+            AvroSchemaUtil.convert(
+                partitionsTable.schema().findType("partition").asStructType(), "partition"));
+    List<GenericData.Record> expected = Lists.newArrayList();
+    expected.add(
+        builder
+            .set("partition", partitionBuilder.set("id", 1).build())
+            .set("record_count", 1L)
+            .set("file_count", 1)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", table.snapshot(firstCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", firstCommitId)
+            .build());
+    expected.add(
+        builder
+            .set("partition", partitionBuilder.set("id", 2).build())
+            .set("record_count", 2L)
+            .set("file_count", 2)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", table.snapshot(secondCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", secondCommitId)
+            .build());
+
+    Assert.assertEquals("Partitions table should have two rows", 2, expected.size());
+    Assert.assertEquals("Actual results should have two rows", 2, actual.size());
+    for (int i = 0; i < 2; i += 1) {
+      TestHelpers.assertEqualsSafe(
+          partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
+    }
+
+    // check predicate push down
+    List<Row> filtered =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .filter("partition.id < 2")
+            .collectAsList();
+    Assert.assertEquals("Actual results should have one row", 1, filtered.size());
+    TestHelpers.assertEqualsSafe(
+        partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));
+
+    // check for snapshot expiration
+    // if snapshot with firstCommitId is expired,
+    // we expect the partition of id=1 will no longer have last updated timestamp and snapshotId
+    SparkActions.get().expireSnapshots(table).expireSnapshotId(firstCommitId).execute();
+    GenericData.Record newPartitionRecord =
+        builder
+            .set("partition", partitionBuilder.set("id", 1).build())
+            .set("record_count", 1L)
+            .set("file_count", 1)
+            .set("position_delete_record_count", 0L)
+            .set("position_delete_file_count", 0)
+            .set("equality_delete_record_count", 0L)
+            .set("equality_delete_file_count", 0)
+            .set("spec_id", 0)
+            .set("last_updated_ms", null)
+            .set("last_updated_snapshot_id", null)
+            .build();
+    expected.remove(0);
+    expected.add(0, newPartitionRecord);
+
+    List<Row> actualAfterSnapshotExpiration =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "partitions"))
+            .collectAsList();
+    Assert.assertEquals(
+        "Actual results should have two row", 2, actualAfterSnapshotExpiration.size());
+    for (int i = 0; i < 2; i += 1) {
+      TestHelpers.assertEqualsSafe(
+          partitionsTable.schema().asStruct(),
+          expected.get(i),
+          actualAfterSnapshotExpiration.get(i));
+    }
+  }
+
   @Test
   public void testPartitionsTableDeleteStats() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test");
@@ -1419,6 +1576,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
+    long firstCommitId = table.currentSnapshot().snapshotId();
 
     // add a second file
     df2.select("id", "data")
@@ -1431,6 +1589,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
     DeleteFile deleteFile = writePosDeleteFile(table);
     table.newRowDelta().addDeletes(deleteFile).commit();
+    table.refresh();
+    long posDeleteCommitId = table.currentSnapshot().snapshotId();
 
     List<Row> actual =
         spark
@@ -1458,6 +1618,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", table.snapshot(firstCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", firstCommitId)
             .build());
     expected.add(
         builder
@@ -1469,7 +1631,10 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
+            .set("last_updated_ms", table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", posDeleteCommitId)
             .build());
+
     for (int i = 0; i < 2; i += 1) {
       TestHelpers.assertEqualsSafe(
           partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
@@ -1478,6 +1643,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     // test equality delete
     DeleteFile eqDeleteFile = writeEqDeleteFile(table);
     table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    table.refresh();
+    long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
         spark
             .read()
@@ -1497,7 +1664,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 1L) // should be incremented now
             .set("equality_delete_file_count", 1) // should be incremented now
-            .set("spec_id", 0)
+            .set("last_updated_ms", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
+            .set("last_updated_snapshot_id", eqDeleteCommitId)
             .build());
     for (int i = 0; i < 2; i += 1) {
       TestHelpers.assertEqualsSafe(
@@ -1771,7 +1939,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
-  public void testFilesTablePartitionId() throws Exception {
+  public void testFilesTablePartitionId() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
     Table table =
         createTable(
@@ -1811,7 +1979,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
-  public void testAllManifestTableSnapshotFiltering() throws Exception {
+  public void testAllManifestTableSnapshotFiltering() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "all_manifest_snapshot_filtering");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table manifestTable = loadTable(tableIdentifier, "all_manifests");
@@ -1880,8 +2048,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
                 snapshotManifest ->
                     manifestRecord(
                         manifestTable, snapshotManifest.first(), snapshotManifest.second()))
+            .sorted(Comparator.comparing(o -> o.get("path").toString()))
             .collect(Collectors.toList());
-    expected.sort(Comparator.comparing(o -> o.get("path").toString()));
 
     Assert.assertEquals("Manifests table should have 3 manifest rows", 3, actual.size());
     for (int i = 0; i < expected.size(); i += 1) {

Reply via email to