This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f98115a9d3 Spark 3.x: Backport snapshot references metadata table test (#5806)
f98115a9d3 is described below
commit f98115a9d3f00ec886bd73c035018489bf0a7919
Author: Rajarshi Sarkar <[email protected]>
AuthorDate: Thu Sep 29 22:55:16 2022 +0530
Spark 3.x: Backport snapshot references metadata table test (#5806)
---
.../spark/extensions/TestMetadataTables.java | 123 +++++++++++++++++++++
1 file changed, 123 insertions(+)
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index cb9bcb072b..64a10bfdcd 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -468,6 +468,129 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
metadataLogWithProjection);
}
+  /**
+   * Verifies the {@code refs} metadata table. After two appends, a branch and a tag are created
+   * on the current snapshot. The test then checks that the table lists {@code main},
+   * {@code testBranch} and {@code testTag} with the expected type, snapshot id and retention
+   * settings. It also checks that column projection works, including that a tag's
+   * {@code min_snapshots_to_keep} reads as null.
+   */
+  @Test
+  public void testSnapshotReferencesMetatable() throws Exception {
+    // Create a format-v2 table and append two batches, one per partition value of `data`
+    sql(
+        "CREATE TABLE %s (id bigint, data string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> recordsA =
+        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
+    spark
+        .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    List<SimpleRecord> recordsB =
+        Lists.newArrayList(new SimpleRecord(1, "b"), new SimpleRecord(2, "b"));
+    spark
+        .createDataset(recordsB, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Long currentSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Create a branch on the current snapshot with distinct retention values so each column
+    // can be told apart in the assertions below
+    table
+        .manageSnapshots()
+        .createBranch("testBranch", currentSnapshotId)
+        .setMaxRefAgeMs("testBranch", 10)
+        .setMinSnapshotsToKeep("testBranch", 20)
+        .setMaxSnapshotAgeMs("testBranch", 30)
+        .commit();
+    // Create a tag on the same snapshot with its own max ref age
+    table
+        .manageSnapshots()
+        .createTag("testTag", currentSnapshotId)
+        .setMaxRefAgeMs("testTag", 50)
+        .commit();
+
+    // Check refs table: main + testBranch + testTag
+    List<Row> references = spark.sql("SELECT * FROM " + tableName + ".refs").collectAsList();
+    Assert.assertEquals("Refs table should return 3 rows", 3, references.size());
+    List<Row> branches =
+        spark.sql("SELECT * FROM " + tableName + ".refs WHERE type='BRANCH'").collectAsList();
+    Assert.assertEquals("Refs table should return 2 branches", 2, branches.size());
+    List<Row> tags =
+        spark.sql("SELECT * FROM " + tableName + ".refs WHERE type='TAG'").collectAsList();
+    Assert.assertEquals("Refs table should return 1 tag", 1, tags.size());
+
+    // Check branch entries in refs table; assert a single match before reading row 0 so a
+    // missing or duplicated ref fails with a clear message instead of an index error
+    List<Row> mainBranch =
+        spark
+            .sql("SELECT * FROM " + tableName + ".refs WHERE name = 'main' AND type='BRANCH'")
+            .collectAsList();
+    Assert.assertEquals("Should have exactly one main branch row", 1, mainBranch.size());
+    Assert.assertEquals("main", mainBranch.get(0).getAs("name"));
+    Assert.assertEquals("BRANCH", mainBranch.get(0).getAs("type"));
+    Assert.assertEquals(currentSnapshotId, mainBranch.get(0).getAs("snapshot_id"));
+
+    List<Row> testBranch =
+        spark
+            .sql("SELECT * FROM " + tableName + ".refs WHERE name = 'testBranch' AND type='BRANCH'")
+            .collectAsList();
+    Assert.assertEquals("Should have exactly one testBranch row", 1, testBranch.size());
+    Assert.assertEquals("testBranch", testBranch.get(0).getAs("name"));
+    Assert.assertEquals("BRANCH", testBranch.get(0).getAs("type"));
+    Assert.assertEquals(currentSnapshotId, testBranch.get(0).getAs("snapshot_id"));
+    Assert.assertEquals(Long.valueOf(10), testBranch.get(0).getAs("max_reference_age_in_ms"));
+    Assert.assertEquals(Integer.valueOf(20), testBranch.get(0).getAs("min_snapshots_to_keep"));
+    Assert.assertEquals(Long.valueOf(30), testBranch.get(0).getAs("max_snapshot_age_in_ms"));
+
+    // Check tag entries in refs table
+    List<Row> testTag =
+        spark
+            .sql("SELECT * FROM " + tableName + ".refs WHERE name = 'testTag' AND type='TAG'")
+            .collectAsList();
+    Assert.assertEquals("Should have exactly one testTag row", 1, testTag.size());
+    Assert.assertEquals("testTag", testTag.get(0).getAs("name"));
+    Assert.assertEquals("TAG", testTag.get(0).getAs("type"));
+    Assert.assertEquals(currentSnapshotId, testTag.get(0).getAs("snapshot_id"));
+    Assert.assertEquals(Long.valueOf(50), testTag.get(0).getAs("max_reference_age_in_ms"));
+
+    // Check projection in refs table; the tag sets no min_snapshots_to_keep, so it reads as null
+    List<Row> testTagProjection =
+        spark
+            .sql(
+                "SELECT name,type,snapshot_id,max_reference_age_in_ms,min_snapshots_to_keep FROM "
+                    + tableName
+                    + ".refs where type='TAG'")
+            .collectAsList();
+    Assert.assertEquals("Should have exactly one tag row", 1, testTagProjection.size());
+    Assert.assertEquals("testTag", testTagProjection.get(0).getAs("name"));
+    Assert.assertEquals("TAG", testTagProjection.get(0).getAs("type"));
+    Assert.assertEquals(currentSnapshotId, testTagProjection.get(0).getAs("snapshot_id"));
+    Assert.assertEquals(
+        Long.valueOf(50), testTagProjection.get(0).getAs("max_reference_age_in_ms"));
+    Assert.assertNull(testTagProjection.get(0).getAs("min_snapshots_to_keep"));
+
+    List<Row> mainBranchProjection =
+        spark
+            .sql(
+                "SELECT name, type FROM "
+                    + tableName
+                    + ".refs WHERE name = 'main' AND type = 'BRANCH'")
+            .collectAsList();
+    Assert.assertEquals(
+        "Should have exactly one main branch row", 1, mainBranchProjection.size());
+    Assert.assertEquals("main", mainBranchProjection.get(0).getAs("name"));
+    Assert.assertEquals("BRANCH", mainBranchProjection.get(0).getAs("type"));
+
+    // Columns are deliberately selected out of schema order to check projection by name
+    List<Row> testBranchProjection =
+        spark
+            .sql(
+                "SELECT type, name, max_reference_age_in_ms, snapshot_id FROM "
+                    + tableName
+                    + ".refs WHERE name = 'testBranch' AND type = 'BRANCH'")
+            .collectAsList();
+    Assert.assertEquals(
+        "Should have exactly one testBranch row", 1, testBranchProjection.size());
+    Assert.assertEquals("testBranch", testBranchProjection.get(0).getAs("name"));
+    Assert.assertEquals("BRANCH", testBranchProjection.get(0).getAs("type"));
+    Assert.assertEquals(currentSnapshotId, testBranchProjection.get(0).getAs("snapshot_id"));
+    Assert.assertEquals(
+        Long.valueOf(10), testBranchProjection.get(0).getAs("max_reference_age_in_ms"));
+  }
+
/**
* Find matching manifest entries of an Iceberg table
*