This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0474236fdb AWS: Add FileIO tracker/closer to Glue catalog (#8315)
0474236fdb is described below

commit 0474236fdbf447c5246bdbbfb432558d0ab8bdcf
Author: Ashok <[email protected]>
AuthorDate: Tue Aug 22 00:06:51 2023 +0530

    AWS: Add FileIO tracker/closer to Glue catalog (#8315)
---
 .../org/apache/iceberg/aws/glue/GlueCatalog.java   | 61 ++++++++++++++++------
 1 file changed, 45 insertions(+), 16 deletions(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
index f96145746c..0338d1da5b 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iceberg.aws.glue;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
@@ -48,6 +51,7 @@ import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.Configurable;
 import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -91,6 +95,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
   private LockManager lockManager;
   private CloseableGroup closeableGroup;
   private Map<String, String> catalogProperties;
+  private Cache<TableOperations, FileIO> fileIOCloser;
 
   // Attempt to set versionId if available on the path
   private static final DynMethods.UnboundMethod SET_VERSION_ID =
@@ -183,6 +188,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
     closeableGroup.addCloseable(glue);
     closeableGroup.addCloseable(lockManager);
     closeableGroup.setSuppressCloseFailure(true);
+    this.fileIOCloser = newFileIOCloser();
   }
 
   @Override
@@ -217,24 +223,30 @@ public class GlueCatalog extends BaseMetastoreCatalog
 
       // FileIO initialization depends on tableSpecificCatalogProperties, so a new FileIO is
       // initialized each time
-      return new GlueTableOperations(
-          glue,
-          lockManager,
-          catalogName,
-          awsProperties,
-          tableSpecificCatalogPropertiesBuilder.buildOrThrow(),
-          hadoopConf,
-          tableIdentifier);
+      GlueTableOperations glueTableOperations =
+          new GlueTableOperations(
+              glue,
+              lockManager,
+              catalogName,
+              awsProperties,
+              tableSpecificCatalogPropertiesBuilder.buildOrThrow(),
+              hadoopConf,
+              tableIdentifier);
+      fileIOCloser.put(glueTableOperations, glueTableOperations.io());
+      return glueTableOperations;
     }
 
-    return new GlueTableOperations(
-        glue,
-        lockManager,
-        catalogName,
-        awsProperties,
-        catalogProperties,
-        hadoopConf,
-        tableIdentifier);
+    GlueTableOperations glueTableOperations =
+        new GlueTableOperations(
+            glue,
+            lockManager,
+            catalogName,
+            awsProperties,
+            catalogProperties,
+            hadoopConf,
+            tableIdentifier);
+    fileIOCloser.put(glueTableOperations, glueTableOperations.io());
+    return glueTableOperations;
   }
 
   /**
@@ -608,6 +620,10 @@ public class GlueCatalog extends BaseMetastoreCatalog
   @Override
   public void close() throws IOException {
     closeableGroup.close();
+    if (fileIOCloser != null) {
+      fileIOCloser.invalidateAll();
+      fileIOCloser.cleanUp();
+    }
   }
 
   @Override
@@ -619,4 +635,17 @@ public class GlueCatalog extends BaseMetastoreCatalog
   protected Map<String, String> properties() {
     return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
   }
+
+  private Cache<TableOperations, FileIO> newFileIOCloser() {
+    return Caffeine.newBuilder()
+        .weakKeys()
+        .removalListener(
+            (RemovalListener<TableOperations, FileIO>)
+                (ops, fileIO, cause) -> {
+                  if (null != fileIO) {
+                    fileIO.close();
+                  }
+                })
+        .build();
+  }
 }

Reply via email to