This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4c06b4c35d Spark 3.3: SQL Extensions for CREATE TAG (#6637)
4c06b4c35d is described below
commit 4c06b4c35d2bc7c3e639755218db1b1f5f6d1c74
Author: Liwei Li <[email protected]>
AuthorDate: Sat Feb 11 06:00:04 2023 +0800
Spark 3.3: SQL Extensions for CREATE TAG (#6637)
Co-authored-by: Amogh Jahagirdar <[email protected]>
Co-authored-by: chidayong <[email protected]>
---
.../IcebergSqlExtensions.g4 | 18 +-
.../IcebergSparkSqlExtensionsParser.scala | 5 +-
.../IcebergSqlExtensionsAstBuilder.scala | 31 +++
.../plans/logical/CreateOrReplaceTag.scala | 38 +++
.../sql/catalyst/plans/logical/TagOptions.scala | 22 ++
.../datasources/v2/CreateOrReplaceTagExec.scala | 72 ++++++
.../v2/ExtendedDataSourceV2Strategy.scala | 4 +
.../iceberg/spark/extensions/TestTagDDL.java | 274 +++++++++++++++++++++
8 files changed, 460 insertions(+), 4 deletions(-)
diff --git
a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
index d1ab06f852..ef070057a3 100644
---
a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
+++
b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
@@ -73,7 +73,13 @@ statement
| ALTER TABLE multipartIdentifier WRITE writeSpec
#setWriteDistributionAndOrdering
| ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList
#setIdentifierFields
| ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList
#dropIdentifierFields
- | ALTER TABLE multipartIdentifier createReplaceBranchClause
#createOrReplaceBranch
+ | ALTER TABLE multipartIdentifier createReplaceBranchClause
#createOrReplaceBranch
+ | ALTER TABLE multipartIdentifier createReplaceTagClause
#createOrReplaceTag
+ ;
+
+createReplaceTagClause
+ : (CREATE OR)? REPLACE TAG identifier tagOptions
+ | CREATE TAG (IF NOT EXISTS)? identifier tagOptions
;
createReplaceBranchClause
@@ -81,8 +87,13 @@ createReplaceBranchClause
| CREATE BRANCH (IF NOT EXISTS)? identifier branchOptions
;
+tagOptions
+ : (AS OF VERSION snapshotId)? (refRetain)?
+ ;
+
branchOptions
- : (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)?;
+ : (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)?
+ ;
snapshotRetention
: WITH SNAPSHOT RETENTION minSnapshotsToKeep
@@ -197,7 +208,7 @@ fieldList
nonReserved
: ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC |
DROP | EXISTS | FIELD | FIRST | HOURS | IF | LAST | NOT | NULLS | OF | OR |
ORDERED | PARTITION | TABLE | WRITE
| DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN
| VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS
- | TRUE | FALSE
+ | TAG | TRUE | FALSE
| MAP
;
@@ -251,6 +262,7 @@ SET: 'SET';
SNAPSHOT: 'SNAPSHOT';
SNAPSHOTS: 'SNAPSHOTS';
TABLE: 'TABLE';
+TAG: 'TAG';
UNORDERED: 'UNORDERED';
VERSION: 'VERSION';
WITH: 'WITH';
diff --git
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index 8d3250a7de..946c10d193 100644
---
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -210,7 +210,10 @@ class IcebergSparkSqlExtensionsParser(delegate:
ParserInterface) extends ParserI
}
private def isSnapshotRefDdl(normalized: String): Boolean = {
- normalized.contains("create branch") || normalized.contains("replace branch")
+ normalized.contains("create branch") ||
+ normalized.contains("replace branch") ||
+ normalized.contains("create tag") ||
+ normalized.contains("replace tag")
}
protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser
=> T): T = {
diff --git
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
index 11c60b610c..b02897aa76 100644
---
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
+++
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
@@ -41,6 +41,7 @@ import
org.apache.spark.sql.catalyst.plans.logical.BranchOptions
import org.apache.spark.sql.catalyst.plans.logical.CallArgument
import org.apache.spark.sql.catalyst.plans.logical.CallStatement
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -49,6 +50,7 @@ import
org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.TagOptions
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.connector.expressions
@@ -130,6 +132,35 @@ class IcebergSqlExtensionsAstBuilder(delegate:
ParserInterface) extends IcebergS
ifNotExists)
}
+ /**
+ * Create a CREATE OR REPLACE TAG logical command.
+ */
+ override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext):
CreateOrReplaceTag = withOrigin(ctx) {
+ val createTagClause = ctx.createReplaceTagClause()
+
+ val tagName = createTagClause.identifier().getText
+
+ val tagOptionsContext = Option(createTagClause.tagOptions())
+ val snapshotId = tagOptionsContext.flatMap(tagOptions =>
Option(tagOptions.snapshotId()))
+ .map(_.getText.toLong)
+ val tagRetain = tagOptionsContext.flatMap(tagOptions =>
Option(tagOptions.refRetain()))
+ val tagRefAgeMs = tagRetain.map(retain =>
+ TimeUnit.valueOf(retain.timeUnit().getText.toUpperCase(Locale.ENGLISH)).toMillis(retain.number().getText.toLong))
+ val tagOptions = TagOptions(
+ snapshotId,
+ tagRefAgeMs
+ )
+
+ val replace = createTagClause.REPLACE() != null
+ val ifNotExists = createTagClause.EXISTS() != null
+
+ CreateOrReplaceTag(typedVisit[Seq[String]](ctx.multipartIdentifier),
+ tagName,
+ tagOptions,
+ replace,
+ ifNotExists)
+ }
+
/**
* Create an REPLACE PARTITION FIELD logical command.
*/
diff --git
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala
new file mode 100644
index 0000000000..e48f7d8ed0
--- /dev/null
+++
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+case class CreateOrReplaceTag(
+ table: Seq[String],
+ tag: String,
+ tagOptions: TagOptions,
+ replace: Boolean,
+ ifNotExists: Boolean) extends LeafCommand {
+
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+ override lazy val output: Seq[Attribute] = Nil
+
+ override def simpleString(maxFields: Int): String = {
+ s"CreateOrReplaceTag tag: ${tag} for table: ${table.quoted}"
+ }
+}
diff --git
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala
new file mode 100644
index 0000000000..85e3b95f4a
--- /dev/null
+++
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+case class TagOptions(snapshotId: Option[Long], snapshotRefRetain:
Option[Long])
diff --git
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala
new file mode 100644
index 0000000000..d41f9f03ff
--- /dev/null
+++
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.TagOptions
+import org.apache.spark.sql.connector.catalog._
+
+case class CreateOrReplaceTagExec(
+ catalog: TableCatalog,
+ ident: Identifier,
+ tag: String,
+ tagOptions: TagOptions,
+ replace: Boolean,
+ ifNotExists: Boolean) extends LeafV2CommandExec {
+
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+ override lazy val output: Seq[Attribute] = Nil
+
+ override protected def run(): Seq[InternalRow] = {
+ catalog.loadTable(ident) match {
+ case iceberg: SparkTable =>
+ val snapshotId =
tagOptions.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId())
+ val manageSnapshot = iceberg.table.manageSnapshots()
+ if (!replace) {
+ val ref = iceberg.table().refs().get(tag);
+ if (ref != null && ifNotExists) {
+ return Nil
+ }
+
+ manageSnapshot.createTag(tag, snapshotId)
+ } else {
+ manageSnapshot.replaceTag(tag, snapshotId)
+ }
+
+ if (tagOptions.snapshotRefRetain.nonEmpty) {
+ manageSnapshot.setMaxRefAgeMs(tag, tagOptions.snapshotRefRetain.get)
+ }
+
+ manageSnapshot.commit()
+
+ case table =>
+ throw new UnsupportedOperationException(s"Cannot create tag to non-Iceberg table: $table")
+ }
+
+ Nil
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"Create tag: ${tag} for table: ${ident.quoted}"
+ }
+}
diff --git
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 7e343534de..6c7ededf88 100644
---
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -32,6 +32,7 @@ import
org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
import org.apache.spark.sql.catalyst.plans.logical.Call
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
@@ -66,6 +67,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession)
extends Strategy wi
IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions,
replace, ifNotExists) =>
CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions,
replace, ifNotExists) :: Nil
+ case CreateOrReplaceTag(IcebergCatalogAndIdentifier(catalog, ident), tag,
tagOptions, replace, ifNotExists) =>
+ CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, replace,
ifNotExists) :: Nil
+
case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident),
transform) =>
DropPartitionFieldExec(catalog, ident, transform) :: Nil
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
new file mode 100644
index 0000000000..def040c15e
--- /dev/null
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestTagDDL extends SparkExtensionsTestBase {
+ private static final String[] TIME_UNITS = {"DAYS", "HOURS", "MINUTES"};
+
+ @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {
+ SparkCatalogConfig.SPARK.catalogName(),
+ SparkCatalogConfig.SPARK.implementation(),
+ SparkCatalogConfig.SPARK.properties()
+ }
+ };
+ }
+
+ public TestTagDDL(String catalogName, String implementation, Map<String,
String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @Before
+ public void before() {
+ sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+ }
+
+ @After
+ public void removeTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testCreateTagWithRetain() throws NoSuchTableException {
+ Table table = insertRows();
+ long firstSnapshotId = table.currentSnapshot().snapshotId();
+ long maxRefAge = 10L;
+
+ List<SimpleRecord> records =
+ ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+ df.writeTo(tableName).append();
+
+ for (String timeUnit : TIME_UNITS) {
+ String tagName = "t1" + timeUnit;
+ sql(
+ "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d %s",
+ tableName, tagName, firstSnapshotId, maxRefAge, timeUnit);
+ table.refresh();
+ SnapshotRef ref = table.refs().get(tagName);
+ Assert.assertEquals(firstSnapshotId, ref.snapshotId());
+ Assert.assertEquals(
+ TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxRefAge),
+ ref.maxRefAgeMs().longValue());
+ }
+
+ String tagName = "t1";
+ AssertHelpers.assertThrows(
+ "Illegal statement",
+ IcebergParseException.class,
+ "mismatched input",
+ () ->
+ sql(
+ "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN",
+ tableName, tagName, firstSnapshotId, maxRefAge));
+
+ AssertHelpers.assertThrows(
+ "Illegal statement",
+ IcebergParseException.class,
+ "mismatched input",
+ () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS", tableName,
tagName, "abc"));
+
+ AssertHelpers.assertThrows(
+ "Illegal statement",
+ IcebergParseException.class,
+ "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}",
+ () ->
+ sql(
+ "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d SECONDS",
+ tableName, tagName, firstSnapshotId, maxRefAge));
+ }
+
+ @Test
+ public void testCreateTagUseDefaultConfig() throws NoSuchTableException {
+ Table table = insertRows();
+ long snapshotId = table.currentSnapshot().snapshotId();
+ String tagName = "t1";
+
+ AssertHelpers.assertThrows(
+ "unknown snapshot",
+ ValidationException.class,
+ "unknown snapshot: -1",
+ () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName,
tagName, -1));
+
+ sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName);
+ table.refresh();
+ SnapshotRef ref = table.refs().get(tagName);
+ Assert.assertEquals(snapshotId, ref.snapshotId());
+ Assert.assertNull(ref.maxRefAgeMs());
+
+ AssertHelpers.assertThrows(
+ "Cannot create an exist tag",
+ IllegalArgumentException.class,
+ "already exists",
+ () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName));
+
+ AssertHelpers.assertThrows(
+ "Non-conforming tag name",
+ IcebergParseException.class,
+ "mismatched input '123'",
+ () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "123"));
+
+ table.manageSnapshots().removeTag(tagName).commit();
+ List<SimpleRecord> records =
+ ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+ df.writeTo(tableName).append();
+ snapshotId = table.currentSnapshot().snapshotId();
+ sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName,
snapshotId);
+ table.refresh();
+ ref = table.refs().get(tagName);
+ Assert.assertEquals(snapshotId, ref.snapshotId());
+ Assert.assertNull(ref.maxRefAgeMs());
+ }
+
+ @Test
+ public void testCreateTagIfNotExists() throws NoSuchTableException {
+ long maxSnapshotAge = 2L;
+ Table table = insertRows();
+ String tagName = "t1";
+ sql("ALTER TABLE %s CREATE TAG %s RETAIN %d days", tableName, tagName,
maxSnapshotAge);
+ sql("ALTER TABLE %s CREATE TAG IF NOT EXISTS %s", tableName, tagName);
+
+ table.refresh();
+ SnapshotRef ref = table.refs().get(tagName);
+ Assert.assertEquals(table.currentSnapshot().snapshotId(),
ref.snapshotId());
+ Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge),
ref.maxRefAgeMs().longValue());
+ }
+
+ @Test
+ public void testReplaceTagFailsForBranch() throws NoSuchTableException {
+ String branchName = "branch1";
+ Table table = insertRows();
+ long first = table.currentSnapshot().snapshotId();
+ table.manageSnapshots().createBranch(branchName, first).commit();
+ insertRows();
+ long second = table.currentSnapshot().snapshotId();
+
+ AssertHelpers.assertThrows(
+ "Cannot perform replace tag on branches",
+ IllegalArgumentException.class,
+ "Ref branch1 is a branch not a tag",
+ () -> sql("ALTER TABLE %s REPLACE Tag %s", tableName, branchName,
second));
+ }
+
+ @Test
+ public void testReplaceTag() throws NoSuchTableException {
+ Table table = insertRows();
+ long first = table.currentSnapshot().snapshotId();
+ String tagName = "t1";
+ long expectedMaxRefAgeMs = 1000;
+ table
+ .manageSnapshots()
+ .createTag(tagName, first)
+ .setMaxRefAgeMs(tagName, expectedMaxRefAgeMs)
+ .commit();
+
+ insertRows();
+ long second = table.currentSnapshot().snapshotId();
+
+ sql("ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", tableName, tagName,
second);
+ table.refresh();
+ SnapshotRef ref = table.refs().get(tagName);
+ Assert.assertEquals(second, ref.snapshotId());
+ Assert.assertEquals(expectedMaxRefAgeMs, ref.maxRefAgeMs().longValue());
+ }
+
+ @Test
+ public void testReplaceTagDoesNotExist() throws NoSuchTableException {
+ Table table = insertRows();
+
+ AssertHelpers.assertThrows(
+ "Cannot perform replace tag on tag which does not exist",
+ IllegalArgumentException.class,
+ "Tag does not exist",
+ () ->
+ sql(
+ "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d",
+ tableName, "someTag", table.currentSnapshot().snapshotId()));
+ }
+
+ @Test
+ public void testReplaceTagWithRetain() throws NoSuchTableException {
+ Table table = insertRows();
+ long first = table.currentSnapshot().snapshotId();
+ String tagName = "t1";
+ table.manageSnapshots().createTag(tagName, first).commit();
+ insertRows();
+ long second = table.currentSnapshot().snapshotId();
+
+ long maxRefAge = 10;
+ for (String timeUnit : TIME_UNITS) {
+ sql(
+ "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d RETAIN %d %s",
+ tableName, tagName, second, maxRefAge, timeUnit);
+
+ table.refresh();
+ SnapshotRef ref = table.refs().get(tagName);
+ Assert.assertEquals(second, ref.snapshotId());
+ Assert.assertEquals(
+ TimeUnit.valueOf(timeUnit).toMillis(maxRefAge),
ref.maxRefAgeMs().longValue());
+ }
+ }
+
+ @Test
+ public void testCreateOrReplace() throws NoSuchTableException {
+ Table table = insertRows();
+ long first = table.currentSnapshot().snapshotId();
+ String tagName = "t1";
+ insertRows();
+ long second = table.currentSnapshot().snapshotId();
+ table.manageSnapshots().createTag(tagName, second).commit();
+
+ sql("ALTER TABLE %s CREATE OR REPLACE TAG %s AS OF VERSION %d", tableName,
tagName, first);
+ table.refresh();
+ SnapshotRef ref = table.refs().get(tagName);
+ Assert.assertEquals(first, ref.snapshotId());
+ }
+
+ private Table insertRows() throws NoSuchTableException {
+ List<SimpleRecord> records =
+ ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+ df.writeTo(tableName).append();
+ return validationCatalog.loadTable(tableIdent);
+ }
+}