This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 76bc585c86f HIVE-27325: Iceberg: Expiring old snapshots deletes files
with DirectExecutorService causing runtime delays. (#4302). (Ayush Saxena,
reviewed by Rajesh Balamohan, Attila Turoczy)
76bc585c86f is described below
commit 76bc585c86f50e2601230e836c9aed477caefcd4
Author: Ayush Saxena <[email protected]>
AuthorDate: Tue May 9 22:20:53 2023 +0530
HIVE-27325: Iceberg: Expiring old snapshots deletes files with
DirectExecutorService causing runtime delays. (#4302). (Ayush Saxena, reviewed
by Rajesh Balamohan, Attila Turoczy)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 +++
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 29 +++++++++++++++++++++-
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 60ef76474a0..a1bae8b0d34 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2207,6 +2207,9 @@ public class HiveConf extends Configuration {
"Whether to use codec pool in ORC. Disable if there are bugs with
codec reuse."),
HIVE_ICEBERG_STATS_SOURCE("hive.iceberg.stats.source", "iceberg",
"Use stats from iceberg table snapshot for query planning. This has
two values metastore and iceberg"),
+
HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS("hive.iceberg.expire.snapshot.numthreads",
4,
+ "The number of threads to be used for deleting files during expire
snapshot. If set to 0 or below it uses the" +
+ " defult DirectExecutorService"),
HIVEUSEEXPLICITRCFILEHEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is
not\n" +
"set the header will be that borrowed from sequence files, e.g. SEQ-
followed\n" +
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index df2dcc1ff12..78611aa47ca 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -32,6 +32,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -661,7 +664,22 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
hmsTable.getTableName());
AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec =
(AlterTableExecuteSpec.ExpireSnapshotsSpec)
executeSpec.getOperationParams();
-
icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis()).commit();
+ int numThreads =
conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
+
HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
+ if (numThreads > 0) {
+ LOG.info("Executing expire snapshots on iceberg table {} with {}
threads", hmsTable.getCompleteName(),
+ numThreads);
+ final ExecutorService deleteExecutorService =
+ getDeleteExecutorService(hmsTable.getCompleteName(), numThreads);
+ try {
+
icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis())
+ .executeDeleteWith(deleteExecutorService).commit();
+ } finally {
+ deleteExecutorService.shutdown();
+ }
+ } else {
+
icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis()).commit();
+ }
break;
case SET_CURRENT_SNAPSHOT:
AlterTableExecuteSpec.SetCurrentSnapshotSpec setSnapshotVersionSpec =
@@ -676,6 +694,15 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
}
}
+ private static ExecutorService getDeleteExecutorService(String completeName,
int numThreads) {
+ AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
+ return Executors.newFixedThreadPool(numThreads, runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("remove-snapshot-" + completeName + "-" +
deleteThreadsIndex.getAndIncrement());
+ return thread;
+ });
+ }
+
@Override
public boolean isValidMetadataTable(String metaTableName) {
return IcebergMetadataTables.isValidMetaTable(metaTableName);