This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 7e2920af40 Hive: close the fileIO client when closing the hive catalog (#10771)
7e2920af40 is described below
commit 7e2920af400e5d559f8f1742b1043787b0477d74
Author: Hussein Awala <[email protected]>
AuthorDate: Thu Jul 25 19:50:19 2024 +0200
Hive: close the fileIO client when closing the hive catalog (#10771)
Co-authored-by: Amogh Jahagirdar <[email protected]>
Co-authored-by: Eduard Tudenhoefner <[email protected]>
---
.../java/org/apache/iceberg/hive/HiveCatalog.java | 33 +++++++++++++++++++++-
1 file changed, 32 insertions(+), 1 deletion(-)
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index b4f49e29fc..8944cf9394 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -18,6 +18,10 @@
*/
package org.apache.iceberg.hive;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -79,6 +83,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
private ClientPool<IMetaStoreClient, TException> clients;
private boolean listAllTables = false;
private Map<String, String> catalogProperties;
+ private Cache<TableOperations, FileIO> fileIOCloser;
public HiveCatalog() {}
@@ -111,6 +116,20 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
: CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
this.clients = new CachedClientPool(conf, properties);
+ this.fileIOCloser = newFileIOCloser();
+ }
+
+ private Cache<TableOperations, FileIO> newFileIOCloser() {
+ return Caffeine.newBuilder()
+ .weakKeys()
+ .removalListener(
+ (RemovalListener<TableOperations, FileIO>)
+ (ops, fileIOInstance, cause) -> {
+ if (null != fileIOInstance) {
+ fileIOInstance.close();
+ }
+ })
+ .build();
}
@Override
@@ -512,7 +531,10 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
public TableOperations newTableOps(TableIdentifier tableIdentifier) {
String dbName = tableIdentifier.namespace().level(0);
String tableName = tableIdentifier.name();
- return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
+ HiveTableOperations ops =
+ new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
+ fileIOCloser.put(ops, ops.io());
+ return ops;
}
@Override
@@ -636,6 +658,15 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
}
+ @Override
+ public void close() throws IOException {
+ super.close();
+ if (fileIOCloser != null) {
+ fileIOCloser.invalidateAll();
+ fileIOCloser.cleanUp();
+ }
+ }
+
@VisibleForTesting
void setListAllTables(boolean listAllTables) {
this.listAllTables = listAllTables;