This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new afc83066f8 Spark: Support snapshot_ids in expire snapshots procedure (#6992)
afc83066f8 is described below

commit afc83066f838969812de4884eff759a465b86f52
Author: Gustavo Torres <[email protected]>
AuthorDate: Sun Mar 5 18:35:59 2023 -0500

    Spark: Support snapshot_ids in expire snapshots procedure (#6992)
---
 .../IcebergSqlExtensions.g4                        |  6 +++
 .../extensions/TestExpireSnapshotsProcedure.java   | 53 ++++++++++++++++++++++
 .../spark/procedures/ExpireSnapshotsProcedure.java | 11 ++++-
 .../IcebergSqlExtensions.g4                        |  6 +++
 .../extensions/TestExpireSnapshotsProcedure.java   | 53 ++++++++++++++++++++++
 .../spark/procedures/ExpireSnapshotsProcedure.java | 11 ++++-
 .../extensions/TestExpireSnapshotsProcedure.java   | 53 ++++++++++++++++++++++
 .../spark/procedures/ExpireSnapshotsProcedure.java | 11 ++++-
 8 files changed, 201 insertions(+), 3 deletions(-)
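
For context before the per-version diffs: the change adds an optional snapshot_ids parameter to the expire_snapshots procedure, backed by a new ARRAY(...) literal in the SQL extensions grammar. A minimal usage sketch follows; the catalog name, table name, and snapshot ids are illustrative placeholders, not values taken from this commit.

    import org.apache.spark.sql.SparkSession;

    public class ExpireSnapshotIdsExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("expire-snapshot-ids-example")
            .getOrCreate();

        // Expire two specific snapshots by id. snapshot_ids takes an
        // ARRAY(...) literal and composes with the existing older_than,
        // retain_last, max_concurrent_deletes and stream_results options.
        spark.sql(
            "CALL spark_catalog.system.expire_snapshots("
                + "table => 'db.sample', "
                + "snapshot_ids => ARRAY(1234567890, 9876543210))");

        spark.stop();
      }
    }

As the tests below show, passing the current snapshot's id is rejected with an IllegalArgumentException ("Cannot expire").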

diff --git a/spark/v3.1/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.1/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
index 339dbcaaa0..38f85e8f19 100644
--- a/spark/v3.1/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
+++ b/spark/v3.1/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
@@ -145,6 +145,7 @@ transformArgument
 expression
     : constant
     | stringMap
+    | stringArray
     ;
 
 constant
@@ -162,6 +163,10 @@ booleanValue
     : TRUE | FALSE
     ;
 
+stringArray
+    : ARRAY '(' constant (',' constant)* ')'
+    ;
+
 number
     : MINUS? EXPONENT_VALUE           #exponentLiteral
     | MINUS? DECIMAL_VALUE            #decimalLiteral
@@ -254,6 +259,7 @@ TRUE: 'TRUE';
 FALSE: 'FALSE';
 
 MAP: 'MAP';
+ARRAY: 'ARRAY';
 
 PLUS: '+';
 MINUS: '-';
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index 419b7ed0f6..9a3fad7538 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -281,6 +281,59 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
     assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 1L)), output);
   }
 
+  @Test
+  public void testExpireSnapshotsWithSnapshotId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Assert.assertEquals("Should be 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    // Expiring the snapshot specified by snapshot_id should keep only a single snapshot.
+    long firstSnapshotId = table.currentSnapshot().parentId();
+    sql(
+        "CALL %s.system.expire_snapshots(" + "table => '%s'," + "snapshot_ids => ARRAY(%d))",
+        catalogName, tableIdent, firstSnapshotId);
+
+    // There should be only a single snapshot left.
+    table.refresh();
+    Assert.assertEquals("Should be 1 snapshot", 1, Iterables.size(table.snapshots()));
+    Assert.assertEquals(
+        "Snapshot ID should not be present",
+        0,
+        Iterables.size(
+            Iterables.filter(
+                table.snapshots(), snapshot -> snapshot.snapshotId() == firstSnapshotId)));
+  }
+
+  @Test
+  public void testExpireSnapshotShouldFailForCurrentSnapshot() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals("Should be 2 snapshots", 2, 
Iterables.size(table.snapshots()));
+
+    AssertHelpers.assertThrows(
+        "Should reject call",
+        IllegalArgumentException.class,
+        "Cannot expire",
+        () ->
+            sql(
+                "CALL %s.system.expire_snapshots("
+                    + "table => '%s',"
+                    + "snapshot_ids => ARRAY(%d, %d))",
+                catalogName,
+                tableIdent,
+                table.currentSnapshot().snapshotId(),
+                table.currentSnapshot().parentId()));
+  }
+
   @Test
   public void testExpireSnapshotsProcedureWorksWithSqlComments() {
     // Ensure that systems such as dbt, that add leading comments into the generated SQL commands,
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 214b1ea52b..4780a2653e 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -52,7 +52,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
         ProcedureParameter.optional("older_than", DataTypes.TimestampType),
         ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
         ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
-        ProcedureParameter.optional("stream_results", DataTypes.BooleanType)
+        ProcedureParameter.optional("stream_results", DataTypes.BooleanType),
+        ProcedureParameter.optional("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType))
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -89,12 +90,14 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
   }
 
   @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public InternalRow[] call(InternalRow args) {
     Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
     Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
     Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
     Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
     Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
+    long[] snapshotIds = args.isNullAt(5) ? null : args.getArray(5).toLongArray();
 
     Preconditions.checkArgument(
         maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
@@ -118,6 +121,12 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
             action.executeDeleteWith(expireService(maxConcurrentDeletes));
           }
 
+          if (snapshotIds != null) {
+            for (long snapshotId : snapshotIds) {
+              action.expireSnapshotId(snapshotId);
+            }
+          }
+
           if (streamResult != null) {
             action.option(
                 BaseExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
diff --git a/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
index ab2ebace39..e614ee9ed3 100644
--- a/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
+++ b/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
@@ -149,6 +149,7 @@ transformArgument
 expression
     : constant
     | stringMap
+    | stringArray
     ;
 
 constant
@@ -162,6 +163,10 @@ stringMap
     : MAP '(' constant (',' constant)* ')'
     ;
 
+stringArray
+    : ARRAY '(' constant (',' constant)* ')'
+    ;
+
 booleanValue
     : TRUE | FALSE
     ;
@@ -258,6 +263,7 @@ TRUE: 'TRUE';
 FALSE: 'FALSE';
 
 MAP: 'MAP';
+ARRAY: 'ARRAY';
 
 PLUS: '+';
 MINUS: '-';
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index 5cb4f17edc..e7f648ed6f 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -358,6 +358,59 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
     assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
   }
 
+  @Test
+  public void testExpireSnapshotsWithSnapshotId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Assert.assertEquals("Should be 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    // Expiring the snapshot specified by snapshot_id should keep only a single snapshot.
+    long firstSnapshotId = table.currentSnapshot().parentId();
+    sql(
+        "CALL %s.system.expire_snapshots(" + "table => '%s'," + "snapshot_ids => ARRAY(%d))",
+        catalogName, tableIdent, firstSnapshotId);
+
+    // There should be only a single snapshot left.
+    table.refresh();
+    Assert.assertEquals("Should be 1 snapshot", 1, Iterables.size(table.snapshots()));
+    Assert.assertEquals(
+        "Snapshot ID should not be present",
+        0,
+        Iterables.size(
+            Iterables.filter(
+                table.snapshots(), snapshot -> snapshot.snapshotId() == firstSnapshotId)));
+  }
+
+  @Test
+  public void testExpireSnapshotShouldFailForCurrentSnapshot() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals("Should be 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    AssertHelpers.assertThrows(
+        "Should reject call",
+        IllegalArgumentException.class,
+        "Cannot expire",
+        () ->
+            sql(
+                "CALL %s.system.expire_snapshots("
+                    + "table => '%s',"
+                    + "snapshot_ids => ARRAY(%d, %d))",
+                catalogName,
+                tableIdent,
+                table.currentSnapshot().snapshotId(),
+                table.currentSnapshot().parentId()));
+  }
+
   @Test
   public void testExpireSnapshotsProcedureWorksWithSqlComments() {
     // Ensure that systems such as dbt, that inject comments into the generated SQL files, will
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index aff4b44f94..36c5dee6bd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -47,7 +47,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
         ProcedureParameter.optional("older_than", DataTypes.TimestampType),
         ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
         ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
-        ProcedureParameter.optional("stream_results", DataTypes.BooleanType)
+        ProcedureParameter.optional("stream_results", DataTypes.BooleanType),
+        ProcedureParameter.optional("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType))
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -88,12 +89,14 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
   }
 
   @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public InternalRow[] call(InternalRow args) {
     Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
     Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
     Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
     Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
     Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
+    long[] snapshotIds = args.isNullAt(5) ? null : args.getArray(5).toLongArray();
 
     Preconditions.checkArgument(
         maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
@@ -117,6 +120,12 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
             action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots"));
           }
 
+          if (snapshotIds != null) {
+            for (long snapshotId : snapshotIds) {
+              action.expireSnapshotId(snapshotId);
+            }
+          }
+
           if (streamResult != null) {
             action.option(
                 ExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index 5cb4f17edc..e7f648ed6f 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -358,6 +358,59 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
     assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
   }
 
+  @Test
+  public void testExpireSnapshotsWithSnapshotId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Assert.assertEquals("Should be 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    // Expiring the snapshot specified by snapshot_id should keep only a single snapshot.
+    long firstSnapshotId = table.currentSnapshot().parentId();
+    sql(
+        "CALL %s.system.expire_snapshots(" + "table => '%s'," + "snapshot_ids => ARRAY(%d))",
+        catalogName, tableIdent, firstSnapshotId);
+
+    // There should be only a single snapshot left.
+    table.refresh();
+    Assert.assertEquals("Should be 1 snapshot", 1, Iterables.size(table.snapshots()));
+    Assert.assertEquals(
+        "Snapshot ID should not be present",
+        0,
+        Iterables.size(
+            Iterables.filter(
+                table.snapshots(), snapshot -> snapshot.snapshotId() == firstSnapshotId)));
+  }
+
+  @Test
+  public void testExpireSnapshotShouldFailForCurrentSnapshot() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals("Should be 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    AssertHelpers.assertThrows(
+        "Should reject call",
+        IllegalArgumentException.class,
+        "Cannot expire",
+        () ->
+            sql(
+                "CALL %s.system.expire_snapshots("
+                    + "table => '%s',"
+                    + "snapshot_ids => ARRAY(%d, %d))",
+                catalogName,
+                tableIdent,
+                table.currentSnapshot().snapshotId(),
+                table.currentSnapshot().parentId()));
+  }
+
   @Test
   public void testExpireSnapshotsProcedureWorksWithSqlComments() {
     // Ensure that systems such as dbt, that inject comments into the generated SQL files, will
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 8d979ea054..a66310f493 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -52,7 +52,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
         ProcedureParameter.optional("older_than", DataTypes.TimestampType),
         ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
         ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
-        ProcedureParameter.optional("stream_results", DataTypes.BooleanType)
+        ProcedureParameter.optional("stream_results", DataTypes.BooleanType),
+        ProcedureParameter.optional("snapshot_ids", DataTypes.createArrayType(DataTypes.LongType))
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -93,12 +94,14 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
   }
 
   @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public InternalRow[] call(InternalRow args) {
     Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
     Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
     Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
     Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
     Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
+    long[] snapshotIds = args.isNullAt(5) ? null : args.getArray(5).toLongArray();
 
     Preconditions.checkArgument(
         maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
@@ -132,6 +135,12 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
             }
           }
 
+          if (snapshotIds != null) {
+            for (long snapshotId : snapshotIds) {
+              action.expireSnapshotId(snapshotId);
+            }
+          }
+
           if (streamResult != null) {
             action.option(
                 ExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
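
A note on the implementation above: the loop over snapshotIds forwards each id to the underlying expire-snapshots Spark action. For readers who prefer the programmatic path, a minimal sketch of the equivalent call through the actions API follows; the helper class and method names are hypothetical, and the table is assumed to be already loaded.

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.SparkActions;

    // Sketch: expire specific snapshots programmatically, mirroring what the
    // procedure does with its snapshot_ids argument.
    public class ExpireByIdSketch {
      public static void expire(Table table, long[] snapshotIds) {
        ExpireSnapshots action = SparkActions.get().expireSnapshots(table);
        for (long snapshotId : snapshotIds) {
          action.expireSnapshotId(snapshotId); // queue each id, as the procedure's loop does
        }
        action.execute(); // performs the expiration and cleans up unreachable files
      }
    }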
