This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 5e139f4850 Core: Don't reset snapshotLog when replacing Table (#9732)
5e139f4850 is described below
commit 5e139f48505b9b622e2cf389eae2bb74d6420510
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Sat Feb 17 17:16:57 2024 +0100
Core: Don't reset snapshotLog when replacing Table (#9732)
---
.../java/org/apache/iceberg/TableMetadata.java | 12 ++-
.../java/org/apache/iceberg/TestTableMetadata.java | 25 +++++
.../spark/extensions/TestMetadataTables.java | 112 +++++++++++++++++++++
3 files changed, 148 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 8e687950a6..9587c57a0f 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -702,7 +702,7 @@ public class TableMetadata implements Serializable {
return new Builder(this)
.upgradeFormatVersion(newFormatVersion)
- .removeRef(SnapshotRef.MAIN_BRANCH)
+ .resetMainBranch()
.setCurrentSchema(freshSchema, newLastColumnId.get())
.setDefaultPartitionSpec(freshSpec)
.setDefaultSortOrder(freshSortOrder)
@@ -1258,6 +1258,16 @@ public class TableMetadata implements Serializable {
return this;
}
+ /**
+ * Detaches the metadata from its current snapshot without discarding history: sets
+ * currentSnapshotId to -1 and removes the main branch ref. The snapshotLog is left untouched.
+ *
+ * <p>Used in place of removeRef(SnapshotRef.MAIN_BRANCH) when building replacement metadata,
+ * so that replacing a table keeps its snapshotLog (#9732).
+ *
+ * @return this builder, for chaining
+ */
+ private Builder resetMainBranch() {
+ this.currentSnapshotId = -1;
+ SnapshotRef ref = refs.remove(SnapshotRef.MAIN_BRANCH);
+ // Only record a RemoveSnapshotRef update when a main ref actually existed.
+ if (ref != null) {
+ changes.add(new MetadataUpdate.RemoveSnapshotRef(SnapshotRef.MAIN_BRANCH));
+ }
+
+ return this;
+ }
+
public Builder setStatistics(long snapshotId, StatisticsFile
statisticsFile) {
Preconditions.checkNotNull(statisticsFile, "statisticsFile is null");
Preconditions.checkArgument(
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index 0e5b325957..826f3ad1e7 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -1742,4 +1742,29 @@ public class TestTableMetadata {
return localInput(manifestList).location();
}
+
+ /**
+ * Verifies that buildReplacement clears the current snapshot but carries over both the
+ * snapshots and the snapshotLog of the original metadata unchanged.
+ */
+ @Test
+ public void buildReplacementKeepsSnapshotLog() throws Exception {
+ // Start from valid v2 metadata that has a current snapshot, two snapshots and two
+ // history entries.
+ TableMetadata metadata =
+ TableMetadataParser.fromJson(readTableMetadataInputFile("TableMetadataV2Valid.json"));
+ Assertions.assertThat(metadata.currentSnapshot()).isNotNull();
+ Assertions.assertThat(metadata.snapshots()).hasSize(2);
+ Assertions.assertThat(metadata.snapshotLog()).hasSize(2);
+
+ // Replace with the same schema, spec, sort order, location and properties.
+ TableMetadata replacement =
+ metadata.buildReplacement(
+ metadata.schema(),
+ metadata.spec(),
+ metadata.sortOrder(),
+ metadata.location(),
+ metadata.properties());
+
+ // The replacement has no current snapshot, yet snapshots and history are preserved as-is.
+ Assertions.assertThat(replacement.currentSnapshot()).isNull();
+ Assertions.assertThat(replacement.snapshots())
+ .hasSize(2)
+ .containsExactlyElementsOf(metadata.snapshots());
+ Assertions.assertThat(replacement.snapshotLog())
+ .hasSize(2)
+ .containsExactlyElementsOf(metadata.snapshotLog());
+ }
}
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 2143916384..50376589b6 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.extensions;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.Comparator;
@@ -30,6 +31,7 @@ import org.apache.avro.generic.GenericData.Record;
import org.apache.commons.collections.ListUtils;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -721,4 +723,114 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
Record partition = (Record) file.get(4);
return partValue.equals(partition.get(0).toString());
}
+
+ /**
+ * Verifies the metadata_log_entries metadata table across CREATE, two INSERTs, a CREATE OR
+ * REPLACE TABLE and a further INSERT. The replace must keep both snapshots and the
+ * snapshotLog, and its log entry reports the last snapshot from the snapshotLog even though
+ * the replaced table has no current snapshot.
+ */
+ @Test
+ public void metadataLogEntriesAfterReplacingTable() throws Exception {
+ sql(
+ "CREATE TABLE %s (id bigint, data string) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data) "
+ + "TBLPROPERTIES "
+ + "('format-version'='2')",
+ tableName);
+
+ // A freshly created table has no snapshots and no history yet.
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+ assertThat(tableMetadata.snapshots()).isEmpty();
+ assertThat(tableMetadata.snapshotLog()).isEmpty();
+ assertThat(tableMetadata.currentSnapshot()).isNull();
+
+ // Expected rows are (timestamp, metadata file location, snapshot id, schema id,
+ // sequence number); lastUpdatedMillis is scaled by 1000 (millis -> micros) for the
+ // timestamp. Without a snapshot, the three snapshot-derived columns are null.
+ Object[] firstEntry =
+ row(
+ DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
+ tableMetadata.metadataFileLocation(),
+ null,
+ null,
+ null);
+
+ assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName)).containsExactly(firstEntry);
+
+ // First insert: one snapshot, one history entry.
+ sql("INSERT INTO %s (id, data) VALUES (1, 'a')", tableName);
+
+ tableMetadata = ((HasTableOperations) table).operations().refresh();
+ assertThat(tableMetadata.snapshots()).hasSize(1);
+ assertThat(tableMetadata.snapshotLog()).hasSize(1);
+ Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+
+ Object[] secondEntry =
+ row(
+ DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
+ tableMetadata.metadataFileLocation(),
+ currentSnapshot.snapshotId(),
+ currentSnapshot.schemaId(),
+ currentSnapshot.sequenceNumber());
+
+ assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+ .containsExactly(firstEntry, secondEntry);
+
+ // Second insert: two snapshots, two history entries.
+ sql("INSERT INTO %s (id, data) VALUES (1, 'a')", tableName);
+
+ tableMetadata = ((HasTableOperations) table).operations().refresh();
+ assertThat(tableMetadata.snapshots()).hasSize(2);
+ assertThat(tableMetadata.snapshotLog()).hasSize(2);
+ currentSnapshot = tableMetadata.currentSnapshot();
+
+ Object[] thirdEntry =
+ row(
+ DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
+ tableMetadata.metadataFileLocation(),
+ currentSnapshot.snapshotId(),
+ currentSnapshot.schemaId(),
+ currentSnapshot.sequenceNumber());
+
+ assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+ .containsExactly(firstEntry, secondEntry, thirdEntry);
+
+ // Replace the table with the same definition; snapshots and snapshotLog must survive.
+ sql(
+ "CREATE OR REPLACE TABLE %s (id bigint, data string) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data) "
+ + "TBLPROPERTIES "
+ + "('format-version'='2')",
+ tableName);
+
+ tableMetadata = ((HasTableOperations) table).operations().refresh();
+ assertThat(tableMetadata.snapshots()).hasSize(2);
+ assertThat(tableMetadata.snapshotLog()).hasSize(2);
+
+ // currentSnapshot is null, but metadata_log_entries still refers to the last snapshot
+ // recorded in the snapshotLog (index 1, since the log has exactly two entries here)
+ assertThat(tableMetadata.currentSnapshot()).isNull();
+ HistoryEntry historyEntry = tableMetadata.snapshotLog().get(1);
+ Snapshot lastSnapshot = tableMetadata.snapshot(historyEntry.snapshotId());
+
+ Object[] fourthEntry =
+ row(
+ DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
+ tableMetadata.metadataFileLocation(),
+ lastSnapshot.snapshotId(),
+ lastSnapshot.schemaId(),
+ lastSnapshot.sequenceNumber());
+
+ assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+ .containsExactly(firstEntry, secondEntry, thirdEntry, fourthEntry);
+
+ // Writing after the replace appends a third snapshot and history entry to the kept log.
+ sql("INSERT INTO %s (id, data) VALUES (1, 'a')", tableName);
+
+ tableMetadata = ((HasTableOperations) table).operations().refresh();
+ assertThat(tableMetadata.snapshots()).hasSize(3);
+ assertThat(tableMetadata.snapshotLog()).hasSize(3);
+ currentSnapshot = tableMetadata.currentSnapshot();
+
+ Object[] fifthEntry =
+ row(
+ DateTimeUtils.toJavaTimestamp(tableMetadata.lastUpdatedMillis() * 1000),
+ tableMetadata.metadataFileLocation(),
+ currentSnapshot.snapshotId(),
+ currentSnapshot.schemaId(),
+ currentSnapshot.sequenceNumber());
+
+ assertThat(sql("SELECT * FROM %s.metadata_log_entries", tableName))
+ .containsExactly(firstEntry, secondEntry, thirdEntry, fourthEntry, fifthEntry);
+ }
}