This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 365b272bb [Improvement]: AMS frequently print warning logs of 'Unclosed ResolvingFileIO' (#3124)
365b272bb is described below

commit 365b272bb34e25d19d2a8eb5735953242f56b796
Author: tcodehuber <[email protected]>
AuthorDate: Mon Sep 2 14:32:18 2024 +0800

    [Improvement]: AMS frequently print warning logs of 'Unclosed ResolvingFileIO' (#3124)
    
    * [Improvement]: AMS frequently print warning logs of 'Unclosed ResolvingFileIO'
    
    * code refactor
---
 .../server/catalog/InternalIcebergCatalogImpl.java | 34 ++++++++++++++++++----
 .../server/catalog/InternalMixedCatalogImpl.java   |  7 +++--
 2 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java
index ad41de77e..bfc4a70b8 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java
@@ -18,6 +18,9 @@
 
 package org.apache.amoro.server.catalog;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
 import org.apache.amoro.AmoroTable;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.api.CatalogMeta;
@@ -40,6 +43,7 @@ import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
 
@@ -50,10 +54,13 @@ public class InternalIcebergCatalogImpl extends InternalCatalog {
   final int httpPort;
   final String exposedHost;
 
+  final Cache<AmoroTable<?>, FileIO> fileIOCloser;
+
   protected InternalIcebergCatalogImpl(CatalogMeta metadata, Configurations 
serverConfiguration) {
     super(metadata);
     this.httpPort = 
serverConfiguration.getInteger(AmoroManagementConf.HTTP_SERVER_PORT);
     this.exposedHost = 
serverConfiguration.getString(AmoroManagementConf.SERVER_EXPOSE_HOST);
+    this.fileIOCloser = newFileIOCloser();
   }
 
   @Override
@@ -96,12 +103,14 @@ public class InternalIcebergCatalogImpl extends InternalCatalog {
                 .toString());
     org.apache.amoro.table.TableIdentifier tableIdentifier =
         org.apache.amoro.table.TableIdentifier.of(name(), database, tableName);
-
-    return IcebergTable.newIcebergTable(
-        tableIdentifier,
-        table,
-        CatalogUtil.buildMetaStore(getMetadata()),
-        getMetadata().getCatalogProperties());
+    AmoroTable<?> amoroTable =
+        IcebergTable.newIcebergTable(
+            tableIdentifier,
+            table,
+            CatalogUtil.buildMetaStore(getMetadata()),
+            getMetadata().getCatalogProperties());
+    fileIOCloser.put(amoroTable, ops.io());
+    return amoroTable;
   }
 
   protected AuthenticatedFileIO fileIO(CatalogMeta catalogMeta) {
@@ -144,4 +153,17 @@ public class InternalIcebergCatalogImpl extends InternalCatalog {
     //noinspection unchecked
     return (InternalTableHandler<O>) new InternalIcebergHandler(getMetadata(), 
metadata);
   }
+
+  private Cache<AmoroTable<?>, FileIO> newFileIOCloser() {
+    return Caffeine.newBuilder()
+        .weakKeys()
+        .removalListener(
+            (RemovalListener<AmoroTable<?>, FileIO>)
+                (tbl, fileIO, cause) -> {
+                  if (null != fileIO) {
+                    fileIO.close();
+                  }
+                })
+        .build();
+  }
 }
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java
index 48731d882..45d2cfb33 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java
@@ -142,9 +142,10 @@ public class InternalMixedCatalogImpl extends InternalIcebergCatalogImpl {
           new BasicUnkeyedTable(
               tableIdentifier, baseTable, fileIO, 
getMetadata().getCatalogProperties());
     }
-
-    return new org.apache.amoro.formats.mixed.MixedTable(
-        mixedIcebergTable, TableFormat.MIXED_ICEBERG);
+    AmoroTable<?> amoroTable =
+        new org.apache.amoro.formats.mixed.MixedTable(mixedIcebergTable, 
TableFormat.MIXED_ICEBERG);
+    fileIOCloser.put(amoroTable, fileIO);
+    return amoroTable;
   }
 
   protected TableFormat format() {

Reply via email to