This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new cf02ffac43 AWS, Core, Hive: Extract FileIO closing into separate FileIOTracker class (#10893)
cf02ffac43 is described below

commit cf02ffac4329141b30bca265cafb9987f64f6cc4
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Aug 14 01:27:32 2024 +0200

    AWS, Core, Hive: Extract FileIO closing into separate FileIOTracker class (#10893)
---
 .../org/apache/iceberg/aws/glue/GlueCatalog.java   | 31 ++--------
 .../java/org/apache/iceberg/io/FileIOTracker.java  | 65 +++++++++++++++++++
 .../apache/iceberg/rest/RESTSessionCatalog.java    | 27 ++------
 .../test/java/org/apache/iceberg/TestTables.java   | 21 ++++++-
 .../org/apache/iceberg/io/TestFileIOTracker.java   | 72 ++++++++++++++++++++++
 .../java/org/apache/iceberg/hive/HiveCatalog.java  | 28 ++-------
 6 files changed, 173 insertions(+), 71 deletions(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
index c6b157bb5c..47807a2b9f 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -18,9 +18,6 @@
  */
 package org.apache.iceberg.aws.glue;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.RemovalListener;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +48,7 @@ import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.Configurable;
 import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOTracker;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -97,7 +94,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
   private LockManager lockManager;
   private CloseableGroup closeableGroup;
   private Map<String, String> catalogProperties;
-  private Cache<TableOperations, FileIO> fileIOCloser;
+  private FileIOTracker fileIOTracker;
 
   // Attempt to set versionId if available on the path
   private static final DynMethods.UnboundMethod SET_VERSION_ID =
@@ -194,11 +191,12 @@ public class GlueCatalog extends BaseMetastoreCatalog
     this.lockManager = lock;
 
     this.closeableGroup = new CloseableGroup();
+    this.fileIOTracker = new FileIOTracker();
     closeableGroup.addCloseable(glue);
     closeableGroup.addCloseable(lockManager);
     closeableGroup.addCloseable(metricsReporter());
+    closeableGroup.addCloseable(fileIOTracker);
     closeableGroup.setSuppressCloseFailure(true);
-    this.fileIOCloser = newFileIOCloser();
   }
 
   @Override
@@ -243,7 +241,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
               tableSpecificCatalogPropertiesBuilder.buildOrThrow(),
               hadoopConf,
               tableIdentifier);
-      fileIOCloser.put(glueTableOperations, glueTableOperations.io());
+      fileIOTracker.track(glueTableOperations);
       return glueTableOperations;
     }
 
@@ -256,7 +254,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
             catalogProperties,
             hadoopConf,
             tableIdentifier);
-    fileIOCloser.put(glueTableOperations, glueTableOperations.io());
+    fileIOTracker.track(glueTableOperations);
     return glueTableOperations;
   }
 
@@ -634,10 +632,6 @@ public class GlueCatalog extends BaseMetastoreCatalog
   @Override
   public void close() throws IOException {
     closeableGroup.close();
-    if (fileIOCloser != null) {
-      fileIOCloser.invalidateAll();
-      fileIOCloser.cleanUp();
-    }
   }
 
   @Override
@@ -649,17 +643,4 @@ public class GlueCatalog extends BaseMetastoreCatalog
   protected Map<String, String> properties() {
     return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
   }
-
-  private Cache<TableOperations, FileIO> newFileIOCloser() {
-    return Caffeine.newBuilder()
-        .weakKeys()
-        .removalListener(
-            (RemovalListener<TableOperations, FileIO>)
-                (ops, fileIO, cause) -> {
-                  if (null != fileIO) {
-                    fileIO.close();
-                  }
-                })
-        .build();
-  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/io/FileIOTracker.java b/core/src/main/java/org/apache/iceberg/io/FileIOTracker.java
new file mode 100644
index 0000000000..9d8630e79b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/FileIOTracker.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.Closeable;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Keeps track of the {@link FileIO} instance of the given {@link TableOperations} instance and
+ * closes the {@link FileIO} when {@link FileIOTracker#close()} gets called
+ */
+public class FileIOTracker implements Closeable {
+  private final Cache<TableOperations, FileIO> tracker;
+
+  public FileIOTracker() {
+    this.tracker =
+        Caffeine.newBuilder()
+            .weakKeys()
+            .removalListener(
+                (RemovalListener<TableOperations, FileIO>)
+                    (ops, fileIO, cause) -> {
+                      if (null != fileIO) {
+                        fileIO.close();
+                      }
+                    })
+            .build();
+  }
+
+  public void track(TableOperations ops) {
+    Preconditions.checkArgument(null != ops, "Invalid table ops: null");
+    tracker.put(ops, ops.io());
+  }
+
+  @VisibleForTesting
+  Cache<TableOperations, FileIO> tracker() {
+    return tracker;
+  }
+
+  @Override
+  public void close() {
+    tracker.invalidateAll();
+    tracker.cleanUp();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 1c607e3b02..53ce45bb0a 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -47,7 +47,6 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.Transactions;
 import org.apache.iceberg.catalog.BaseViewSessionCatalog;
@@ -63,6 +62,7 @@ import org.apache.iceberg.exceptions.RESTException;
 import org.apache.iceberg.hadoop.Configurable;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOTracker;
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.metrics.MetricsReporters;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -136,7 +136,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
   private final BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder;
   private Cache<String, AuthSession> sessions = null;
   private Cache<String, AuthSession> tableSessions = null;
-  private Cache<TableOperations, FileIO> fileIOCloser;
+  private FileIOTracker fileIOTracker = null;
   private AuthSession catalogAuth = null;
   private boolean keepTokenRefreshed = true;
   private RESTClient client = null;
@@ -268,10 +268,11 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
 
     this.io = newFileIO(SessionContext.createEmpty(), mergedProps);
 
-    this.fileIOCloser = newFileIOCloser();
+    this.fileIOTracker = new FileIOTracker();
     this.closeables = new CloseableGroup();
     this.closeables.addCloseable(this.io);
     this.closeables.addCloseable(this.client);
+    this.closeables.addCloseable(fileIOTracker);
     this.closeables.setSuppressCloseFailure(true);
 
     this.snapshotMode =
@@ -465,7 +466,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
 
   private void trackFileIO(RESTTableOperations ops) {
     if (io != ops.io()) {
-      fileIOCloser.put(ops, ops.io());
+      fileIOTracker.track(ops);
     }
   }
 
@@ -641,11 +642,6 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
     if (closeables != null) {
       closeables.close();
     }
-
-    if (fileIOCloser != null) {
-      fileIOCloser.invalidateAll();
-      fileIOCloser.cleanUp();
-    }
   }
 
   private void shutdownRefreshExecutor() {
@@ -1088,19 +1084,6 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
         .build();
   }
 
-  private Cache<TableOperations, FileIO> newFileIOCloser() {
-    return Caffeine.newBuilder()
-        .weakKeys()
-        .removalListener(
-            (RemovalListener<TableOperations, FileIO>)
-                (ops, fileIO, cause) -> {
-                  if (null != fileIO) {
-                    fileIO.close();
-                  }
-                })
-        .build();
-  }
-
   public void commitTransaction(SessionContext context, List<TableCommit> commits) {
     List<UpdateTableRequest> tableChanges = Lists.newArrayListWithCapacity(commits.size());
 
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index de05e85c3c..eeff5db8e5 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -216,6 +216,7 @@ public class TestTables {
 
     private final String tableName;
     private final File metadata;
+    private final FileIO fileIO;
     private TableMetadata current = null;
     private long lastSnapshotId = 0;
     private int failCommits = 0;
@@ -223,6 +224,22 @@ public class TestTables {
     public TestTableOperations(String tableName, File location) {
       this.tableName = tableName;
       this.metadata = new File(location, "metadata");
+      this.fileIO = new LocalFileIO();
+      metadata.mkdirs();
+      refresh();
+      if (current != null) {
+        for (Snapshot snap : current.snapshots()) {
+          this.lastSnapshotId = Math.max(lastSnapshotId, snap.snapshotId());
+        }
+      } else {
+        this.lastSnapshotId = 0;
+      }
+    }
+
+    public TestTableOperations(String tableName, File location, FileIO fileIO) {
+      this.tableName = tableName;
+      this.metadata = new File(location, "metadata");
+      this.fileIO = fileIO;
       metadata.mkdirs();
       refresh();
       if (current != null) {
@@ -277,7 +294,7 @@ public class TestTables {
 
     @Override
     public FileIO io() {
-      return new LocalFileIO();
+      return fileIO;
     }
 
     @Override
@@ -300,7 +317,7 @@ public class TestTables {
     }
   }
 
-  static class LocalFileIO implements FileIO {
+  public static class LocalFileIO implements FileIO {
 
     @Override
     public InputFile newInputFile(String path) {
diff --git a/core/src/test/java/org/apache/iceberg/io/TestFileIOTracker.java b/core/src/test/java/org/apache/iceberg/io/TestFileIOTracker.java
new file mode 100644
index 0000000000..e6225d886c
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/TestFileIOTracker.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.times;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.TestTables;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+
+public class TestFileIOTracker {
+
+  @TempDir private File tableDir;
+
+  @SuppressWarnings("resource")
+  @Test
+  public void nullTableOps() {
+    assertThatThrownBy(() -> new FileIOTracker().track(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid table ops: null");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void fileIOGetsClosed() throws NoSuchFieldException, IllegalAccessException {
+    FileIOTracker fileIOTracker = new FileIOTracker();
+
+    FileIO firstFileIO = Mockito.spy(new TestTables.LocalFileIO());
+    TestTables.TestTableOperations firstOps =
+        new TestTables.TestTableOperations("x", tableDir, firstFileIO);
+    fileIOTracker.track(firstOps);
+    assertThat(fileIOTracker.tracker().estimatedSize()).isEqualTo(1);
+
+    FileIO secondFileIO = Mockito.spy(new TestTables.LocalFileIO());
+    TestTables.TestTableOperations secondOps =
+        new TestTables.TestTableOperations("y", tableDir, secondFileIO);
+    fileIOTracker.track(secondOps);
+    assertThat(fileIOTracker.tracker().estimatedSize()).isEqualTo(2);
+
+    fileIOTracker.close();
+    Awaitility.await("FileIO gets closed")
+        .atMost(5, TimeUnit.SECONDS)
+        .untilAsserted(
+            () -> {
+              assertThat(fileIOTracker.tracker().estimatedSize()).isEqualTo(0);
+              Mockito.verify(firstFileIO, times(1)).close();
+              Mockito.verify(secondFileIO, times(1)).close();
+            });
+  }
+}
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 8944cf9394..5c58222f0c 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -18,9 +18,6 @@
  */
 package org.apache.iceberg.hive;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.RemovalListener;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -53,6 +50,7 @@ import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOTracker;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -83,7 +81,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
   private ClientPool<IMetaStoreClient, TException> clients;
   private boolean listAllTables = false;
   private Map<String, String> catalogProperties;
-  private Cache<TableOperations, FileIO> fileIOCloser;
+  private FileIOTracker fileIOTracker;
 
   public HiveCatalog() {}
 
@@ -116,20 +114,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
             : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
 
     this.clients = new CachedClientPool(conf, properties);
-    this.fileIOCloser = newFileIOCloser();
-  }
-
-  private Cache<TableOperations, FileIO> newFileIOCloser() {
-    return Caffeine.newBuilder()
-        .weakKeys()
-        .removalListener(
-            (RemovalListener<TableOperations, FileIO>)
-                (ops, fileIOInstance, cause) -> {
-                  if (null != fileIOInstance) {
-                    fileIOInstance.close();
-                  }
-                })
-        .build();
+    this.fileIOTracker = new FileIOTracker();
   }
 
   @Override
@@ -533,7 +518,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
     String tableName = tableIdentifier.name();
     HiveTableOperations ops =
         new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
-    fileIOCloser.put(ops, ops.io());
+    fileIOTracker.track(ops);
     return ops;
   }
 
@@ -661,9 +646,8 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
   @Override
   public void close() throws IOException {
     super.close();
-    if (fileIOCloser != null) {
-      fileIOCloser.invalidateAll();
-      fileIOCloser.cleanUp();
+    if (fileIOTracker != null) {
+      fileIOTracker.close();
     }
   }
 

Reply via email to