This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 76bc585c86f HIVE-27325: Iceberg: Expiring old snapshots deletes files 
with DirectExecutorService causing runtime delays. (#4302). (Ayush Saxena, 
reviewed by  Rajesh Balamohan, Attila Turoczy)
76bc585c86f is described below

commit 76bc585c86f50e2601230e836c9aed477caefcd4
Author: Ayush Saxena <[email protected]>
AuthorDate: Tue May 9 22:20:53 2023 +0530

    HIVE-27325: Iceberg: Expiring old snapshots deletes files with 
DirectExecutorService causing runtime delays. (#4302). (Ayush Saxena, reviewed 
by  Rajesh Balamohan, Attila Turoczy)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  3 +++
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 29 +++++++++++++++++++++-
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 60ef76474a0..a1bae8b0d34 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2207,6 +2207,9 @@ public class HiveConf extends Configuration {
         "Whether to use codec pool in ORC. Disable if there are bugs with codec reuse."),
     HIVE_ICEBERG_STATS_SOURCE("hive.iceberg.stats.source", "iceberg",
         "Use stats from iceberg table snapshot for query planning. This has two values metastore and iceberg"),
+    HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS("hive.iceberg.expire.snapshot.numthreads", 4,
+        "The number of threads to be used for deleting files during expire snapshot. If set to 0 or below it uses the" +
+            " default DirectExecutorService"),
     HIVEUSEEXPLICITRCFILEHEADER("hive.exec.rcfile.use.explicit.header", true,
         "If this is set the header for RCFiles will simply be RCF.  If this is not\n" +
         "set the header will be that borrowed from sequence files, e.g. SEQ- followed\n" +
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index df2dcc1ff12..78611aa47ca 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -32,6 +32,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -661,7 +664,22 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
             hmsTable.getTableName());
         AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec =
             (AlterTableExecuteSpec.ExpireSnapshotsSpec) executeSpec.getOperationParams();
-        icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis()).commit();
+        int numThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
+            HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
+        if (numThreads > 0) {
+          LOG.info("Executing expire snapshots on iceberg table {} with {} threads", hmsTable.getCompleteName(),
+              numThreads);
+          final ExecutorService deleteExecutorService =
+              getDeleteExecutorService(hmsTable.getCompleteName(), numThreads);
+          try {
+            icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis())
+                .executeDeleteWith(deleteExecutorService).commit();
+          } finally {
+            deleteExecutorService.shutdown();
+          }
+        } else {
+          icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis()).commit();
+        }
         break;
       case SET_CURRENT_SNAPSHOT:
         AlterTableExecuteSpec.SetCurrentSnapshotSpec setSnapshotVersionSpec =
@@ -676,6 +694,15 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     }
   }
 
+  private static ExecutorService getDeleteExecutorService(String completeName, int numThreads) {
+    AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
+    return Executors.newFixedThreadPool(numThreads, runnable -> {
+      Thread thread = new Thread(runnable);
+      thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement());
+      return thread;
+    });
+  }
+
   @Override
   public boolean isValidMetadataTable(String metaTableName) {
     return IcebergMetadataTables.isValidMetaTable(metaTableName);

Reply via email to