This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new afc83066f8 Spark: Support snapshot_ids in expire snapshots procedure (#6992)
afc83066f8 is described below
commit afc83066f838969812de4884eff759a465b86f52
Author: Gustavo Torres <[email protected]>
AuthorDate: Sun Mar 5 18:35:59 2023 -0500
Spark: Support snapshot_ids in expire snapshots procedure (#6992)
---
.../IcebergSqlExtensions.g4 | 6 +++
.../extensions/TestExpireSnapshotsProcedure.java | 53 ++++++++++++++++++++++
.../spark/procedures/ExpireSnapshotsProcedure.java | 11 ++++-
.../IcebergSqlExtensions.g4 | 6 +++
.../extensions/TestExpireSnapshotsProcedure.java | 53 ++++++++++++++++++++++
.../spark/procedures/ExpireSnapshotsProcedure.java | 11 ++++-
.../extensions/TestExpireSnapshotsProcedure.java | 53 ++++++++++++++++++++++
.../spark/procedures/ExpireSnapshotsProcedure.java | 11 ++++-
8 files changed, 201 insertions(+), 3 deletions(-)
diff --git a/spark/v3.1/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.1/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
index 339dbcaaa0..38f85e8f19 100644
--- a/spark/v3.1/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
+++ b/spark/v3.1/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
@@ -145,6 +145,7 @@ transformArgument
expression
: constant
| stringMap
+ | stringArray
;
constant
@@ -162,6 +163,10 @@ booleanValue
: TRUE | FALSE
;
+stringArray
+ : ARRAY '(' constant (',' constant)* ')'
+ ;
+
number
: MINUS? EXPONENT_VALUE #exponentLiteral
| MINUS? DECIMAL_VALUE #decimalLiteral
@@ -254,6 +259,7 @@ TRUE: 'TRUE';
FALSE: 'FALSE';
MAP: 'MAP';
+ARRAY: 'ARRAY';
PLUS: '+';
MINUS: '-';
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index 419b7ed0f6..9a3fad7538 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -281,6 +281,59 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L,
1L)), output);
}
+ @Test
+ public void testExpireSnapshotsWithSnapshotId() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Assert.assertEquals("Should be 2 snapshots", 2,
Iterables.size(table.snapshots()));
+
+ // Expiring the snapshot specified by snapshot_id should keep only a single snapshot.
+ long firstSnapshotId = table.currentSnapshot().parentId();
+ sql(
+ "CALL %s.system.expire_snapshots(" + "table => '%s'," + "snapshot_ids
=> ARRAY(%d))",
+ catalogName, tableIdent, firstSnapshotId);
+
+ // There should only be one single snapshot left.
+ table.refresh();
+ Assert.assertEquals("Should be 1 snapshots", 1,
Iterables.size(table.snapshots()));
+ Assert.assertEquals(
+ "Snapshot ID should not be present",
+ 0,
+ Iterables.size(
+ Iterables.filter(
+ table.snapshots(), snapshot -> snapshot.snapshotId() ==
firstSnapshotId)));
+ }
+
+ @Test
+ public void testExpireSnapshotShouldFailForCurrentSnapshot() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertEquals("Should be 2 snapshots", 2,
Iterables.size(table.snapshots()));
+
+ AssertHelpers.assertThrows(
+ "Should reject call",
+ IllegalArgumentException.class,
+ "Cannot expire",
+ () ->
+ sql(
+ "CALL %s.system.expire_snapshots("
+ + "table => '%s',"
+ + "snapshot_ids => ARRAY(%d, %d))",
+ catalogName,
+ tableIdent,
+ table.currentSnapshot().snapshotId(),
+ table.currentSnapshot().parentId()));
+ }
+
@Test
public void testExpireSnapshotsProcedureWorksWithSqlComments() {
// Ensure that systems such as dbt, that add leading comments into the generated SQL commands,
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 214b1ea52b..4780a2653e 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -52,7 +52,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
ProcedureParameter.optional("older_than", DataTypes.TimestampType),
ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
ProcedureParameter.optional("max_concurrent_deletes",
DataTypes.IntegerType),
- ProcedureParameter.optional("stream_results", DataTypes.BooleanType)
+ ProcedureParameter.optional("stream_results", DataTypes.BooleanType),
+ ProcedureParameter.optional("snapshot_ids",
DataTypes.createArrayType(DataTypes.LongType))
};
private static final StructType OUTPUT_TYPE =
@@ -89,12 +90,14 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
}
@Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
Long olderThanMillis = args.isNullAt(1) ? null :
DateTimeUtil.microsToMillis(args.getLong(1));
Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
+ long[] snapshotIds = args.isNullAt(5) ? null :
args.getArray(5).toLongArray();
Preconditions.checkArgument(
maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
@@ -118,6 +121,12 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
action.executeDeleteWith(expireService(maxConcurrentDeletes));
}
+ if (snapshotIds != null) {
+ for (long snapshotId : snapshotIds) {
+ action.expireSnapshotId(snapshotId);
+ }
+ }
+
if (streamResult != null) {
action.option(
BaseExpireSnapshotsSparkAction.STREAM_RESULTS,
Boolean.toString(streamResult));
diff --git a/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
index ab2ebace39..e614ee9ed3 100644
--- a/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
+++ b/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
@@ -149,6 +149,7 @@ transformArgument
expression
: constant
| stringMap
+ | stringArray
;
constant
@@ -162,6 +163,10 @@ stringMap
: MAP '(' constant (',' constant)* ')'
;
+stringArray
+ : ARRAY '(' constant (',' constant)* ')'
+ ;
+
booleanValue
: TRUE | FALSE
;
@@ -258,6 +263,7 @@ TRUE: 'TRUE';
FALSE: 'FALSE';
MAP: 'MAP';
+ARRAY: 'ARRAY';
PLUS: '+';
MINUS: '-';
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index 5cb4f17edc..e7f648ed6f 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -358,6 +358,59 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L,
0L, 0L, 1L)), output);
}
+ @Test
+ public void testExpireSnapshotsWithSnapshotId() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Assert.assertEquals("Should be 2 snapshots", 2,
Iterables.size(table.snapshots()));
+
+ // Expiring the snapshot specified by snapshot_id should keep only a single snapshot.
+ long firstSnapshotId = table.currentSnapshot().parentId();
+ sql(
+ "CALL %s.system.expire_snapshots(" + "table => '%s'," + "snapshot_ids
=> ARRAY(%d))",
+ catalogName, tableIdent, firstSnapshotId);
+
+ // There should only be one single snapshot left.
+ table.refresh();
+ Assert.assertEquals("Should be 1 snapshots", 1,
Iterables.size(table.snapshots()));
+ Assert.assertEquals(
+ "Snapshot ID should not be present",
+ 0,
+ Iterables.size(
+ Iterables.filter(
+ table.snapshots(), snapshot -> snapshot.snapshotId() ==
firstSnapshotId)));
+ }
+
+ @Test
+ public void testExpireSnapshotShouldFailForCurrentSnapshot() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertEquals("Should be 2 snapshots", 2,
Iterables.size(table.snapshots()));
+
+ AssertHelpers.assertThrows(
+ "Should reject call",
+ IllegalArgumentException.class,
+ "Cannot expire",
+ () ->
+ sql(
+ "CALL %s.system.expire_snapshots("
+ + "table => '%s',"
+ + "snapshot_ids => ARRAY(%d, %d))",
+ catalogName,
+ tableIdent,
+ table.currentSnapshot().snapshotId(),
+ table.currentSnapshot().parentId()));
+ }
+
@Test
public void testExpireSnapshotsProcedureWorksWithSqlComments() {
// Ensure that systems such as dbt, that inject comments into the generated SQL files, will
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index aff4b44f94..36c5dee6bd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -47,7 +47,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
ProcedureParameter.optional("older_than", DataTypes.TimestampType),
ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
ProcedureParameter.optional("max_concurrent_deletes",
DataTypes.IntegerType),
- ProcedureParameter.optional("stream_results", DataTypes.BooleanType)
+ ProcedureParameter.optional("stream_results", DataTypes.BooleanType),
+ ProcedureParameter.optional("snapshot_ids",
DataTypes.createArrayType(DataTypes.LongType))
};
private static final StructType OUTPUT_TYPE =
@@ -88,12 +89,14 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
}
@Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
Long olderThanMillis = args.isNullAt(1) ? null :
DateTimeUtil.microsToMillis(args.getLong(1));
Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
+ long[] snapshotIds = args.isNullAt(5) ? null :
args.getArray(5).toLongArray();
Preconditions.checkArgument(
maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
@@ -117,6 +120,12 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
action.executeDeleteWith(executorService(maxConcurrentDeletes,
"expire-snapshots"));
}
+ if (snapshotIds != null) {
+ for (long snapshotId : snapshotIds) {
+ action.expireSnapshotId(snapshotId);
+ }
+ }
+
if (streamResult != null) {
action.option(
ExpireSnapshotsSparkAction.STREAM_RESULTS,
Boolean.toString(streamResult));
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index 5cb4f17edc..e7f648ed6f 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -358,6 +358,59 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L,
0L, 0L, 1L)), output);
}
+ @Test
+ public void testExpireSnapshotsWithSnapshotId() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Assert.assertEquals("Should be 2 snapshots", 2,
Iterables.size(table.snapshots()));
+
+ // Expiring the snapshot specified by snapshot_id should keep only a single snapshot.
+ long firstSnapshotId = table.currentSnapshot().parentId();
+ sql(
+ "CALL %s.system.expire_snapshots(" + "table => '%s'," + "snapshot_ids
=> ARRAY(%d))",
+ catalogName, tableIdent, firstSnapshotId);
+
+ // There should only be one single snapshot left.
+ table.refresh();
+ Assert.assertEquals("Should be 1 snapshots", 1,
Iterables.size(table.snapshots()));
+ Assert.assertEquals(
+ "Snapshot ID should not be present",
+ 0,
+ Iterables.size(
+ Iterables.filter(
+ table.snapshots(), snapshot -> snapshot.snapshotId() ==
firstSnapshotId)));
+ }
+
+ @Test
+ public void testExpireSnapshotShouldFailForCurrentSnapshot() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertEquals("Should be 2 snapshots", 2,
Iterables.size(table.snapshots()));
+
+ AssertHelpers.assertThrows(
+ "Should reject call",
+ IllegalArgumentException.class,
+ "Cannot expire",
+ () ->
+ sql(
+ "CALL %s.system.expire_snapshots("
+ + "table => '%s',"
+ + "snapshot_ids => ARRAY(%d, %d))",
+ catalogName,
+ tableIdent,
+ table.currentSnapshot().snapshotId(),
+ table.currentSnapshot().parentId()));
+ }
+
@Test
public void testExpireSnapshotsProcedureWorksWithSqlComments() {
// Ensure that systems such as dbt, that inject comments into the generated SQL files, will
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 8d979ea054..a66310f493 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -52,7 +52,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
ProcedureParameter.optional("older_than", DataTypes.TimestampType),
ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
ProcedureParameter.optional("max_concurrent_deletes",
DataTypes.IntegerType),
- ProcedureParameter.optional("stream_results", DataTypes.BooleanType)
+ ProcedureParameter.optional("stream_results", DataTypes.BooleanType),
+ ProcedureParameter.optional("snapshot_ids",
DataTypes.createArrayType(DataTypes.LongType))
};
private static final StructType OUTPUT_TYPE =
@@ -93,12 +94,14 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
}
@Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
Long olderThanMillis = args.isNullAt(1) ? null :
DateTimeUtil.microsToMillis(args.getLong(1));
Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
+ long[] snapshotIds = args.isNullAt(5) ? null :
args.getArray(5).toLongArray();
Preconditions.checkArgument(
maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
@@ -132,6 +135,12 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
}
}
+ if (snapshotIds != null) {
+ for (long snapshotId : snapshotIds) {
+ action.expireSnapshotId(snapshotId);
+ }
+ }
+
if (streamResult != null) {
action.option(
ExpireSnapshotsSparkAction.STREAM_RESULTS,
Boolean.toString(streamResult));