This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 246392dcbbe HIVE-27793: Iceberg: Support setting current snapshot with
SnapshotRef (#4797). (zhangbutao, reviewed by Ayush Saxena)
246392dcbbe is described below
commit 246392dcbbe466e36d0ea98c1d7eeaa48e579ad1
Author: Butao Zhang <[email protected]>
AuthorDate: Sat Oct 14 16:34:30 2023 +0800
HIVE-27793: Iceberg: Support setting current snapshot with SnapshotRef
(#4797). (zhangbutao, reviewed by Ayush Saxena)
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 4 +--
.../apache/iceberg/mr/hive/IcebergTableUtil.java | 23 +++++++++++---
.../mr/hive/TestHiveIcebergSetCurrentSnapshot.java | 37 ++++++++++++++++++++++
.../hadoop/hive/ql/parse/AlterClauseParser.g | 2 +-
.../table/execute/AlterTableExecuteAnalyzer.java | 2 +-
.../hive/ql/parse/AlterTableExecuteSpec.java | 14 ++++----
6 files changed, 66 insertions(+), 16 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 98fb25b1d37..8214c9e6803 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -817,8 +817,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
AlterTableExecuteSpec.SetCurrentSnapshotSpec setSnapshotVersionSpec =
(AlterTableExecuteSpec.SetCurrentSnapshotSpec)
executeSpec.getOperationParams();
LOG.debug("Executing set current snapshot operation on iceberg table
{}.{} to version {}", hmsTable.getDbName(),
- hmsTable.getTableName(), setSnapshotVersionSpec.getSnapshotId());
- IcebergTableUtil.setCurrentSnapshot(icebergTable,
setSnapshotVersionSpec.getSnapshotId());
+ hmsTable.getTableName(),
setSnapshotVersionSpec.getSnapshotIdOrRefName());
+ IcebergTableUtil.setCurrentSnapshot(icebergTable,
setSnapshotVersionSpec.getSnapshotIdOrRefName());
break;
case FAST_FORWARD:
AlterTableExecuteSpec.FastForwardSpec fastForwardSpec =
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 9bcd321bf70..8527e25cbfe 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.mr.hive;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
@@ -28,10 +29,12 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.TransformSpec;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdatePartitionSpec;
@@ -235,13 +238,23 @@ public class IcebergTableUtil {
/**
* Set the current snapshot for the iceberg table
* @param table the iceberg table
- * @param value parameter of the rollback, that can be a timestamp in millis
or a snapshot id
+ * @param value parameter of the rollback, that can be a snapshot id or a
SnapshotRef name
*/
- public static void setCurrentSnapshot(Table table, Long value) {
+ public static void setCurrentSnapshot(Table table, String value) {
ManageSnapshots manageSnapshots = table.manageSnapshots();
- LOG.debug("Rolling the iceberg table {} from snapshot id {} to snapshot ID
{}", table.name(),
- table.currentSnapshot().snapshotId(), value);
- manageSnapshots.setCurrentSnapshot(value);
+ long snapshotId;
+ try {
+ snapshotId = Long.parseLong(value);
+ LOG.debug("Rolling the iceberg table {} from snapshot id {} to snapshot
ID {}", table.name(),
+ table.currentSnapshot().snapshotId(), snapshotId);
+ } catch (NumberFormatException e) {
+ String refName = PlanUtils.stripQuotes(value);
+ snapshotId = Optional.ofNullable(table.refs().get(refName)).map(SnapshotRef::snapshotId).orElseThrow(() ->
+ new IllegalArgumentException(String.format("SnapshotRef %s does not exist", refName)));
+ LOG.debug("Rolling the iceberg table {} from snapshot id {} to the
snapshot ID {} of SnapshotRef {}",
+ table.name(), table.currentSnapshot().snapshotId(), snapshotId,
refName);
+ }
+ manageSnapshots.setCurrentSnapshot(snapshotId);
manageSnapshots.commit();
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSetCurrentSnapshot.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSetCurrentSnapshot.java
index e45e4b26ee0..0fddae1b83b 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSetCurrentSnapshot.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSetCurrentSnapshot.java
@@ -21,9 +21,12 @@ package org.apache.iceberg.mr.hive;
import java.io.IOException;
import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
/**
@@ -60,4 +63,38 @@ public class TestHiveIcebergSetCurrentSnapshot extends HiveIcebergStorageHandler
HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
currentResult),
HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
result4), 0);
}
+
+ @Test
+ public void testSetCurrentSnapshotBySnapshotRef() throws IOException,
InterruptedException {
+ // enough to test once
+ Assume.assumeTrue(fileFormat == FileFormat.ORC && isVectorized &&
+ testTableType == TestTables.TestTableType.HIVE_CATALOG);
+ TableIdentifier identifier = TableIdentifier.of("default", "source");
+ Table table =
+ testTables.createTableWithVersions(shell, identifier.name(),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS,
5);
+ shell.executeStatement("ALTER TABLE " + identifier.name() + " CREATE TAG
test_tag");
+ shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE
SET_CURRENT_SNAPSHOT('test_tag')");
+ table.refresh();
+ Assert.assertEquals(table.currentSnapshot().snapshotId(),
table.refs().get("test_tag").snapshotId());
+
+ shell.executeStatement("ALTER TABLE " + identifier.name() + " CREATE
BRANCH test_branch");
+ shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE
SET_CURRENT_SNAPSHOT('test_branch')");
+ table.refresh();
+ Assert.assertEquals(table.currentSnapshot().snapshotId(),
table.refs().get("test_branch").snapshotId());
+
+ AssertHelpers.assertThrows("should throw exception",
IllegalArgumentException.class,
+ "SnapshotRef unknown_ref does not exist", () -> {
+ shell.executeStatement("ALTER TABLE " + identifier.name() + "
EXECUTE SET_CURRENT_SNAPSHOT('unknown_ref')");
+ });
+
+ shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE
SET_CURRENT_SNAPSHOT" +
+ "(" + table.currentSnapshot().snapshotId() + ")");
+
+ AssertHelpers.assertThrows("should throw exception",
IllegalArgumentException.class,
+ "SnapshotRef " + table.currentSnapshot().snapshotId() + " does not
exist", () -> {
+ shell.executeStatement("ALTER TABLE " + identifier.name() + "
EXECUTE SET_CURRENT_SNAPSHOT" +
+ "('" + table.currentSnapshot().snapshotId() + "')");
+ });
+ }
}
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index 46a00fe5c87..28d60d6262f 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -477,7 +477,7 @@ alterStatementSuffixExecute
-> ^(TOK_ALTERTABLE_EXECUTE KW_ROLLBACK $rollbackParam)
| KW_EXECUTE KW_EXPIRE_SNAPSHOTS LPAREN (expireParam=StringLiteral) RPAREN
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $expireParam)
- | KW_EXECUTE KW_SET_CURRENT_SNAPSHOT LPAREN (snapshotParam=Number) RPAREN
+ | KW_EXECUTE KW_SET_CURRENT_SNAPSHOT LPAREN (snapshotParam=expression)
RPAREN
-> ^(TOK_ALTERTABLE_EXECUTE KW_SET_CURRENT_SNAPSHOT $snapshotParam)
| KW_EXECUTE KW_FAST_FORWARD sourceBranch=StringLiteral
(targetBranch=StringLiteral)?
-> ^(TOK_ALTERTABLE_EXECUTE KW_FAST_FORWARD $sourceBranch $targetBranch?)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
index ddc12935700..8d4a902b56b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
@@ -128,7 +128,7 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
ASTNode childNode) throws SemanticException {
AlterTableExecuteSpec<AlterTableExecuteSpec.SetCurrentSnapshotSpec> spec =
new AlterTableExecuteSpec(SET_CURRENT_SNAPSHOT,
- new
AlterTableExecuteSpec.SetCurrentSnapshotSpec(Long.valueOf(childNode.getText())));
+ new
AlterTableExecuteSpec.SetCurrentSnapshotSpec(childNode.getText()));
return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
index c469b24415f..5102959f087 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
@@ -161,23 +161,23 @@ public class AlterTableExecuteSpec<T> {
/**
* Value object class, that stores the set snapshot version operation
specific parameters
* <ul>
- * <li>snapshot Id: it should be a valid snapshot version</li>
+ * <li>snapshot Id: it should be a valid snapshot version or a SnapshotRef
name</li>
* </ul>
*/
public static class SetCurrentSnapshotSpec {
- private final long snapshotId;
+ private final String snapshotIdOrRefName;
- public SetCurrentSnapshotSpec(Long snapshotId) {
- this.snapshotId = snapshotId;
+ public SetCurrentSnapshotSpec(String snapshotIdOrRefName) {
+ this.snapshotIdOrRefName = snapshotIdOrRefName;
}
- public Long getSnapshotId() {
- return snapshotId;
+ public String getSnapshotIdOrRefName() {
+ return snapshotIdOrRefName;
}
@Override
public String toString() {
- return MoreObjects.toStringHelper(this).add("snapshotId",
snapshotId).toString();
+ return MoreObjects.toStringHelper(this).add("snapshotIdOrRefName",
snapshotIdOrRefName).toString();
}
}