This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new cf02ffac43 AWS, Core, Hive: Extract FileIO closing into separate
FileIOTracker class (#10893)
cf02ffac43 is described below
commit cf02ffac4329141b30bca265cafb9987f64f6cc4
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Aug 14 01:27:32 2024 +0200
AWS, Core, Hive: Extract FileIO closing into separate FileIOTracker class
(#10893)
---
.../org/apache/iceberg/aws/glue/GlueCatalog.java | 31 ++--------
.../java/org/apache/iceberg/io/FileIOTracker.java | 65 +++++++++++++++++++
.../apache/iceberg/rest/RESTSessionCatalog.java | 27 ++------
.../test/java/org/apache/iceberg/TestTables.java | 21 ++++++-
.../org/apache/iceberg/io/TestFileIOTracker.java | 72 ++++++++++++++++++++++
.../java/org/apache/iceberg/hive/HiveCatalog.java | 28 ++-------
6 files changed, 173 insertions(+), 71 deletions(-)
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
index c6b157bb5c..47807a2b9f 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -18,9 +18,6 @@
*/
package org.apache.iceberg.aws.glue;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -51,7 +48,7 @@ import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOTracker;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -97,7 +94,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
private LockManager lockManager;
private CloseableGroup closeableGroup;
private Map<String, String> catalogProperties;
- private Cache<TableOperations, FileIO> fileIOCloser;
+ private FileIOTracker fileIOTracker;
// Attempt to set versionId if available on the path
private static final DynMethods.UnboundMethod SET_VERSION_ID =
@@ -194,11 +191,12 @@ public class GlueCatalog extends BaseMetastoreCatalog
this.lockManager = lock;
this.closeableGroup = new CloseableGroup();
+ this.fileIOTracker = new FileIOTracker();
closeableGroup.addCloseable(glue);
closeableGroup.addCloseable(lockManager);
closeableGroup.addCloseable(metricsReporter());
+ closeableGroup.addCloseable(fileIOTracker);
closeableGroup.setSuppressCloseFailure(true);
- this.fileIOCloser = newFileIOCloser();
}
@Override
@@ -243,7 +241,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
tableSpecificCatalogPropertiesBuilder.buildOrThrow(),
hadoopConf,
tableIdentifier);
- fileIOCloser.put(glueTableOperations, glueTableOperations.io());
+ fileIOTracker.track(glueTableOperations);
return glueTableOperations;
}
@@ -256,7 +254,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
catalogProperties,
hadoopConf,
tableIdentifier);
- fileIOCloser.put(glueTableOperations, glueTableOperations.io());
+ fileIOTracker.track(glueTableOperations);
return glueTableOperations;
}
@@ -634,10 +632,6 @@ public class GlueCatalog extends BaseMetastoreCatalog
@Override
public void close() throws IOException {
closeableGroup.close();
- if (fileIOCloser != null) {
- fileIOCloser.invalidateAll();
- fileIOCloser.cleanUp();
- }
}
@Override
@@ -649,17 +643,4 @@ public class GlueCatalog extends BaseMetastoreCatalog
protected Map<String, String> properties() {
return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
}
-
- private Cache<TableOperations, FileIO> newFileIOCloser() {
- return Caffeine.newBuilder()
- .weakKeys()
- .removalListener(
- (RemovalListener<TableOperations, FileIO>)
- (ops, fileIO, cause) -> {
- if (null != fileIO) {
- fileIO.close();
- }
- })
- .build();
- }
}
diff --git a/core/src/main/java/org/apache/iceberg/io/FileIOTracker.java b/core/src/main/java/org/apache/iceberg/io/FileIOTracker.java
new file mode 100644
index 0000000000..9d8630e79b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/FileIOTracker.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.Closeable;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Keeps track of the {@link FileIO} instance of the given {@link TableOperations} instance and
+ * closes the {@link FileIO} when {@link FileIOTracker#close()} gets called
+ */
+public class FileIOTracker implements Closeable {
+ private final Cache<TableOperations, FileIO> tracker;
+
+ public FileIOTracker() {
+ this.tracker =
+ Caffeine.newBuilder()
+ .weakKeys()
+ .removalListener(
+ (RemovalListener<TableOperations, FileIO>)
+ (ops, fileIO, cause) -> {
+ if (null != fileIO) {
+ fileIO.close();
+ }
+ })
+ .build();
+ }
+
+ public void track(TableOperations ops) {
+ Preconditions.checkArgument(null != ops, "Invalid table ops: null");
+ tracker.put(ops, ops.io());
+ }
+
+ @VisibleForTesting
+ Cache<TableOperations, FileIO> tracker() {
+ return tracker;
+ }
+
+ @Override
+ public void close() {
+ tracker.invalidateAll();
+ tracker.cleanUp();
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 1c607e3b02..53ce45bb0a 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -47,7 +47,6 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.Transactions;
import org.apache.iceberg.catalog.BaseViewSessionCatalog;
@@ -63,6 +62,7 @@ import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOTracker;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.MetricsReporters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -136,7 +136,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private final BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder;
private Cache<String, AuthSession> sessions = null;
private Cache<String, AuthSession> tableSessions = null;
- private Cache<TableOperations, FileIO> fileIOCloser;
+ private FileIOTracker fileIOTracker = null;
private AuthSession catalogAuth = null;
private boolean keepTokenRefreshed = true;
private RESTClient client = null;
@@ -268,10 +268,11 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
this.io = newFileIO(SessionContext.createEmpty(), mergedProps);
- this.fileIOCloser = newFileIOCloser();
+ this.fileIOTracker = new FileIOTracker();
this.closeables = new CloseableGroup();
this.closeables.addCloseable(this.io);
this.closeables.addCloseable(this.client);
+ this.closeables.addCloseable(fileIOTracker);
this.closeables.setSuppressCloseFailure(true);
this.snapshotMode =
@@ -465,7 +466,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private void trackFileIO(RESTTableOperations ops) {
if (io != ops.io()) {
- fileIOCloser.put(ops, ops.io());
+ fileIOTracker.track(ops);
}
}
@@ -641,11 +642,6 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
if (closeables != null) {
closeables.close();
}
-
- if (fileIOCloser != null) {
- fileIOCloser.invalidateAll();
- fileIOCloser.cleanUp();
- }
}
private void shutdownRefreshExecutor() {
@@ -1088,19 +1084,6 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
.build();
}
- private Cache<TableOperations, FileIO> newFileIOCloser() {
- return Caffeine.newBuilder()
- .weakKeys()
- .removalListener(
- (RemovalListener<TableOperations, FileIO>)
- (ops, fileIO, cause) -> {
- if (null != fileIO) {
- fileIO.close();
- }
- })
- .build();
- }
-
public void commitTransaction(SessionContext context, List<TableCommit> commits) {
List<UpdateTableRequest> tableChanges =
Lists.newArrayListWithCapacity(commits.size());
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index de05e85c3c..eeff5db8e5 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -216,6 +216,7 @@ public class TestTables {
private final String tableName;
private final File metadata;
+ private final FileIO fileIO;
private TableMetadata current = null;
private long lastSnapshotId = 0;
private int failCommits = 0;
@@ -223,6 +224,22 @@ public class TestTables {
public TestTableOperations(String tableName, File location) {
this.tableName = tableName;
this.metadata = new File(location, "metadata");
+ this.fileIO = new LocalFileIO();
+ metadata.mkdirs();
+ refresh();
+ if (current != null) {
+ for (Snapshot snap : current.snapshots()) {
+ this.lastSnapshotId = Math.max(lastSnapshotId, snap.snapshotId());
+ }
+ } else {
+ this.lastSnapshotId = 0;
+ }
+ }
+
+ public TestTableOperations(String tableName, File location, FileIO fileIO) {
+ this.tableName = tableName;
+ this.metadata = new File(location, "metadata");
+ this.fileIO = fileIO;
metadata.mkdirs();
refresh();
if (current != null) {
@@ -277,7 +294,7 @@ public class TestTables {
@Override
public FileIO io() {
- return new LocalFileIO();
+ return fileIO;
}
@Override
@@ -300,7 +317,7 @@ public class TestTables {
}
}
- static class LocalFileIO implements FileIO {
+ public static class LocalFileIO implements FileIO {
@Override
public InputFile newInputFile(String path) {
diff --git a/core/src/test/java/org/apache/iceberg/io/TestFileIOTracker.java b/core/src/test/java/org/apache/iceberg/io/TestFileIOTracker.java
new file mode 100644
index 0000000000..e6225d886c
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/TestFileIOTracker.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.times;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.TestTables;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+
+public class TestFileIOTracker {
+
+ @TempDir private File tableDir;
+
+ @SuppressWarnings("resource")
+ @Test
+ public void nullTableOps() {
+ assertThatThrownBy(() -> new FileIOTracker().track(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid table ops: null");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void fileIOGetsClosed() throws NoSuchFieldException, IllegalAccessException {
+ FileIOTracker fileIOTracker = new FileIOTracker();
+
+ FileIO firstFileIO = Mockito.spy(new TestTables.LocalFileIO());
+ TestTables.TestTableOperations firstOps =
+ new TestTables.TestTableOperations("x", tableDir, firstFileIO);
+ fileIOTracker.track(firstOps);
+ assertThat(fileIOTracker.tracker().estimatedSize()).isEqualTo(1);
+
+ FileIO secondFileIO = Mockito.spy(new TestTables.LocalFileIO());
+ TestTables.TestTableOperations secondOps =
+ new TestTables.TestTableOperations("y", tableDir, secondFileIO);
+ fileIOTracker.track(secondOps);
+ assertThat(fileIOTracker.tracker().estimatedSize()).isEqualTo(2);
+
+ fileIOTracker.close();
+ Awaitility.await("FileIO gets closed")
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ assertThat(fileIOTracker.tracker().estimatedSize()).isEqualTo(0);
+ Mockito.verify(firstFileIO, times(1)).close();
+ Mockito.verify(secondFileIO, times(1)).close();
+ });
+ }
+}
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 8944cf9394..5c58222f0c 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -18,9 +18,6 @@
*/
package org.apache.iceberg.hive;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -53,6 +50,7 @@ import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOTracker;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -83,7 +81,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
private ClientPool<IMetaStoreClient, TException> clients;
private boolean listAllTables = false;
private Map<String, String> catalogProperties;
- private Cache<TableOperations, FileIO> fileIOCloser;
+ private FileIOTracker fileIOTracker;
public HiveCatalog() {}
@@ -116,20 +114,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
: CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
this.clients = new CachedClientPool(conf, properties);
- this.fileIOCloser = newFileIOCloser();
- }
-
- private Cache<TableOperations, FileIO> newFileIOCloser() {
- return Caffeine.newBuilder()
- .weakKeys()
- .removalListener(
- (RemovalListener<TableOperations, FileIO>)
- (ops, fileIOInstance, cause) -> {
- if (null != fileIOInstance) {
- fileIOInstance.close();
- }
- })
- .build();
+ this.fileIOTracker = new FileIOTracker();
}
@Override
@@ -533,7 +518,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
String tableName = tableIdentifier.name();
HiveTableOperations ops =
new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
- fileIOCloser.put(ops, ops.io());
+ fileIOTracker.track(ops);
return ops;
}
@@ -661,9 +646,8 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
@Override
public void close() throws IOException {
super.close();
- if (fileIOCloser != null) {
- fileIOCloser.invalidateAll();
- fileIOCloser.cleanUp();
+ if (fileIOTracker != null) {
+ fileIOTracker.close();
}
}