This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0474236fdb AWS: Add FileIO tracker/closer to Glue catalog (#8315)
0474236fdb is described below
commit 0474236fdbf447c5246bdbbfb432558d0ab8bdcf
Author: Ashok <[email protected]>
AuthorDate: Tue Aug 22 00:06:51 2023 +0530
AWS: Add FileIO tracker/closer to Glue catalog (#8315)
---
.../org/apache/iceberg/aws/glue/GlueCatalog.java | 61 ++++++++++++++++------
1 file changed, 45 insertions(+), 16 deletions(-)
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
index f96145746c..0338d1da5b 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -18,6 +18,9 @@
*/
package org.apache.iceberg.aws.glue;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
@@ -48,6 +51,7 @@ import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -91,6 +95,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
private LockManager lockManager;
private CloseableGroup closeableGroup;
private Map<String, String> catalogProperties;
+ private Cache<TableOperations, FileIO> fileIOCloser;
// Attempt to set versionId if available on the path
private static final DynMethods.UnboundMethod SET_VERSION_ID =
@@ -183,6 +188,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
closeableGroup.addCloseable(glue);
closeableGroup.addCloseable(lockManager);
closeableGroup.setSuppressCloseFailure(true);
+ this.fileIOCloser = newFileIOCloser();
}
@Override
@@ -217,24 +223,30 @@ public class GlueCatalog extends BaseMetastoreCatalog
// FileIO initialization depends on tableSpecificCatalogProperties, so a
new FileIO is
// initialized each time
- return new GlueTableOperations(
- glue,
- lockManager,
- catalogName,
- awsProperties,
- tableSpecificCatalogPropertiesBuilder.buildOrThrow(),
- hadoopConf,
- tableIdentifier);
+ GlueTableOperations glueTableOperations =
+ new GlueTableOperations(
+ glue,
+ lockManager,
+ catalogName,
+ awsProperties,
+ tableSpecificCatalogPropertiesBuilder.buildOrThrow(),
+ hadoopConf,
+ tableIdentifier);
+ fileIOCloser.put(glueTableOperations, glueTableOperations.io());
+ return glueTableOperations;
}
- return new GlueTableOperations(
- glue,
- lockManager,
- catalogName,
- awsProperties,
- catalogProperties,
- hadoopConf,
- tableIdentifier);
+ GlueTableOperations glueTableOperations =
+ new GlueTableOperations(
+ glue,
+ lockManager,
+ catalogName,
+ awsProperties,
+ catalogProperties,
+ hadoopConf,
+ tableIdentifier);
+ fileIOCloser.put(glueTableOperations, glueTableOperations.io());
+ return glueTableOperations;
}
/**
@@ -608,6 +620,10 @@ public class GlueCatalog extends BaseMetastoreCatalog
@Override
public void close() throws IOException {
closeableGroup.close();
+ if (fileIOCloser != null) {
+ fileIOCloser.invalidateAll();
+ fileIOCloser.cleanUp();
+ }
}
@Override
@@ -619,4 +635,17 @@ public class GlueCatalog extends BaseMetastoreCatalog
protected Map<String, String> properties() {
return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
}
+
+ private Cache<TableOperations, FileIO> newFileIOCloser() {
+ return Caffeine.newBuilder()
+ .weakKeys()
+ .removalListener(
+ (RemovalListener<TableOperations, FileIO>)
+ (ops, fileIO, cause) -> {
+ if (null != fileIO) {
+ fileIO.close();
+ }
+ })
+ .build();
+ }
}