This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f98115a9d3 Spark 3.x: Backport snapshot references metadata table test (#5806)
f98115a9d3 is described below

commit f98115a9d3f00ec886bd73c035018489bf0a7919
Author: Rajarshi Sarkar <[email protected]>
AuthorDate: Thu Sep 29 22:55:16 2022 +0530

    Spark 3.x: Backport snapshot references metadata table test (#5806)
---
 .../spark/extensions/TestMetadataTables.java       | 123 +++++++++++++++++++++
 1 file changed, 123 insertions(+)

diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index cb9bcb072b..64a10bfdcd 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -468,6 +468,129 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
         metadataLogWithProjection);
   }
 
+  @Test
+  public void testSnapshotReferencesMetatable() throws Exception {
+    // Create table and insert data
+    sql(
+        "CREATE TABLE %s (id bigint, data string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> recordsA =
+        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
+    spark
+        .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    List<SimpleRecord> recordsB =
+        Lists.newArrayList(new SimpleRecord(1, "b"), new SimpleRecord(2, "b"));
+    spark
+        .createDataset(recordsB, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Long currentSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Create branch
+    table
+        .manageSnapshots()
+        .createBranch("testBranch", currentSnapshotId)
+        .setMaxRefAgeMs("testBranch", 10)
+        .setMinSnapshotsToKeep("testBranch", 20)
+        .setMaxSnapshotAgeMs("testBranch", 30)
+        .commit();
+    // Create Tag
+    table
+        .manageSnapshots()
+        .createTag("testTag", currentSnapshotId)
+        .setMaxRefAgeMs("testTag", 50)
+        .commit();
+    // Check refs table
+    List<Row> references = spark.sql("SELECT * FROM " + tableName + 
".refs").collectAsList();
+    Assert.assertEquals("Refs table should return 3 rows", 3, 
references.size());
+    List<Row> branches =
+        spark.sql("SELECT * FROM " + tableName + ".refs WHERE 
type='BRANCH'").collectAsList();
+    Assert.assertEquals("Refs table should return 2 branches", 2, 
branches.size());
+    List<Row> tags =
+        spark.sql("SELECT * FROM " + tableName + ".refs WHERE 
type='TAG'").collectAsList();
+    Assert.assertEquals("Refs table should return 1 tag", 1, tags.size());
+
+    // Check branch entries in refs table
+    List<Row> mainBranch =
+        spark
+            .sql("SELECT * FROM " + tableName + ".refs WHERE name = 'main' AND 
type='BRANCH'")
+            .collectAsList();
+    Assert.assertEquals("main", mainBranch.get(0).getAs("name"));
+    Assert.assertEquals("BRANCH", mainBranch.get(0).getAs("type"));
+    Assert.assertEquals(currentSnapshotId, 
mainBranch.get(0).getAs("snapshot_id"));
+
+    List<Row> testBranch =
+        spark
+            .sql("SELECT * FROM " + tableName + ".refs WHERE name = 
'testBranch' AND type='BRANCH'")
+            .collectAsList();
+    Assert.assertEquals("testBranch", testBranch.get(0).getAs("name"));
+    Assert.assertEquals("BRANCH", testBranch.get(0).getAs("type"));
+    Assert.assertEquals(currentSnapshotId, 
testBranch.get(0).getAs("snapshot_id"));
+    Assert.assertEquals(Long.valueOf(10), 
testBranch.get(0).getAs("max_reference_age_in_ms"));
+    Assert.assertEquals(Integer.valueOf(20), 
testBranch.get(0).getAs("min_snapshots_to_keep"));
+    Assert.assertEquals(Long.valueOf(30), 
testBranch.get(0).getAs("max_snapshot_age_in_ms"));
+
+    // Check tag entries in refs table
+    List<Row> testTag =
+        spark
+            .sql("SELECT * FROM " + tableName + ".refs WHERE name = 'testTag' 
AND type='TAG'")
+            .collectAsList();
+    Assert.assertEquals("testTag", testTag.get(0).getAs("name"));
+    Assert.assertEquals("TAG", testTag.get(0).getAs("type"));
+    Assert.assertEquals(currentSnapshotId, 
testTag.get(0).getAs("snapshot_id"));
+    Assert.assertEquals(Long.valueOf(50), 
testTag.get(0).getAs("max_reference_age_in_ms"));
+
+    // Check projection in refs table
+    List<Row> testTagProjection =
+        spark
+            .sql(
+                "SELECT 
name,type,snapshot_id,max_reference_age_in_ms,min_snapshots_to_keep FROM "
+                    + tableName
+                    + ".refs where type='TAG'")
+            .collectAsList();
+    Assert.assertEquals("testTag", testTagProjection.get(0).getAs("name"));
+    Assert.assertEquals("TAG", testTagProjection.get(0).getAs("type"));
+    Assert.assertEquals(currentSnapshotId, 
testTagProjection.get(0).getAs("snapshot_id"));
+    Assert.assertEquals(
+        Long.valueOf(50), 
testTagProjection.get(0).getAs("max_reference_age_in_ms"));
+    Assert.assertNull(testTagProjection.get(0).getAs("min_snapshots_to_keep"));
+
+    List<Row> mainBranchProjection =
+        spark
+            .sql(
+                "SELECT name, type FROM "
+                    + tableName
+                    + ".refs WHERE name = 'main' AND type = 'BRANCH'")
+            .collectAsList();
+    Assert.assertEquals("main", mainBranchProjection.get(0).getAs("name"));
+    Assert.assertEquals("BRANCH", mainBranchProjection.get(0).getAs("type"));
+
+    List<Row> testBranchProjection =
+        spark
+            .sql(
+                "SELECT type, name, max_reference_age_in_ms, snapshot_id FROM "
+                    + tableName
+                    + ".refs WHERE name = 'testBranch' AND type = 'BRANCH'")
+            .collectAsList();
+    Assert.assertEquals("testBranch", 
testBranchProjection.get(0).getAs("name"));
+    Assert.assertEquals("BRANCH", testBranchProjection.get(0).getAs("type"));
+    Assert.assertEquals(currentSnapshotId, 
testBranchProjection.get(0).getAs("snapshot_id"));
+    Assert.assertEquals(
+        Long.valueOf(10), 
testBranchProjection.get(0).getAs("max_reference_age_in_ms"));
+  }
+
   /**
    * Find matching manifest entries of an Iceberg table
    *

Reply via email to