This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new dec006e81e6 HIVE-27771: Iceberg: Allow expire snapshot by time range. (#4776). (Ayush Saxena, reviewed by Denys Kuzmenko)
dec006e81e6 is described below

commit dec006e81e657bc6edb0d57d31076f04e1f088c5
Author: Ayush Saxena <[email protected]>
AuthorDate: Mon Oct 9 16:48:11 2023 +0530

    HIVE-27771: Iceberg: Allow expire snapshot by time range. (#4776). (Ayush Saxena, reviewed by Denys Kuzmenko)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 22 +++++++++++++++++++-
 .../mr/hive/TestHiveIcebergExpireSnapshots.java    | 17 +++++++++++++++
 .../hadoop/hive/ql/parse/AlterClauseParser.g       |  2 ++
 .../table/execute/AlterTableExecuteAnalyzer.java   | 24 +++++++++++++++-------
 .../hive/ql/parse/AlterTableExecuteSpec.java       | 19 ++++++++++++++++-
 5 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index bb9356df251..4e451403b02 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -834,7 +834,10 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
         LOG.info("Executing expire snapshots on iceberg table {} with {} 
threads", icebergTable.name(), numThreads);
         deleteExecutorService = getDeleteExecutorService(icebergTable.name(), 
numThreads);
       }
-      if (expireSnapshotsSpec.isExpireByIds()) {
+      if (expireSnapshotsSpec.isExpireByTimestampRange()) {
+        expireSnapshotByTimestampRange(icebergTable, 
expireSnapshotsSpec.getFromTimestampMillis(),
+            expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService);
+      } else if (expireSnapshotsSpec.isExpireByIds()) {
         expireSnapshotByIds(icebergTable, 
expireSnapshotsSpec.getIdsToExpire(), deleteExecutorService);
       } else {
         expireSnapshotOlderThanTimestamp(icebergTable, 
expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService);
@@ -846,6 +849,23 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     }
   }
 
+  private void expireSnapshotByTimestampRange(Table icebergTable, Long 
fromTimestamp, Long toTimestamp,
+      ExecutorService deleteExecutorService) {
+    ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
+    for (Snapshot snapshot : icebergTable.snapshots()) {
+      if (snapshot.timestampMillis() >= fromTimestamp && 
snapshot.timestampMillis() <= toTimestamp) {
+        expireSnapshots.expireSnapshotId(snapshot.snapshotId());
+        LOG.debug("Expiring snapshot on {} with id: {} and timestamp: {}", 
icebergTable.name(), snapshot.snapshotId(),
+            snapshot.timestampMillis());
+      }
+    }
+    LOG.info("Expiring snapshot on {} within time range {} -> {}", 
icebergTable.name(), fromTimestamp, toTimestamp);
+    if (deleteExecutorService != null) {
+      expireSnapshots.executeDeleteWith(deleteExecutorService);
+    }
+    expireSnapshots.commit();
+  }
+
   private void expireSnapshotOlderThanTimestamp(Table icebergTable, Long 
timestamp,
       ExecutorService deleteExecutorService) {
     ExpireSnapshots expireSnapshots = 
icebergTable.expireSnapshots().expireOlderThan(timestamp);
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
index 08277fb7892..a851578ee6c 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
@@ -20,6 +20,8 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import org.apache.commons.collections4.IterableUtils;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -65,4 +67,19 @@ public class TestHiveIcebergExpireSnapshots extends HiveIcebergStorageHandlerWit
     table.refresh();
     Assert.assertEquals(7,  IterableUtils.size(table.snapshots()));
   }
+
+  @Test
+    public void testExpireSnapshotsWithTimestampRange() throws IOException, 
InterruptedException {
+    TableIdentifier identifier = TableIdentifier.of("default", "source");
+    Table table = testTables.createTableWithVersions(shell, identifier.name(),
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 10);
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS000000");
+    String fromTime = simpleDateFormat.format(new 
Date(table.history().get(5).timestampMillis()));
+    String toTime = simpleDateFormat.format(new 
Date(table.history().get(8).timestampMillis()));
+    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE 
EXPIRE_SNAPSHOTS BETWEEN" +
+        " '" + fromTime + "' AND '" + toTime + "'");
+    table.refresh();
+    Assert.assertEquals(6, IterableUtils.size(table.snapshots()));
+  }
 }
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index 48a2aed9617..46a00fe5c87 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -483,6 +483,8 @@ alterStatementSuffixExecute
     -> ^(TOK_ALTERTABLE_EXECUTE KW_FAST_FORWARD $sourceBranch $targetBranch?)
     | KW_EXECUTE KW_CHERRY_PICK snapshotId=Number
     -> ^(TOK_ALTERTABLE_EXECUTE KW_CHERRY_PICK $snapshotId)
+    | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN (fromTimestamp=StringLiteral) 
KW_AND (toTimestamp=StringLiteral)
+    -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $fromTimestamp 
$toTimestamp)
     ;
 
 alterStatementSuffixDropBranch
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
index 81ed88849df..ddc12935700 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableAnalyzer;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.HiveParser;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
 import java.time.ZoneId;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
@@ -79,7 +81,8 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
         desc = getRollbackDesc(tableName, partitionSpec, (ASTNode) 
command.getChild(1));
         break;
       case HiveParser.KW_EXPIRE_SNAPSHOTS:
-        desc = getExpireSnapshotDesc(tableName, partitionSpec, (ASTNode) 
command.getChild(1));
+        desc = getExpireSnapshotDesc(tableName, partitionSpec,  
command.getChildren());
+
         break;
       case HiveParser.KW_SET_CURRENT_SNAPSHOT:
         desc = getSetCurrentSnapshotDesc(tableName, partitionSpec, (ASTNode) 
command.getChild(1));
@@ -130,18 +133,25 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
   }
 
   private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName 
tableName, Map<String, String> partitionSpec,
-      ASTNode childNode) throws SemanticException {
+      List<Node> children) throws SemanticException {
     AlterTableExecuteSpec<ExpireSnapshotsSpec> spec;
-    // the second child must be the rollback parameter
 
     ZoneId timeZone = SessionState.get() == null ?
         new HiveConf().getLocalTimeZone() :
         SessionState.get().getConf().getLocalTimeZone();
-    String childText = PlanUtils.stripQuotes(childNode.getText().trim());
-    if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(childText).matches()) {
-      spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new 
ExpireSnapshotsSpec(childText));
+    ASTNode firstNode = (ASTNode) children.get(1);
+    String firstNodeText = PlanUtils.stripQuotes(firstNode.getText().trim());
+    if (children.size() == 3) {
+      ASTNode secondNode = (ASTNode) children.get(2);
+      String secondNodeText = 
PlanUtils.stripQuotes(secondNode.getText().trim());
+      TimestampTZ fromTime = TimestampTZUtil.parse(firstNodeText, timeZone);
+      TimestampTZ toTime = TimestampTZUtil.parse(secondNodeText, timeZone);
+      spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT,
+          new ExpireSnapshotsSpec(fromTime.toEpochMilli(), 
toTime.toEpochMilli()));
+    } else if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(firstNodeText).matches()) {
+      spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new 
ExpireSnapshotsSpec(firstNodeText));
     } else {
-      TimestampTZ time = TimestampTZUtil.parse(childText, timeZone);
+      TimestampTZ time = TimestampTZUtil.parse(firstNodeText, timeZone);
       spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new 
ExpireSnapshotsSpec(time.toEpochMilli()));
     }
     return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
index fefeb267c29..c469b24415f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
@@ -109,6 +109,8 @@ public class AlterTableExecuteSpec<T> {
     private long timestampMillis = -1L;
     private String[] idsToExpire = null;
 
+    private long fromTimestampMillis = -1L;
+
     public ExpireSnapshotsSpec(long timestampMillis) {
       this.timestampMillis = timestampMillis;
     }
@@ -117,10 +119,19 @@ public class AlterTableExecuteSpec<T> {
       this.idsToExpire = ids.split(",");
     }
 
+    public ExpireSnapshotsSpec(long fromTimestampMillis, long 
toTimestampMillis) {
+      this.fromTimestampMillis = fromTimestampMillis;
+      this.timestampMillis = toTimestampMillis;
+    }
+
     public Long getTimestampMillis() {
       return timestampMillis;
     }
 
+    public Long getFromTimestampMillis() {
+      return fromTimestampMillis;
+    }
+
     public String[] getIdsToExpire() {
       return idsToExpire;
     }
@@ -129,10 +140,16 @@ public class AlterTableExecuteSpec<T> {
       return idsToExpire != null;
     }
 
+    public boolean isExpireByTimestampRange() {
+      return timestampMillis != -1 && fromTimestampMillis != -1;
+    }
+
     @Override
     public String toString() {
       MoreObjects.ToStringHelper stringHelper = 
MoreObjects.toStringHelper(this);
-      if (isExpireByIds()) {
+      if (isExpireByTimestampRange()) {
+        stringHelper.add("fromTimestampMillis", 
fromTimestampMillis).add("toTimestampMillis", timestampMillis);
+      } else if (isExpireByIds()) {
         stringHelper.add("idsToExpire", Arrays.toString(idsToExpire));
       } else {
         stringHelper.add("timestampMillis", timestampMillis);

Reply via email to