This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new dec006e81e6 HIVE-27771: Iceberg: Allow expire snapshot by time range. (#4776). (Ayush Saxena, reviewed by Denys Kuzmenko)
dec006e81e6 is described below
commit dec006e81e657bc6edb0d57d31076f04e1f088c5
Author: Ayush Saxena <[email protected]>
AuthorDate: Mon Oct 9 16:48:11 2023 +0530
HIVE-27771: Iceberg: Allow expire snapshot by time range. (#4776). (Ayush Saxena, reviewed by Denys Kuzmenko)
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 22 +++++++++++++++++++-
.../mr/hive/TestHiveIcebergExpireSnapshots.java | 17 +++++++++++++++
.../hadoop/hive/ql/parse/AlterClauseParser.g | 2 ++
.../table/execute/AlterTableExecuteAnalyzer.java | 24 +++++++++++++++-------
.../hive/ql/parse/AlterTableExecuteSpec.java | 19 ++++++++++++++++-
5 files changed, 75 insertions(+), 9 deletions(-)
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index bb9356df251..4e451403b02 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -834,7 +834,10 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
LOG.info("Executing expire snapshots on iceberg table {} with {}
threads", icebergTable.name(), numThreads);
deleteExecutorService = getDeleteExecutorService(icebergTable.name(),
numThreads);
}
- if (expireSnapshotsSpec.isExpireByIds()) {
+ if (expireSnapshotsSpec.isExpireByTimestampRange()) {
+ expireSnapshotByTimestampRange(icebergTable,
expireSnapshotsSpec.getFromTimestampMillis(),
+ expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService);
+ } else if (expireSnapshotsSpec.isExpireByIds()) {
expireSnapshotByIds(icebergTable,
expireSnapshotsSpec.getIdsToExpire(), deleteExecutorService);
} else {
expireSnapshotOlderThanTimestamp(icebergTable,
expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService);
@@ -846,6 +849,52 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
}
}
+  /**
+   * Expires the snapshots whose commit timestamp lies in the inclusive range [fromTimestamp, toTimestamp].
+   * Snapshots still referenced by a branch or tag (including the current snapshot) are skipped, and no commit is
+   * made when no snapshot qualifies.
+   *
+   * @param icebergTable table whose snapshots are expired
+   * @param fromTimestamp inclusive lower bound, in epoch milliseconds
+   * @param toTimestamp inclusive upper bound, in epoch milliseconds
+   * @param deleteExecutorService executor used to delete expired files; may be null, in which case none is set
+   */
+  private void expireSnapshotByTimestampRange(Table icebergTable, Long fromTimestamp, Long toTimestamp,
+      ExecutorService deleteExecutorService) {
+    ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
+    int expiredCount = 0;
+    for (Snapshot snapshot : icebergTable.snapshots()) {
+      long snapshotId = snapshot.snapshotId();
+      long snapshotTime = snapshot.timestampMillis();
+      if (snapshotTime < fromTimestamp || snapshotTime > toTimestamp) {
+        continue;
+      }
+      // Iceberg refuses to expire a snapshot that a branch or tag (e.g. main) still points to; keep it instead of
+      // failing the whole command.
+      if (icebergTable.refs().values().stream().anyMatch(ref -> ref.snapshotId() == snapshotId)) {
+        LOG.info("Skipping snapshot on {} with id: {} as it is still referenced by a branch or tag",
+            icebergTable.name(), snapshotId);
+        continue;
+      }
+      expireSnapshots.expireSnapshotId(snapshotId);
+      expiredCount++;
+      LOG.debug("Expiring snapshot on {} with id: {} and timestamp: {}", icebergTable.name(), snapshotId,
+          snapshotTime);
+    }
+    if (expiredCount == 0) {
+      // Nothing to expire: skip the commit, which could otherwise still expire snapshots by the table's
+      // default max snapshot age (history.expire.max-snapshot-age-ms).
+      LOG.info("No snapshot to expire on {} within time range {} -> {}", icebergTable.name(), fromTimestamp,
+          toTimestamp);
+      return;
+    }
+    LOG.info("Expiring snapshot on {} within time range {} -> {}", icebergTable.name(), fromTimestamp, toTimestamp);
+    if (deleteExecutorService != null) {
+      expireSnapshots.executeDeleteWith(deleteExecutorService);
+    }
+    expireSnapshots.commit();
+  }
+
private void expireSnapshotOlderThanTimestamp(Table icebergTable, Long
timestamp,
ExecutorService deleteExecutorService) {
ExpireSnapshots expireSnapshots =
icebergTable.expireSnapshots().expireOlderThan(timestamp);
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
index 08277fb7892..a851578ee6c 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
@@ -20,6 +20,8 @@
package org.apache.iceberg.mr.hive;
import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import org.apache.commons.collections4.IterableUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -65,4 +67,19 @@ public class TestHiveIcebergExpireSnapshots extends
HiveIcebergStorageHandlerWit
table.refresh();
Assert.assertEquals(7, IterableUtils.size(table.snapshots()));
}
+
+ @Test
+ public void testExpireSnapshotsWithTimestampRange() throws IOException,
InterruptedException {
+ TableIdentifier identifier = TableIdentifier.of("default", "source");
+ Table table = testTables.createTableWithVersions(shell, identifier.name(),
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 10);
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS000000"); // NOTE(review): formats in the JVM default time zone while the analyzer parses in the session's local time zone; assumes both are the same
+ String fromTime = simpleDateFormat.format(new
Date(table.history().get(5).timestampMillis()));
+ String toTime = simpleDateFormat.format(new
Date(table.history().get(8).timestampMillis()));
+ shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE
EXPIRE_SNAPSHOTS BETWEEN" +
+ " '" + fromTime + "' AND '" + toTime + "'");
+ table.refresh();
+ Assert.assertEquals(6, IterableUtils.size(table.snapshots())); // presumably 10 snapshots minus the 4 at history indexes 5..8 (the range is inclusive)
+ }
}
diff --git
a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index 48a2aed9617..46a00fe5c87 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -483,6 +483,8 @@ alterStatementSuffixExecute
-> ^(TOK_ALTERTABLE_EXECUTE KW_FAST_FORWARD $sourceBranch $targetBranch?)
| KW_EXECUTE KW_CHERRY_PICK snapshotId=Number
-> ^(TOK_ALTERTABLE_EXECUTE KW_CHERRY_PICK $snapshotId)
+ | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN (fromTimestamp=StringLiteral)
KW_AND (toTimestamp=StringLiteral)
+ -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $fromTimestamp
$toTimestamp) // both bounds are string literals, parsed as timestamps by AlterTableExecuteAnalyzer
;
alterStatementSuffixDropBranch
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
index 81ed88849df..ddc12935700 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
@@ -29,6 +29,7 @@ import
org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableAnalyzer;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import java.time.ZoneId;
+import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
@@ -79,7 +81,7 @@ public class AlterTableExecuteAnalyzer extends
AbstractAlterTableAnalyzer {
desc = getRollbackDesc(tableName, partitionSpec, (ASTNode)
command.getChild(1));
break;
case HiveParser.KW_EXPIRE_SNAPSHOTS:
- desc = getExpireSnapshotDesc(tableName, partitionSpec, (ASTNode)
command.getChild(1));
+ desc = getExpireSnapshotDesc(tableName, partitionSpec,
command.getChildren());
break;
case HiveParser.KW_SET_CURRENT_SNAPSHOT:
desc = getSetCurrentSnapshotDesc(tableName, partitionSpec, (ASTNode)
command.getChild(1));
@@ -130,18 +132,30 @@ public class AlterTableExecuteAnalyzer extends
AbstractAlterTableAnalyzer {
}
private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName
tableName, Map<String, String> partitionSpec,
- ASTNode childNode) throws SemanticException {
+ List<Node> children) throws SemanticException {
AlterTableExecuteSpec<ExpireSnapshotsSpec> spec;
- // the second child must be the rollback parameter
ZoneId timeZone = SessionState.get() == null ?
new HiveConf().getLocalTimeZone() :
SessionState.get().getConf().getLocalTimeZone();
- String childText = PlanUtils.stripQuotes(childNode.getText().trim());
- if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(childText).matches()) {
- spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new
ExpireSnapshotsSpec(childText));
+    // children: KW_EXPIRE_SNAPSHOTS, then either one parameter (snapshot ids or a timestamp) or the two range bounds
+ ASTNode firstNode = (ASTNode) children.get(1);
+ String firstNodeText = PlanUtils.stripQuotes(firstNode.getText().trim());
+ if (children.size() == 3) {
+ ASTNode secondNode = (ASTNode) children.get(2);
+ String secondNodeText =
PlanUtils.stripQuotes(secondNode.getText().trim());
+ TimestampTZ fromTime = TimestampTZUtil.parse(firstNodeText, timeZone);
+ TimestampTZ toTime = TimestampTZUtil.parse(secondNodeText, timeZone);
+      if (fromTime.toEpochMilli() > toTime.toEpochMilli()) {
+        throw new SemanticException("Invalid time range for expire snapshots: from timestamp '" + firstNodeText
+            + "' is after to timestamp '" + secondNodeText + "'");
+      }
+ spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT,
+ new ExpireSnapshotsSpec(fromTime.toEpochMilli(),
toTime.toEpochMilli()));
+ } else if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(firstNodeText).matches()) {
+ spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new
ExpireSnapshotsSpec(firstNodeText));
} else {
- TimestampTZ time = TimestampTZUtil.parse(childText, timeZone);
+ TimestampTZ time = TimestampTZUtil.parse(firstNodeText, timeZone);
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new
ExpireSnapshotsSpec(time.toEpochMilli()));
}
return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
index fefeb267c29..c469b24415f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
@@ -109,6 +109,8 @@ public class AlterTableExecuteSpec<T> {
private long timestampMillis = -1L;
private String[] idsToExpire = null;
+ private long fromTimestampMillis = -1L; // -1 when not expiring by range; for a range, timestampMillis holds the upper bound
+
public ExpireSnapshotsSpec(long timestampMillis) {
this.timestampMillis = timestampMillis;
}
@@ -117,10 +119,19 @@ public class AlterTableExecuteSpec<T> {
this.idsToExpire = ids.split(",");
}
+ public ExpireSnapshotsSpec(long fromTimestampMillis, long
toTimestampMillis) {
+ this.fromTimestampMillis = fromTimestampMillis;
+ this.timestampMillis = toTimestampMillis; // the upper bound is stored in the same field used by the "older than" form
+ }
+
public Long getTimestampMillis() {
return timestampMillis;
}
+ public Long getFromTimestampMillis() {
+ return fromTimestampMillis;
+ }
+
public String[] getIdsToExpire() {
return idsToExpire;
}
@@ -129,10 +140,16 @@ public class AlterTableExecuteSpec<T> {
return idsToExpire != null;
}
+ public boolean isExpireByTimestampRange() {
+ return timestampMillis != -1 && fromTimestampMillis != -1; // NOTE(review): -1 doubles as the "unset" sentinel, so epoch millis -1 (1969-12-31T23:59:59.999Z) cannot be used as a bound
+ }
+
@Override
public String toString() {
MoreObjects.ToStringHelper stringHelper =
MoreObjects.toStringHelper(this);
- if (isExpireByIds()) {
+ if (isExpireByTimestampRange()) {
+ stringHelper.add("fromTimestampMillis",
fromTimestampMillis).add("toTimestampMillis", timestampMillis);
+ } else if (isExpireByIds()) {
stringHelper.add("idsToExpire", Arrays.toString(idsToExpire));
} else {
stringHelper.add("timestampMillis", timestampMillis);