This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a73f10b3e7 Core: Rename last updated timestamp column in PartitionsTable (#8003)
a73f10b3e7 is described below
commit a73f10b3e7a98d7efcab7e01382b78bffdc7028e
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Thu Jul 6 16:54:18 2023 -0700
Core: Rename last updated timestamp column in PartitionsTable (#8003)
---
.../java/org/apache/iceberg/PartitionsTable.java | 14 +++++++-------
.../spark/source/TestIcebergSourceTablesBase.java | 14 +++++++-------
.../spark/source/TestIcebergSourceTablesBase.java | 20 ++++++++++----------
.../spark/source/TestIcebergSourceTablesBase.java | 20 ++++++++++----------
.../spark/source/TestIcebergSourceTablesBase.java | 20 ++++++++++----------
5 files changed, 44 insertions(+), 44 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index f072a7343f..71c5a29ef0 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -75,7 +75,7 @@ public class PartitionsTable extends BaseMetadataTable {
"Count of equality delete files"),
Types.NestedField.optional(
9,
- "last_updated_ms",
+ "last_updated_at",
Types.TimestampType.withZone(),
"Commit time of snapshot that last updated this partition"),
Types.NestedField.optional(
@@ -101,7 +101,7 @@ public class PartitionsTable extends BaseMetadataTable {
"position_delete_file_count",
"equality_delete_record_count",
"equality_delete_file_count",
- "last_updated_ms",
+ "last_updated_at",
"last_updated_snapshot_id");
}
return schema;
@@ -129,7 +129,7 @@ public class PartitionsTable extends BaseMetadataTable {
root.posDeleteFileCount,
root.eqDeleteRecordCount,
root.eqDeleteFileCount,
- root.lastUpdatedMs,
+ root.lastUpdatedAt,
root.lastUpdatedSnapshotId));
} else {
return StaticDataTask.of(
@@ -151,7 +151,7 @@ public class PartitionsTable extends BaseMetadataTable {
partition.posDeleteFileCount,
partition.eqDeleteRecordCount,
partition.eqDeleteFileCount,
- partition.lastUpdatedMs,
+ partition.lastUpdatedAt,
partition.lastUpdatedSnapshotId);
}
@@ -273,7 +273,7 @@ public class PartitionsTable extends BaseMetadataTable {
private int posDeleteFileCount;
private long eqDeleteRecordCount;
private int eqDeleteFileCount;
- private Long lastUpdatedMs;
+ private Long lastUpdatedAt;
private Long lastUpdatedSnapshotId;
Partition(StructLike key, Types.StructType keyType) {
@@ -290,8 +290,8 @@ public class PartitionsTable extends BaseMetadataTable {
void update(ContentFile<?> file, Snapshot snapshot) {
if (snapshot != null) {
long snapshotCommitTime = snapshot.timestampMillis() * 1000;
- if (this.lastUpdatedMs == null || snapshotCommitTime > this.lastUpdatedMs) {
- this.lastUpdatedMs = snapshotCommitTime;
+ if (this.lastUpdatedAt == null || snapshotCommitTime > this.lastUpdatedAt) {
+ this.lastUpdatedAt = snapshotCommitTime;
this.lastUpdatedSnapshotId = snapshot.snapshotId();
}
}
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index ff8bf0e7be..5f06243e6f 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1255,7 +1255,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
"Count of equality delete files"),
optional(
9,
- "last_updated_ms",
+ "last_updated_at",
Types.TimestampType.withZone(),
"Commit time of snapshot that last updated this partition"),
optional(
@@ -1275,7 +1275,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
GenericData.Record expectedRow =
builder
- .set("last_updated_ms", table.currentSnapshot().timestampMillis()
* 1000)
+ .set("last_updated_at", table.currentSnapshot().timestampMillis()
* 1000)
.set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
@@ -1350,7 +1350,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(firstCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
@@ -1363,7 +1363,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(secondCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", secondCommitId)
.build());
@@ -1480,7 +1480,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(firstCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
@@ -1493,7 +1493,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(secondCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", secondCommitId)
.build());
@@ -1530,7 +1530,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms", null)
+ .set("last_updated_at", null)
.set("last_updated_snapshot_id", null)
.build();
expected.remove(0);
diff --git
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 46ac0e96fe..c8c682c01d 100644
---
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1260,7 +1260,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
"Count of equality delete files"),
optional(
9,
- "last_updated_ms",
+ "last_updated_at",
Types.TimestampType.withZone(),
"Commit time of snapshot that last updated this partition"),
optional(
@@ -1280,7 +1280,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
GenericData.Record expectedRow =
builder
- .set("last_updated_ms", table.currentSnapshot().timestampMillis()
* 1000)
+ .set("last_updated_at", table.currentSnapshot().timestampMillis()
* 1000)
.set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
@@ -1355,7 +1355,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(firstCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
@@ -1368,7 +1368,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(secondCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", secondCommitId)
.build());
@@ -1485,7 +1485,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(firstCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
@@ -1498,7 +1498,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(secondCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", secondCommitId)
.build());
@@ -1535,7 +1535,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms", null)
+ .set("last_updated_at", null)
.set("last_updated_snapshot_id", null)
.build();
expected.remove(0);
@@ -1616,7 +1616,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(firstCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
@@ -1629,7 +1629,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", posDeleteCommitId)
.build());
@@ -1663,7 +1663,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 1L) // should be incremented
now
.set("equality_delete_file_count", 1) // should be incremented now
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", eqDeleteCommitId)
.build());
for (int i = 0; i < 2; i += 1) {
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 36c0597883..199104c03e 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1265,7 +1265,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
"Count of equality delete files"),
optional(
9,
- "last_updated_ms",
+ "last_updated_at",
Types.TimestampType.withZone(),
"Commit time of snapshot that last updated this partition"),
optional(
@@ -1285,7 +1285,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
GenericData.Record expectedRow =
builder
- .set("last_updated_ms", table.currentSnapshot().timestampMillis()
* 1000)
+ .set("last_updated_at", table.currentSnapshot().timestampMillis()
* 1000)
.set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
@@ -1360,7 +1360,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(firstCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
@@ -1373,7 +1373,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(secondCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", secondCommitId)
.build());
@@ -1490,7 +1490,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(firstCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
@@ -1503,7 +1503,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(secondCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", secondCommitId)
.build());
@@ -1540,7 +1540,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms", null)
+ .set("last_updated_at", null)
.set("last_updated_snapshot_id", null)
.build();
expected.remove(0);
@@ -1621,7 +1621,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(firstCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
@@ -1634,7 +1634,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", posDeleteCommitId)
.build());
@@ -1668,7 +1668,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 1L) // should be incremented
now
.set("equality_delete_file_count", 1) // should be incremented now
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", eqDeleteCommitId)
.build());
for (int i = 0; i < 2; i += 1) {
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 303931e21b..daed197cc9 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1262,7 +1262,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
"Count of equality delete files"),
optional(
9,
- "last_updated_ms",
+ "last_updated_at",
Types.TimestampType.withZone(),
"Commit time of snapshot that last updated this partition"),
optional(
@@ -1282,7 +1282,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
GenericData.Record expectedRow =
builder
- .set("last_updated_ms", table.currentSnapshot().timestampMillis()
* 1000)
+ .set("last_updated_at", table.currentSnapshot().timestampMillis()
* 1000)
.set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
@@ -1357,7 +1357,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(firstCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
@@ -1370,7 +1370,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(secondCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", secondCommitId)
.build());
@@ -1487,7 +1487,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(firstCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
@@ -1500,7 +1500,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(secondCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(secondCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", secondCommitId)
.build());
@@ -1537,7 +1537,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms", null)
+ .set("last_updated_at", null)
.set("last_updated_snapshot_id", null)
.build();
expected.remove(0);
@@ -1618,7 +1618,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(firstCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(firstCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", firstCommitId)
.build());
expected.add(
@@ -1631,7 +1631,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
- .set("last_updated_ms",
table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(posDeleteCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", posDeleteCommitId)
.build());
@@ -1664,7 +1664,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 1L) // should be incremented
now
.set("equality_delete_file_count", 1) // should be incremented now
- .set("last_updated_ms",
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
+ .set("last_updated_at",
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", eqDeleteCommitId)
.build());
for (int i = 0; i < 2; i += 1) {