This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a29c6cebfd Core: Add last updated timestamp and snapshotId for
Partitions table (#7581)
a29c6cebfd is described below
commit a29c6cebfd39b8c628cf901e7c37cf18a2fd064a
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Fri Jun 23 14:32:13 2023 -0700
Core: Add last updated timestamp and snapshotId for Partitions table (#7581)
---
.../java/org/apache/iceberg/PartitionsTable.java | 78 +++++---
.../apache/iceberg/MetadataTableScanTestBase.java | 12 +-
.../org/apache/iceberg/TestMetadataTableScans.java | 165 +++++++++--------
...stMetadataTableScansWithPartitionEvolution.java | 32 ++--
.../spark/source/TestIcebergSourceTablesBase.java | 164 ++++++++++++++++-
.../spark/source/TestIcebergSourceTablesBase.java | 176 +++++++++++++++++-
.../spark/source/TestIcebergSourceTablesBase.java | 176 +++++++++++++++++-
.../spark/source/TestIcebergSourceTablesBase.java | 198 +++++++++++++++++++--
8 files changed, 855 insertions(+), 146 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index b46239352d..f072a7343f 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -72,7 +72,17 @@ public class PartitionsTable extends BaseMetadataTable {
8,
"equality_delete_file_count",
Types.IntegerType.get(),
- "Count of equality delete files"));
+ "Count of equality delete files"),
+ Types.NestedField.optional(
+ 9,
+ "last_updated_ms",
+ Types.TimestampType.withZone(),
+ "Commit time of snapshot that last updated this partition"),
+ Types.NestedField.optional(
+ 10,
+ "last_updated_snapshot_id",
+ Types.LongType.get(),
+ "Id of snapshot that last updated this partition"));
this.unpartitionedTable =
Partitioning.partitionType(table).fields().isEmpty();
}
@@ -90,7 +100,9 @@ public class PartitionsTable extends BaseMetadataTable {
"position_delete_record_count",
"position_delete_file_count",
"equality_delete_record_count",
- "equality_delete_file_count");
+ "equality_delete_file_count",
+ "last_updated_ms",
+ "last_updated_snapshot_id");
}
return schema;
}
@@ -116,7 +128,9 @@ public class PartitionsTable extends BaseMetadataTable {
root.posDeleteRecordCount,
root.posDeleteFileCount,
root.eqDeleteRecordCount,
- root.eqDeleteFileCount));
+ root.eqDeleteFileCount,
+ root.lastUpdatedMs,
+ root.lastUpdatedSnapshotId));
} else {
return StaticDataTask.of(
io().newInputFile(table().operations().current().metadataFileLocation()),
@@ -136,19 +150,22 @@ public class PartitionsTable extends BaseMetadataTable {
partition.posDeleteRecordCount,
partition.posDeleteFileCount,
partition.eqDeleteRecordCount,
- partition.eqDeleteFileCount);
+ partition.eqDeleteFileCount,
+ partition.lastUpdatedMs,
+ partition.lastUpdatedSnapshotId);
}
private static Iterable<Partition> partitions(Table table, StaticTableScan
scan) {
Types.StructType partitionType = Partitioning.partitionType(table);
PartitionMap partitions = new PartitionMap(partitionType);
-
- try (CloseableIterable<ContentFile<?>> files = planFiles(scan)) {
- for (ContentFile<?> file : files) {
+ try (CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries =
planEntries(scan)) {
+ for (ManifestEntry<? extends ContentFile<?>> entry : entries) {
+ Snapshot snapshot = table.snapshot(entry.snapshotId());
+ ContentFile<?> file = entry.file();
StructLike partition =
PartitionUtil.coercePartition(
partitionType, table.specs().get(file.specId()),
file.partition());
- partitions.get(partition).update(file);
+ partitions.get(partition).update(file, snapshot);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -158,25 +175,32 @@ public class PartitionsTable extends BaseMetadataTable {
}
@VisibleForTesting
- static CloseableIterable<ContentFile<?>> planFiles(StaticTableScan scan) {
+ static CloseableIterable<ManifestEntry<?>> planEntries(StaticTableScan scan)
{
Table table = scan.table();
CloseableIterable<ManifestFile> filteredManifests =
filteredManifests(scan, table,
scan.snapshot().allManifests(table.io()));
- Iterable<CloseableIterable<ContentFile<?>>> tasks =
- CloseableIterable.transform(
- filteredManifests,
- manifest ->
- CloseableIterable.transform(
- ManifestFiles.open(manifest, table.io(), table.specs())
- .caseSensitive(scan.isCaseSensitive())
- .select(scanColumns(manifest.content())), // don't
select stats columns
- t -> (ContentFile<?>) t));
+ Iterable<CloseableIterable<ManifestEntry<?>>> tasks =
+ CloseableIterable.transform(filteredManifests, manifest ->
readEntries(manifest, scan));
return new ParallelIterable<>(tasks, scan.planExecutor());
}
+ private static CloseableIterable<ManifestEntry<?>> readEntries(
+ ManifestFile manifest, StaticTableScan scan) {
+ Table table = scan.table();
+ return CloseableIterable.transform(
+ ManifestFiles.open(manifest, table.io(), table.specs())
+ .caseSensitive(scan.isCaseSensitive())
+ .select(scanColumns(manifest.content())) // don't select stats
columns
+ .entries(),
+ t ->
+ (ManifestEntry<? extends ContentFile<?>>)
+ // defensive copy of manifest entry without stats columns
+ t.copyWithoutStats());
+ }
+
private static List<String> scanColumns(ManifestContent content) {
switch (content) {
case DATA:
@@ -249,19 +273,29 @@ public class PartitionsTable extends BaseMetadataTable {
private int posDeleteFileCount;
private long eqDeleteRecordCount;
private int eqDeleteFileCount;
+ private Long lastUpdatedMs;
+ private Long lastUpdatedSnapshotId;
Partition(StructLike key, Types.StructType keyType) {
this.partitionData = toPartitionData(key, keyType);
this.specId = 0;
- this.dataRecordCount = 0;
+ this.dataRecordCount = 0L;
this.dataFileCount = 0;
- this.posDeleteRecordCount = 0;
+ this.posDeleteRecordCount = 0L;
this.posDeleteFileCount = 0;
- this.eqDeleteRecordCount = 0;
+ this.eqDeleteRecordCount = 0L;
this.eqDeleteFileCount = 0;
}
- void update(ContentFile<?> file) {
+ void update(ContentFile<?> file, Snapshot snapshot) {
+ if (snapshot != null) {
+ long snapshotCommitTime = snapshot.timestampMillis() * 1000;
+ if (this.lastUpdatedMs == null || snapshotCommitTime >
this.lastUpdatedMs) {
+ this.lastUpdatedMs = snapshotCommitTime;
+ this.lastUpdatedSnapshotId = snapshot.snapshotId();
+ }
+ }
+
switch (file.content()) {
case DATA:
this.dataRecordCount += file.recordCount();
diff --git
a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
index 4c8f25c45b..b5ef31a50c 100644
--- a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
@@ -81,18 +81,20 @@ public abstract class MetadataTableScanTestBase extends
TableTestBase {
}
protected void validateSingleFieldPartition(
- CloseableIterable<ContentFile<?>> files, int partitionValue) {
+ CloseableIterable<ManifestEntry<?>> files, int partitionValue) {
validatePartition(files, 0, partitionValue);
}
protected void validatePartition(
- CloseableIterable<ContentFile<?>> files, int position, int
partitionValue) {
+ CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries,
+ int position,
+ int partitionValue) {
Assert.assertTrue(
"File scan tasks do not include correct file",
- StreamSupport.stream(files.spliterator(), false)
+ StreamSupport.stream(entries.spliterator(), false)
.anyMatch(
- file -> {
- StructLike partition = file.partition();
+ entry -> {
+ StructLike partition = entry.file().partition();
if (position >= partition.size()) {
return false;
}
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 9f3ee5c9c3..2e34f2e6da 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -317,18 +317,18 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
TableScan scanNoFilter =
partitionsTable.newScan().select("partition.data_bucket");
Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
- CloseableIterable<ContentFile<?>> files =
- PartitionsTable.planFiles((StaticTableScan) scanNoFilter);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scanNoFilter);
if (formatVersion == 2) {
- Assert.assertEquals(8, Iterators.size(files.iterator()));
+ Assert.assertEquals(8, Iterators.size(entries.iterator()));
} else {
- Assert.assertEquals(4, Iterators.size(files.iterator()));
+ Assert.assertEquals(4, Iterators.size(entries.iterator()));
}
- validateSingleFieldPartition(files, 0);
- validateSingleFieldPartition(files, 1);
- validateSingleFieldPartition(files, 2);
- validateSingleFieldPartition(files, 3);
+ validateSingleFieldPartition(entries, 0);
+ validateSingleFieldPartition(entries, 1);
+ validateSingleFieldPartition(entries, 2);
+ validateSingleFieldPartition(entries, 3);
}
@Test
@@ -342,18 +342,18 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
TableScan scanWithProjection =
partitionsTable.newScan().select("file_count");
Assert.assertEquals(expected, scanWithProjection.schema().asStruct());
- CloseableIterable<ContentFile<?>> files =
- PartitionsTable.planFiles((StaticTableScan) scanWithProjection);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scanWithProjection);
if (formatVersion == 2) {
- Assert.assertEquals(8, Iterators.size(files.iterator()));
+ Assert.assertEquals(8, Iterators.size(entries.iterator()));
} else {
- Assert.assertEquals(4, Iterators.size(files.iterator()));
+ Assert.assertEquals(4, Iterators.size(entries.iterator()));
}
- validateSingleFieldPartition(files, 0);
- validateSingleFieldPartition(files, 1);
- validateSingleFieldPartition(files, 2);
- validateSingleFieldPartition(files, 3);
+ validateSingleFieldPartition(entries, 0);
+ validateSingleFieldPartition(entries, 1);
+ validateSingleFieldPartition(entries, 2);
+ validateSingleFieldPartition(entries, 3);
}
@Test
@@ -361,14 +361,15 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
table.newFastAppend().appendFile(FILE_WITH_STATS).commit();
Table partitionsTable = new PartitionsTable(table);
- CloseableIterable<ContentFile<?>> tasksAndEq =
- PartitionsTable.planFiles((StaticTableScan) partitionsTable.newScan());
- for (ContentFile<?> file : tasksAndEq) {
- Assert.assertNull(file.columnSizes());
- Assert.assertNull(file.valueCounts());
- Assert.assertNull(file.nullValueCounts());
- Assert.assertNull(file.lowerBounds());
- Assert.assertNull(file.upperBounds());
+ CloseableIterable<ManifestEntry<?>> tasksAndEq =
+ PartitionsTable.planEntries((StaticTableScan)
partitionsTable.newScan());
+ for (ManifestEntry<? extends ContentFile<?>> task : tasksAndEq) {
+ Assert.assertNull(task.file().columnSizes());
+ Assert.assertNull(task.file().valueCounts());
+ Assert.assertNull(task.file().nullValueCounts());
+ Assert.assertNull(task.file().nanValueCounts());
+ Assert.assertNull(task.file().lowerBounds());
+ Assert.assertNull(task.file().upperBounds());
}
}
@@ -383,15 +384,15 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
TableScan scanAndEq = partitionsTable.newScan().filter(andEquals);
- CloseableIterable<ContentFile<?>> files =
- PartitionsTable.planFiles((StaticTableScan) scanAndEq);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scanAndEq);
if (formatVersion == 2) {
- Assert.assertEquals(2, Iterators.size(files.iterator()));
+ Assert.assertEquals(2, Iterators.size(entries.iterator()));
} else {
- Assert.assertEquals(1, Iterators.size(files.iterator()));
+ Assert.assertEquals(1, Iterators.size(entries.iterator()));
}
- validateSingleFieldPartition(files, 0);
+ validateSingleFieldPartition(entries, 0);
}
@Test
@@ -405,16 +406,16 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expressions.lessThan("partition.data_bucket", 2),
Expressions.greaterThan("record_count", 0));
TableScan scanLtAnd = partitionsTable.newScan().filter(ltAnd);
- CloseableIterable<ContentFile<?>> files =
- PartitionsTable.planFiles((StaticTableScan) scanLtAnd);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scanLtAnd);
if (formatVersion == 2) {
- Assert.assertEquals(4, Iterators.size(files.iterator()));
+ Assert.assertEquals(4, Iterators.size(entries.iterator()));
} else {
- Assert.assertEquals(2, Iterators.size(files.iterator()));
+ Assert.assertEquals(2, Iterators.size(entries.iterator()));
}
- validateSingleFieldPartition(files, 0);
- validateSingleFieldPartition(files, 1);
+ validateSingleFieldPartition(entries, 0);
+ validateSingleFieldPartition(entries, 1);
}
@Test
@@ -429,17 +430,18 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expressions.greaterThan("record_count", 0));
TableScan scanOr = partitionsTable.newScan().filter(or);
- CloseableIterable<ContentFile<?>> files =
PartitionsTable.planFiles((StaticTableScan) scanOr);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scanOr);
if (formatVersion == 2) {
- Assert.assertEquals(8, Iterators.size(files.iterator()));
+ Assert.assertEquals(8, Iterators.size(entries.iterator()));
} else {
- Assert.assertEquals(4, Iterators.size(files.iterator()));
+ Assert.assertEquals(4, Iterators.size(entries.iterator()));
}
- validateSingleFieldPartition(files, 0);
- validateSingleFieldPartition(files, 1);
- validateSingleFieldPartition(files, 2);
- validateSingleFieldPartition(files, 3);
+ validateSingleFieldPartition(entries, 0);
+ validateSingleFieldPartition(entries, 1);
+ validateSingleFieldPartition(entries, 2);
+ validateSingleFieldPartition(entries, 3);
}
@Test
@@ -449,15 +451,16 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expression not =
Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
TableScan scanNot = partitionsTable.newScan().filter(not);
- CloseableIterable<ContentFile<?>> files =
PartitionsTable.planFiles((StaticTableScan) scanNot);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scanNot);
if (formatVersion == 2) {
- Assert.assertEquals(4, Iterators.size(files.iterator()));
+ Assert.assertEquals(4, Iterators.size(entries.iterator()));
} else {
- Assert.assertEquals(2, Iterators.size(files.iterator()));
+ Assert.assertEquals(2, Iterators.size(entries.iterator()));
}
- validateSingleFieldPartition(files, 2);
- validateSingleFieldPartition(files, 3);
+ validateSingleFieldPartition(entries, 2);
+ validateSingleFieldPartition(entries, 3);
}
@Test
@@ -468,15 +471,16 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expression set = Expressions.in("partition.data_bucket", 2, 3);
TableScan scanSet = partitionsTable.newScan().filter(set);
- CloseableIterable<ContentFile<?>> files =
PartitionsTable.planFiles((StaticTableScan) scanSet);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scanSet);
if (formatVersion == 2) {
- Assert.assertEquals(4, Iterators.size(files.iterator()));
+ Assert.assertEquals(4, Iterators.size(entries.iterator()));
} else {
- Assert.assertEquals(2, Iterators.size(files.iterator()));
+ Assert.assertEquals(2, Iterators.size(entries.iterator()));
}
- validateSingleFieldPartition(files, 2);
- validateSingleFieldPartition(files, 3);
+ validateSingleFieldPartition(entries, 2);
+ validateSingleFieldPartition(entries, 3);
}
@Test
@@ -487,18 +491,18 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expression unary = Expressions.notNull("partition.data_bucket");
TableScan scanUnary = partitionsTable.newScan().filter(unary);
- CloseableIterable<ContentFile<?>> files =
- PartitionsTable.planFiles((StaticTableScan) scanUnary);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scanUnary);
if (formatVersion == 2) {
- Assert.assertEquals(8, Iterators.size(files.iterator()));
+ Assert.assertEquals(8, Iterators.size(entries.iterator()));
} else {
- Assert.assertEquals(4, Iterators.size(files.iterator()));
+ Assert.assertEquals(4, Iterators.size(entries.iterator()));
}
- validateSingleFieldPartition(files, 0);
- validateSingleFieldPartition(files, 1);
- validateSingleFieldPartition(files, 2);
- validateSingleFieldPartition(files, 3);
+ validateSingleFieldPartition(entries, 0);
+ validateSingleFieldPartition(entries, 1);
+ validateSingleFieldPartition(entries, 2);
+ validateSingleFieldPartition(entries, 3);
}
@Test
@@ -788,13 +792,14 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expressions.and(
Expressions.equal("partition.id", 10),
Expressions.greaterThan("record_count", 0));
TableScan scan = metadataTable.newScan().filter(filter);
- CloseableIterable<ContentFile<?>> files =
PartitionsTable.planFiles((StaticTableScan) scan);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scan);
if (formatVersion == 2) {
// Four data files and delete files of old spec, one new data file of
new spec
- Assert.assertEquals(9, Iterables.size(files));
+ Assert.assertEquals(9, Iterables.size(entries));
} else {
// Four data files of old spec, one new data file of new spec
- Assert.assertEquals(5, Iterables.size(files));
+ Assert.assertEquals(5, Iterables.size(entries));
}
filter =
@@ -802,15 +807,15 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
scan = metadataTable.newScan().filter(filter);
- files = PartitionsTable.planFiles((StaticTableScan) scan);
+ entries = PartitionsTable.planEntries((StaticTableScan) scan);
if (formatVersion == 2) {
// 1 original data file and delete file written by old spec, plus 1 new
data file written by
// new spec
- Assert.assertEquals(3, Iterables.size(files));
+ Assert.assertEquals(3, Iterables.size(entries));
} else {
// 1 original data file written by old spec, plus 1 new data file
written by new spec
- Assert.assertEquals(2, Iterables.size(files));
+ Assert.assertEquals(2, Iterables.size(entries));
}
}
@@ -852,14 +857,15 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expressions.and(
Expressions.equal("partition.id", 10),
Expressions.greaterThan("record_count", 0));
TableScan scan = metadataTable.newScan().filter(filter);
- CloseableIterable<ContentFile<?>> files =
PartitionsTable.planFiles((StaticTableScan) scan);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scan);
if (formatVersion == 2) {
// Four data and delete files of original spec, one data file written by
new spec
- Assert.assertEquals(9, Iterables.size(files));
+ Assert.assertEquals(9, Iterables.size(entries));
} else {
// Four data files of original spec, one data file written by new spec
- Assert.assertEquals(5, Iterables.size(files));
+ Assert.assertEquals(5, Iterables.size(entries));
}
// Filter for a dropped partition spec field. Correct behavior is that
only old partitions are
@@ -869,11 +875,11 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
scan = metadataTable.newScan().filter(filter);
- files = PartitionsTable.planFiles((StaticTableScan) scan);
+ entries = PartitionsTable.planEntries((StaticTableScan) scan);
if (formatVersion == 1) {
// 1 original data file written by old spec
- Assert.assertEquals(1, Iterables.size(files));
+ Assert.assertEquals(1, Iterables.size(entries));
} else {
// 1 original data and 1 delete files written by old spec, plus both of
new data file/delete
// file written by new spec
@@ -888,7 +894,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
// schema.
// The Partition table final schema is a union of fields of all specs,
including dropped
// fields.
- Assert.assertEquals(4, Iterables.size(files));
+ Assert.assertEquals(4, Iterables.size(entries));
}
}
@@ -939,10 +945,10 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
Expressions.equal("partition.partition", 0),
Expressions.greaterThan("record_count", 0));
TableScan scanAndEq = partitionsTable.newScan().filter(andEquals);
- CloseableIterable<ContentFile<?>> files =
- PartitionsTable.planFiles((StaticTableScan) scanAndEq);
- Assert.assertEquals(1, Iterators.size(files.iterator()));
- validateSingleFieldPartition(files, 0);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scanAndEq);
+ Assert.assertEquals(1, Iterators.size(entries.iterator()));
+ validateSingleFieldPartition(entries, 0);
}
@Test
@@ -1010,11 +1016,12 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
true); // daemon threads will be terminated abruptly
when the JVM exits
return thread;
}));
- CloseableIterable<ContentFile<?>> files =
PartitionsTable.planFiles((StaticTableScan) scan);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scan);
if (formatVersion == 2) {
- Assert.assertEquals(8, Iterators.size(files.iterator()));
+ Assert.assertEquals(8, Iterators.size(entries.iterator()));
} else {
- Assert.assertEquals(4, Iterators.size(files.iterator()));
+ Assert.assertEquals(4, Iterators.size(entries.iterator()));
}
Assert.assertTrue("Thread should be created in provided pool",
planThreadsIndex.get() > 0);
diff --git
a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
index b6e1ecae4f..92fa080dfe 100644
---
a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
+++
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
@@ -140,7 +140,7 @@ public class TestMetadataTableScansWithPartitionEvolution
extends MetadataTableS
}
@Test
- public void testPartitionsTableScanWithAddPartitionOnNestedField() throws
IOException {
+ public void testPartitionsTableScanWithAddPartitionOnNestedField() {
Table partitionsTable = new PartitionsTable(table);
Types.StructType idPartition =
new Schema(
@@ -154,15 +154,15 @@ public class TestMetadataTableScansWithPartitionEvolution
extends MetadataTableS
TableScan scanNoFilter = partitionsTable.newScan().select("partition");
Assert.assertEquals(idPartition, scanNoFilter.schema().asStruct());
- CloseableIterable<ContentFile<?>> files =
- PartitionsTable.planFiles((StaticTableScan) scanNoFilter);
- Assert.assertEquals(4, Iterators.size(files.iterator()));
- validatePartition(files, 0, 0);
- validatePartition(files, 0, 1);
- validatePartition(files, 0, 2);
- validatePartition(files, 0, 3);
- validatePartition(files, 1, 2);
- validatePartition(files, 1, 3);
+ CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan) scanNoFilter);
+ Assert.assertEquals(4, Iterators.size(entries.iterator()));
+ validatePartition(entries, 0, 0);
+ validatePartition(entries, 0, 1);
+ validatePartition(entries, 0, 2);
+ validatePartition(entries, 0, 3);
+ validatePartition(entries, 1, 2);
+ validatePartition(entries, 1, 3);
}
@Test
@@ -241,16 +241,16 @@ public class TestMetadataTableScansWithPartitionEvolution
extends MetadataTableS
// must contain the partition column even when the current spec is
non-partitioned.
Assertions.assertThat(partitionsTable.schema().findField("partition")).isNotNull();
- try (CloseableIterable<ContentFile<?>> files =
- PartitionsTable.planFiles((StaticTableScan)
partitionsTable.newScan())) {
+ try (CloseableIterable<ManifestEntry<?>> entries =
+ PartitionsTable.planEntries((StaticTableScan)
partitionsTable.newScan())) {
// four partitioned data files and one non-partitioned data file.
- Assertions.assertThat(files).hasSize(5);
+ Assertions.assertThat(entries).hasSize(5);
// check for null partition value.
- Assertions.assertThat(StreamSupport.stream(files.spliterator(), false))
+ Assertions.assertThat(StreamSupport.stream(entries.spliterator(), false))
.anyMatch(
- file -> {
- StructLike partition = file.partition();
+ entry -> {
+ StructLike partition = entry.file().partition();
return Objects.equals(null, partition.get(0, Object.class));
});
}
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 2dda346890..ff8bf0e7be 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -1217,7 +1218,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
@Test
public void testUnpartitionedPartitionsTable() {
TableIdentifier tableIdentifier = TableIdentifier.of("db",
"unpartitioned_partitions_test");
- createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+ Table table = createTable(tableIdentifier, SCHEMA,
PartitionSpec.unpartitioned());
Dataset<Row> df =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")),
SimpleRecord.class);
@@ -1251,7 +1252,17 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
8,
"equality_delete_file_count",
Types.IntegerType.get(),
- "Count of equality delete files"));
+ "Count of equality delete files"),
+ optional(
+ 9,
+ "last_updated_ms",
+ Types.TimestampType.withZone(),
+ "Commit time of snapshot that last updated this partition"),
+ optional(
+ 10,
+ "last_updated_snapshot_id",
+ Types.LongType.get(),
+ "Id of snapshot that last updated this partition"));
Table partitionsTable = loadTable(tableIdentifier, "partitions");
@@ -1264,6 +1275,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
GenericData.Record expectedRow =
builder
+ .set("last_updated_ms", table.currentSnapshot().timestampMillis()
* 1000)
+ .set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
.set("position_delete_record_count", 0L)
@@ -1309,6 +1322,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.mode("append")
.save(loadLocation(tableIdentifier));
+ table.refresh();
+ long secondCommitId = table.currentSnapshot().snapshotId();
+
List<Row> actual =
spark
.read()
@@ -1334,6 +1350,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
builder
@@ -1345,6 +1363,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", secondCommitId)
.build());
Assert.assertEquals("Partitions table should have two rows", 2,
expected.size());
@@ -1387,11 +1407,149 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
.load(loadLocation(tableIdentifier, "partitions"))
.filter("partition.id < 2 or record_count=1")
.collectAsList();
- Assert.assertEquals("Actual results should have one row", 2,
nonFiltered.size());
+ Assert.assertEquals("Actual results should have two row", 2,
nonFiltered.size());
+ for (int i = 0; i < 2; i += 1) {
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
+ }
+ }
+
+ @Test
+ public void testPartitionsTableLastUpdatedSnapshot() {
+ TableIdentifier tableIdentifier = TableIdentifier.of("db",
"partitions_test");
+ Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+ Table partitionsTable = loadTable(tableIdentifier, "partitions");
+ Dataset<Row> df1 =
+ spark.createDataFrame(
+ Lists.newArrayList(new SimpleRecord(1, "1"), new SimpleRecord(2,
"2")),
+ SimpleRecord.class);
+ Dataset<Row> df2 =
+ spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "20")),
SimpleRecord.class);
+
+ df1.select("id", "data")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+ long firstCommitId = table.currentSnapshot().snapshotId();
+
+ // add a second file
+ df2.select("id", "data")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+ long secondCommitId = table.currentSnapshot().snapshotId();
+
+ // check if rewrite manifest does not override metadata about data file's
creating snapshot
+ RewriteManifests.Result rewriteManifestResult =
+ SparkActions.get().rewriteManifests(table).execute();
+ Assert.assertEquals(
+ "rewrite replaced 2 manifests",
+ 2,
+ Iterables.size(rewriteManifestResult.rewrittenManifests()));
+ Assert.assertEquals(
+ "rewrite added 1 manifests", 1,
Iterables.size(rewriteManifestResult.addedManifests()));
+
+ List<Row> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .orderBy("partition.id")
+ .collectAsList();
+
+ GenericRecordBuilder builder =
+ new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
+ GenericRecordBuilder partitionBuilder =
+ new GenericRecordBuilder(
+ AvroSchemaUtil.convert(
+ partitionsTable.schema().findType("partition").asStructType(),
"partition"));
+ List<GenericData.Record> expected = Lists.newArrayList();
+ expected.add(
+ builder
+ .set("partition", partitionBuilder.set("id", 1).build())
+ .set("record_count", 1L)
+ .set("file_count", 1)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", firstCommitId)
+ .build());
+ expected.add(
+ builder
+ .set("partition", partitionBuilder.set("id", 2).build())
+ .set("record_count", 2L)
+ .set("file_count", 2)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", secondCommitId)
+ .build());
+
+ Assert.assertEquals("Partitions table should have two rows", 2,
expected.size());
+ Assert.assertEquals("Actual results should have two rows", 2,
actual.size());
for (int i = 0; i < 2; i += 1) {
TestHelpers.assertEqualsSafe(
partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
}
+
+ // check predicate push down
+ List<Row> filtered =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .filter("partition.id < 2")
+ .collectAsList();
+ Assert.assertEquals("Actual results should have one row", 1,
filtered.size());
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));
+
+ // check for snapshot expiration
+ // if snapshot with firstCommitId is expired,
+ // we expect the partition of id=1 will no longer have last updated
timestamp and snapshotId
+
SparkActions.get().expireSnapshots(table).expireSnapshotId(firstCommitId).execute();
+ GenericData.Record newPartitionRecord =
+ builder
+ .set("partition", partitionBuilder.set("id", 1).build())
+ .set("record_count", 1L)
+ .set("file_count", 1)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms", null)
+ .set("last_updated_snapshot_id", null)
+ .build();
+ expected.remove(0);
+ expected.add(0, newPartitionRecord);
+
+ List<Row> actualAfterSnapshotExpiration =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .collectAsList();
+ Assert.assertEquals(
+ "Actual results should have two row", 2,
actualAfterSnapshotExpiration.size());
+ for (int i = 0; i < 2; i += 1) {
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(),
+ expected.get(i),
+ actualAfterSnapshotExpiration.get(i));
+ }
}
@Test
diff --git
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index b89b78b23f..46ac0e96fe 100644
---
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -1222,7 +1223,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
@Test
public void testUnpartitionedPartitionsTable() {
TableIdentifier tableIdentifier = TableIdentifier.of("db",
"unpartitioned_partitions_test");
- createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+ Table table = createTable(tableIdentifier, SCHEMA,
PartitionSpec.unpartitioned());
Dataset<Row> df =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")),
SimpleRecord.class);
@@ -1256,7 +1257,17 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
8,
"equality_delete_file_count",
Types.IntegerType.get(),
- "Count of equality delete files"));
+ "Count of equality delete files"),
+ optional(
+ 9,
+ "last_updated_ms",
+ Types.TimestampType.withZone(),
+ "Commit time of snapshot that last updated this partition"),
+ optional(
+ 10,
+ "last_updated_snapshot_id",
+ Types.LongType.get(),
+ "Id of snapshot that last updated this partition"));
Table partitionsTable = loadTable(tableIdentifier, "partitions");
@@ -1269,6 +1280,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
GenericData.Record expectedRow =
builder
+ .set("last_updated_ms", table.currentSnapshot().timestampMillis()
* 1000)
+ .set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
.set("position_delete_record_count", 0L)
@@ -1314,6 +1327,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.mode("append")
.save(loadLocation(tableIdentifier));
+ table.refresh();
+ long secondCommitId = table.currentSnapshot().snapshotId();
+
List<Row> actual =
spark
.read()
@@ -1339,6 +1355,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
builder
@@ -1350,6 +1368,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", secondCommitId)
.build());
Assert.assertEquals("Partitions table should have two rows", 2,
expected.size());
@@ -1392,13 +1412,151 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
.load(loadLocation(tableIdentifier, "partitions"))
.filter("partition.id < 2 or record_count=1")
.collectAsList();
- Assert.assertEquals("Actual results should have one row", 2,
nonFiltered.size());
+ Assert.assertEquals("Actual results should have two rows", 2,
nonFiltered.size());
for (int i = 0; i < 2; i += 1) {
TestHelpers.assertEqualsSafe(
partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
}
}
+ @Test
+ public void testPartitionsTableLastUpdatedSnapshot() {
+ TableIdentifier tableIdentifier = TableIdentifier.of("db",
"partitions_test");
+ Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+ Table partitionsTable = loadTable(tableIdentifier, "partitions");
+ Dataset<Row> df1 =
+ spark.createDataFrame(
+ Lists.newArrayList(new SimpleRecord(1, "1"), new SimpleRecord(2,
"2")),
+ SimpleRecord.class);
+ Dataset<Row> df2 =
+ spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "20")),
SimpleRecord.class);
+
+ df1.select("id", "data")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+ long firstCommitId = table.currentSnapshot().snapshotId();
+
+ // add a second file
+ df2.select("id", "data")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+ long secondCommitId = table.currentSnapshot().snapshotId();
+
+ // check that rewriting manifests does not override metadata about the
data file's creating snapshot
+ RewriteManifests.Result rewriteManifestResult =
+ SparkActions.get().rewriteManifests(table).execute();
+ Assert.assertEquals(
+ "rewrite replaced 2 manifests",
+ 2,
+ Iterables.size(rewriteManifestResult.rewrittenManifests()));
+ Assert.assertEquals(
+ "rewrite added 1 manifest", 1,
Iterables.size(rewriteManifestResult.addedManifests()));
+
+ List<Row> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .orderBy("partition.id")
+ .collectAsList();
+
+ GenericRecordBuilder builder =
+ new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
+ GenericRecordBuilder partitionBuilder =
+ new GenericRecordBuilder(
+ AvroSchemaUtil.convert(
+ partitionsTable.schema().findType("partition").asStructType(),
"partition"));
+ List<GenericData.Record> expected = Lists.newArrayList();
+ expected.add(
+ builder
+ .set("partition", partitionBuilder.set("id", 1).build())
+ .set("record_count", 1L)
+ .set("file_count", 1)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", firstCommitId)
+ .build());
+ expected.add(
+ builder
+ .set("partition", partitionBuilder.set("id", 2).build())
+ .set("record_count", 2L)
+ .set("file_count", 2)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", secondCommitId)
+ .build());
+
+ Assert.assertEquals("Partitions table should have two rows", 2,
expected.size());
+ Assert.assertEquals("Actual results should have two rows", 2,
actual.size());
+ for (int i = 0; i < 2; i += 1) {
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
+ }
+
+ // check predicate push down
+ List<Row> filtered =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .filter("partition.id < 2")
+ .collectAsList();
+ Assert.assertEquals("Actual results should have one row", 1,
filtered.size());
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));
+
+ // check for snapshot expiration
+ // if snapshot with firstCommitId is expired,
+ // we expect that the partition with id=1 will no longer have a last
updated timestamp and snapshotId
+
SparkActions.get().expireSnapshots(table).expireSnapshotId(firstCommitId).execute();
+ GenericData.Record newPartitionRecord =
+ builder
+ .set("partition", partitionBuilder.set("id", 1).build())
+ .set("record_count", 1L)
+ .set("file_count", 1)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms", null)
+ .set("last_updated_snapshot_id", null)
+ .build();
+ expected.remove(0);
+ expected.add(0, newPartitionRecord);
+
+ List<Row> actualAfterSnapshotExpiration =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .collectAsList();
+ Assert.assertEquals(
+ "Actual results should have two rows", 2,
actualAfterSnapshotExpiration.size());
+ for (int i = 0; i < 2; i += 1) {
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(),
+ expected.get(i),
+ actualAfterSnapshotExpiration.get(i));
+ }
+ }
+
@Test
public void testPartitionsTableDeleteStats() {
TableIdentifier tableIdentifier = TableIdentifier.of("db",
"partitions_test");
@@ -1416,6 +1574,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.save(loadLocation(tableIdentifier));
table.refresh();
+ long firstCommitId = table.currentSnapshot().snapshotId();
// add a second file
df2.select("id", "data")
@@ -1428,6 +1587,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
DeleteFile deleteFile = writePosDeleteFile(table);
table.newRowDelta().addDeletes(deleteFile).commit();
+ table.refresh();
+ long posDeleteCommitId = table.currentSnapshot().snapshotId();
List<Row> actual =
spark
@@ -1455,6 +1616,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
builder
@@ -1466,7 +1629,10 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", posDeleteCommitId)
.build());
+
for (int i = 0; i < 2; i += 1) {
TestHelpers.assertEqualsSafe(
partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
@@ -1475,6 +1641,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
// test equality delete
DeleteFile eqDeleteFile = writeEqDeleteFile(table);
table.newRowDelta().addDeletes(eqDeleteFile).commit();
+ table.refresh();
+ long eqDeleteCommitId = table.currentSnapshot().snapshotId();
actual =
spark
.read()
@@ -1495,6 +1663,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 1L) // should be incremented
now
.set("equality_delete_file_count", 1) // should be incremented now
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", eqDeleteCommitId)
.build());
for (int i = 0; i < 2; i += 1) {
TestHelpers.assertEqualsSafe(
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 5c2327c6c5..36c0597883 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -48,6 +48,7 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -1227,7 +1228,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
@Test
public void testUnpartitionedPartitionsTable() {
TableIdentifier tableIdentifier = TableIdentifier.of("db",
"unpartitioned_partitions_test");
- createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+ Table table = createTable(tableIdentifier, SCHEMA,
PartitionSpec.unpartitioned());
Dataset<Row> df =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")),
SimpleRecord.class);
@@ -1261,7 +1262,17 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
8,
"equality_delete_file_count",
Types.IntegerType.get(),
- "Count of equality delete files"));
+ "Count of equality delete files"),
+ optional(
+ 9,
+ "last_updated_ms",
+ Types.TimestampType.withZone(),
+ "Commit time of snapshot that last updated this partition"),
+ optional(
+ 10,
+ "last_updated_snapshot_id",
+ Types.LongType.get(),
+ "Id of snapshot that last updated this partition"));
Table partitionsTable = loadTable(tableIdentifier, "partitions");
@@ -1274,6 +1285,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
GenericData.Record expectedRow =
builder
+ .set("last_updated_ms", table.currentSnapshot().timestampMillis()
* 1000)
+ .set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
.set("position_delete_record_count", 0L)
@@ -1319,6 +1332,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.mode("append")
.save(loadLocation(tableIdentifier));
+ table.refresh();
+ long secondCommitId = table.currentSnapshot().snapshotId();
+
List<Row> actual =
spark
.read()
@@ -1344,6 +1360,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
builder
@@ -1355,6 +1373,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", secondCommitId)
.build());
Assert.assertEquals("Partitions table should have two rows", 2,
expected.size());
@@ -1397,13 +1417,151 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
.load(loadLocation(tableIdentifier, "partitions"))
.filter("partition.id < 2 or record_count=1")
.collectAsList();
- Assert.assertEquals("Actual results should have one row", 2,
nonFiltered.size());
+ Assert.assertEquals("Actual results should have two rows", 2,
nonFiltered.size());
for (int i = 0; i < 2; i += 1) {
TestHelpers.assertEqualsSafe(
partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
}
}
+ @Test
+ public void testPartitionsTableLastUpdatedSnapshot() {
+ TableIdentifier tableIdentifier = TableIdentifier.of("db",
"partitions_test");
+ Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+ Table partitionsTable = loadTable(tableIdentifier, "partitions");
+ Dataset<Row> df1 =
+ spark.createDataFrame(
+ Lists.newArrayList(new SimpleRecord(1, "1"), new SimpleRecord(2,
"2")),
+ SimpleRecord.class);
+ Dataset<Row> df2 =
+ spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "20")),
SimpleRecord.class);
+
+ df1.select("id", "data")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+ long firstCommitId = table.currentSnapshot().snapshotId();
+
+ // add a second file
+ df2.select("id", "data")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+ long secondCommitId = table.currentSnapshot().snapshotId();
+
+ // check that rewriting manifests does not override metadata about the
data file's creating snapshot
+ RewriteManifests.Result rewriteManifestResult =
+ SparkActions.get().rewriteManifests(table).execute();
+ Assert.assertEquals(
+ "rewrite replaced 2 manifests",
+ 2,
+ Iterables.size(rewriteManifestResult.rewrittenManifests()));
+ Assert.assertEquals(
+ "rewrite added 1 manifest", 1,
Iterables.size(rewriteManifestResult.addedManifests()));
+
+ List<Row> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .orderBy("partition.id")
+ .collectAsList();
+
+ GenericRecordBuilder builder =
+ new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
+ GenericRecordBuilder partitionBuilder =
+ new GenericRecordBuilder(
+ AvroSchemaUtil.convert(
+ partitionsTable.schema().findType("partition").asStructType(),
"partition"));
+ List<GenericData.Record> expected = Lists.newArrayList();
+ expected.add(
+ builder
+ .set("partition", partitionBuilder.set("id", 1).build())
+ .set("record_count", 1L)
+ .set("file_count", 1)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", firstCommitId)
+ .build());
+ expected.add(
+ builder
+ .set("partition", partitionBuilder.set("id", 2).build())
+ .set("record_count", 2L)
+ .set("file_count", 2)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", secondCommitId)
+ .build());
+
+ Assert.assertEquals("Partitions table should have two rows", 2,
expected.size());
+ Assert.assertEquals("Actual results should have two rows", 2,
actual.size());
+ for (int i = 0; i < 2; i += 1) {
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
+ }
+
+ // check predicate push down
+ List<Row> filtered =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .filter("partition.id < 2")
+ .collectAsList();
+ Assert.assertEquals("Actual results should have one row", 1,
filtered.size());
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));
+
+ // check for snapshot expiration
+ // if snapshot with firstCommitId is expired,
+ // we expect that the partition with id=1 will no longer have a last
updated timestamp and snapshotId
+
SparkActions.get().expireSnapshots(table).expireSnapshotId(firstCommitId).execute();
+ GenericData.Record newPartitionRecord =
+ builder
+ .set("partition", partitionBuilder.set("id", 1).build())
+ .set("record_count", 1L)
+ .set("file_count", 1)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms", null)
+ .set("last_updated_snapshot_id", null)
+ .build();
+ expected.remove(0);
+ expected.add(0, newPartitionRecord);
+
+ List<Row> actualAfterSnapshotExpiration =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .collectAsList();
+ Assert.assertEquals(
+ "Actual results should have two rows", 2,
actualAfterSnapshotExpiration.size());
+ for (int i = 0; i < 2; i += 1) {
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(),
+ expected.get(i),
+ actualAfterSnapshotExpiration.get(i));
+ }
+ }
+
@Test
public void testPartitionsTableDeleteStats() {
TableIdentifier tableIdentifier = TableIdentifier.of("db",
"partitions_test");
@@ -1421,6 +1579,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.save(loadLocation(tableIdentifier));
table.refresh();
+ long firstCommitId = table.currentSnapshot().snapshotId();
// add a second file
df2.select("id", "data")
@@ -1433,6 +1592,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
DeleteFile deleteFile = writePosDeleteFile(table);
table.newRowDelta().addDeletes(deleteFile).commit();
+ table.refresh();
+ long posDeleteCommitId = table.currentSnapshot().snapshotId();
List<Row> actual =
spark
@@ -1460,6 +1621,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
builder
@@ -1471,7 +1634,10 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", posDeleteCommitId)
.build());
+
for (int i = 0; i < 2; i += 1) {
TestHelpers.assertEqualsSafe(
partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
@@ -1480,6 +1646,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
// test equality delete
DeleteFile eqDeleteFile = writeEqDeleteFile(table);
table.newRowDelta().addDeletes(eqDeleteFile).commit();
+ table.refresh();
+ long eqDeleteCommitId = table.currentSnapshot().snapshotId();
actual =
spark
.read()
@@ -1500,6 +1668,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 1L) // should be incremented
now
.set("equality_delete_file_count", 1) // should be incremented now
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", eqDeleteCommitId)
.build());
for (int i = 0; i < 2; i += 1) {
TestHelpers.assertEqualsSafe(
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index a6687a4ce7..303931e21b 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -204,7 +205,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
}
@Test
- public void testEntriesTablePartitionedPrune() throws Exception {
+ public void testEntriesTablePartitionedPrune() {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
@@ -233,7 +234,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
}
@Test
- public void testEntriesTableDataFilePrune() throws Exception {
+ public void testEntriesTableDataFilePrune() {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
Table table = createTable(tableIdentifier, SCHEMA,
PartitionSpec.unpartitioned());
@@ -266,7 +267,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
}
@Test
- public void testEntriesTableDataFilePruneMulti() throws Exception {
+ public void testEntriesTableDataFilePruneMulti() {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
Table table = createTable(tableIdentifier, SCHEMA,
PartitionSpec.unpartitioned());
@@ -304,7 +305,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
}
@Test
- public void testFilesSelectMap() throws Exception {
+ public void testFilesSelectMap() {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
Table table = createTable(tableIdentifier, SCHEMA,
PartitionSpec.unpartitioned());
@@ -644,7 +645,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
}
@Test
- public void testAllMetadataTablesWithStagedCommits() throws Exception {
+ public void testAllMetadataTablesWithStagedCommits() {
TableIdentifier tableIdentifier = TableIdentifier.of("db",
"stage_aggregate_table_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
@@ -691,8 +692,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
Assert.assertTrue(
"Stage table should have some snapshots",
table.snapshots().iterator().hasNext());
- Assert.assertEquals(
- "Stage table should have null currentSnapshot", null,
table.currentSnapshot());
+ Assert.assertNull("Stage table should have null currentSnapshot",
table.currentSnapshot());
Assert.assertEquals("Actual results should have two rows", 2,
actualAllData.size());
Assert.assertEquals("Actual results should have two rows", 2,
actualAllManifests.size());
Assert.assertEquals("Actual results should have two rows", 2,
actualAllEntries.size());
@@ -1212,8 +1212,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
snapshotManifest ->
manifestRecord(
manifestTable, snapshotManifest.first(),
snapshotManifest.second()))
+ .sorted(Comparator.comparing(o -> o.get("path").toString()))
.collect(Collectors.toList());
- expected.sort(Comparator.comparing(o -> o.get("path").toString()));
Assert.assertEquals("Manifests table should have 5 manifest rows", 5,
actual.size());
for (int i = 0; i < expected.size(); i += 1) {
@@ -1225,7 +1225,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
@Test
public void testUnpartitionedPartitionsTable() {
TableIdentifier tableIdentifier = TableIdentifier.of("db",
"unpartitioned_partitions_test");
- createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+ Table table = createTable(tableIdentifier, SCHEMA,
PartitionSpec.unpartitioned());
Dataset<Row> df =
spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")),
SimpleRecord.class);
@@ -1259,7 +1259,17 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
8,
"equality_delete_file_count",
Types.IntegerType.get(),
- "Count of equality delete files"));
+ "Count of equality delete files"),
+ optional(
+ 9,
+ "last_updated_ms",
+ Types.TimestampType.withZone(),
+ "Commit time of snapshot that last updated this partition"),
+ optional(
+ 10,
+ "last_updated_snapshot_id",
+ Types.LongType.get(),
+ "Id of snapshot that last updated this partition"));
Table partitionsTable = loadTable(tableIdentifier, "partitions");
@@ -1272,6 +1282,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
GenericData.Record expectedRow =
builder
+ .set("last_updated_ms", table.currentSnapshot().timestampMillis()
* 1000)
+ .set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
.set("position_delete_record_count", 0L)
@@ -1317,6 +1329,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.mode("append")
.save(loadLocation(tableIdentifier));
+ table.refresh();
+ long secondCommitId = table.currentSnapshot().snapshotId();
+
List<Row> actual =
spark
.read()
@@ -1342,6 +1357,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
builder
@@ -1353,6 +1370,8 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", secondCommitId)
.build());
Assert.assertEquals("Partitions table should have two rows", 2,
expected.size());
@@ -1395,13 +1414,151 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
.load(loadLocation(tableIdentifier, "partitions"))
.filter("partition.id < 2 or record_count=1")
.collectAsList();
- Assert.assertEquals("Actual results should have one row", 2,
nonFiltered.size());
+ Assert.assertEquals("Actual results should have two rows", 2,
nonFiltered.size());
for (int i = 0; i < 2; i += 1) {
TestHelpers.assertEqualsSafe(
partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
}
}
+ @Test
+ public void testPartitionsTableLastUpdatedSnapshot() {
+ TableIdentifier tableIdentifier = TableIdentifier.of("db",
"partitions_test");
+ Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+ Table partitionsTable = loadTable(tableIdentifier, "partitions");
+ Dataset<Row> df1 =
+ spark.createDataFrame(
+ Lists.newArrayList(new SimpleRecord(1, "1"), new SimpleRecord(2,
"2")),
+ SimpleRecord.class);
+ Dataset<Row> df2 =
+ spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "20")),
SimpleRecord.class);
+
+ df1.select("id", "data")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+ long firstCommitId = table.currentSnapshot().snapshotId();
+
+ // add a second file
+ df2.select("id", "data")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+ long secondCommitId = table.currentSnapshot().snapshotId();
+
+ // check that rewriting manifests does not override metadata about the
data file's creating snapshot
+ RewriteManifests.Result rewriteManifestResult =
+ SparkActions.get().rewriteManifests(table).execute();
+ Assert.assertEquals(
+ "rewrite replaced 2 manifests",
+ 2,
+ Iterables.size(rewriteManifestResult.rewrittenManifests()));
+ Assert.assertEquals(
+ "rewrite added 1 manifest", 1,
Iterables.size(rewriteManifestResult.addedManifests()));
+
+ List<Row> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .orderBy("partition.id")
+ .collectAsList();
+
+ GenericRecordBuilder builder =
+ new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
+ GenericRecordBuilder partitionBuilder =
+ new GenericRecordBuilder(
+ AvroSchemaUtil.convert(
+ partitionsTable.schema().findType("partition").asStructType(),
"partition"));
+ List<GenericData.Record> expected = Lists.newArrayList();
+ expected.add(
+ builder
+ .set("partition", partitionBuilder.set("id", 1).build())
+ .set("record_count", 1L)
+ .set("file_count", 1)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", firstCommitId)
+ .build());
+ expected.add(
+ builder
+ .set("partition", partitionBuilder.set("id", 2).build())
+ .set("record_count", 2L)
+ .set("file_count", 2)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", secondCommitId)
+ .build());
+
+ Assert.assertEquals("Partitions table should have two rows", 2,
expected.size());
+ Assert.assertEquals("Actual results should have two rows", 2,
actual.size());
+ for (int i = 0; i < 2; i += 1) {
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
+ }
+
+ // check predicate push down
+ List<Row> filtered =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .filter("partition.id < 2")
+ .collectAsList();
+ Assert.assertEquals("Actual results should have one row", 1,
filtered.size());
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(), expected.get(0), filtered.get(0));
+
+ // check for snapshot expiration
+ // if snapshot with firstCommitId is expired,
+ // we expect that the partition with id=1 will no longer have a last
updated timestamp and snapshotId
+
SparkActions.get().expireSnapshots(table).expireSnapshotId(firstCommitId).execute();
+ GenericData.Record newPartitionRecord =
+ builder
+ .set("partition", partitionBuilder.set("id", 1).build())
+ .set("record_count", 1L)
+ .set("file_count", 1)
+ .set("position_delete_record_count", 0L)
+ .set("position_delete_file_count", 0)
+ .set("equality_delete_record_count", 0L)
+ .set("equality_delete_file_count", 0)
+ .set("spec_id", 0)
+ .set("last_updated_ms", null)
+ .set("last_updated_snapshot_id", null)
+ .build();
+ expected.remove(0);
+ expected.add(0, newPartitionRecord);
+
+ List<Row> actualAfterSnapshotExpiration =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "partitions"))
+ .collectAsList();
+ Assert.assertEquals(
+ "Actual results should have two rows", 2,
actualAfterSnapshotExpiration.size());
+ for (int i = 0; i < 2; i += 1) {
+ TestHelpers.assertEqualsSafe(
+ partitionsTable.schema().asStruct(),
+ expected.get(i),
+ actualAfterSnapshotExpiration.get(i));
+ }
+ }
+
@Test
public void testPartitionsTableDeleteStats() {
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test");
@@ -1419,6 +1576,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.save(loadLocation(tableIdentifier));
table.refresh();
+ long firstCommitId = table.currentSnapshot().snapshotId();
// add a second file
df2.select("id", "data")
@@ -1431,6 +1589,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
DeleteFile deleteFile = writePosDeleteFile(table);
table.newRowDelta().addDeletes(deleteFile).commit();
+ table.refresh();
+ long posDeleteCommitId = table.currentSnapshot().snapshotId();
List<Row> actual =
spark
@@ -1458,6 +1618,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+            .set("last_updated_ms", table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
builder
@@ -1469,7 +1631,10 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
+            .set("last_updated_ms", table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", posDeleteCommitId)
.build());
+
for (int i = 0; i < 2; i += 1) {
TestHelpers.assertEqualsSafe(
partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
@@ -1478,6 +1643,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// test equality delete
DeleteFile eqDeleteFile = writeEqDeleteFile(table);
table.newRowDelta().addDeletes(eqDeleteFile).commit();
+ table.refresh();
+ long eqDeleteCommitId = table.currentSnapshot().snapshotId();
actual =
spark
.read()
@@ -1497,7 +1664,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.set("position_delete_file_count", 0)
            .set("equality_delete_record_count", 1L) // should be incremented now
.set("equality_delete_file_count", 1) // should be incremented now
- .set("spec_id", 0)
+            .set("last_updated_ms", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_snapshot_id", eqDeleteCommitId)
.build());
for (int i = 0; i < 2; i += 1) {
TestHelpers.assertEqualsSafe(
@@ -1771,7 +1939,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
}
@Test
- public void testFilesTablePartitionId() throws Exception {
+ public void testFilesTablePartitionId() {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table =
createTable(
@@ -1811,7 +1979,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
}
@Test
- public void testAllManifestTableSnapshotFiltering() throws Exception {
+ public void testAllManifestTableSnapshotFiltering() {
    TableIdentifier tableIdentifier = TableIdentifier.of("db", "all_manifest_snapshot_filtering");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table manifestTable = loadTable(tableIdentifier, "all_manifests");
@@ -1880,8 +2048,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
snapshotManifest ->
manifestRecord(
                        manifestTable, snapshotManifest.first(), snapshotManifest.second()))
+ .sorted(Comparator.comparing(o -> o.get("path").toString()))
.collect(Collectors.toList());
- expected.sort(Comparator.comparing(o -> o.get("path").toString()));
    Assert.assertEquals("Manifests table should have 3 manifest rows", 3, actual.size());
for (int i = 0; i < expected.size(); i += 1) {