This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e2920af40 Hive: close the fileIO client when closing the hive catalog 
(#10771)
7e2920af40 is described below

commit 7e2920af400e5d559f8f1742b1043787b0477d74
Author: Hussein Awala <[email protected]>
AuthorDate: Thu Jul 25 19:50:19 2024 +0200

    Hive: close the fileIO client when closing the hive catalog (#10771)
    
    Co-authored-by: Amogh Jahagirdar <[email protected]>
    Co-authored-by: Eduard Tudenhoefner <[email protected]>
---
 .../java/org/apache/iceberg/hive/HiveCatalog.java  | 33 +++++++++++++++++++++-
 1 file changed, 32 insertions(+), 1 deletion(-)

diff --git 
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java 
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index b4f49e29fc..8944cf9394 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -18,6 +18,10 @@
  */
 package org.apache.iceberg.hive;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -79,6 +83,7 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
   private ClientPool<IMetaStoreClient, TException> clients;
   private boolean listAllTables = false;
   private Map<String, String> catalogProperties;
+  private Cache<TableOperations, FileIO> fileIOCloser;
 
   public HiveCatalog() {}
 
@@ -111,6 +116,20 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
             : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
 
     this.clients = new CachedClientPool(conf, properties);
+    this.fileIOCloser = newFileIOCloser();
+  }
+
+  private Cache<TableOperations, FileIO> newFileIOCloser() {
+    return Caffeine.newBuilder()
+        .weakKeys()
+        .removalListener(
+            (RemovalListener<TableOperations, FileIO>)
+                (ops, fileIOInstance, cause) -> {
+                  if (null != fileIOInstance) {
+                    fileIOInstance.close();
+                  }
+                })
+        .build();
   }
 
   @Override
@@ -512,7 +531,10 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
   public TableOperations newTableOps(TableIdentifier tableIdentifier) {
     String dbName = tableIdentifier.namespace().level(0);
     String tableName = tableIdentifier.name();
-    return new HiveTableOperations(conf, clients, fileIO, name, dbName, 
tableName);
+    HiveTableOperations ops =
+        new HiveTableOperations(conf, clients, fileIO, name, dbName, 
tableName);
+    fileIOCloser.put(ops, ops.io());
+    return ops;
   }
 
   @Override
@@ -636,6 +658,15 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
     return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
   }
 
+  @Override
+  public void close() throws IOException {
+    super.close();
+    if (fileIOCloser != null) {
+      fileIOCloser.invalidateAll();
+      fileIOCloser.cleanUp();
+    }
+  }
+
   @VisibleForTesting
   void setListAllTables(boolean listAllTables) {
     this.listAllTables = listAllTables;

Reply via email to