This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2446cee5cf Core: Close the MetricsReporter when Catalog is closed (#9353)
2446cee5cf is described below

commit 2446cee5cf0ad93a2be9a68f0b2f7f6fa6edb865
Author: big face cat <[email protected]>
AuthorDate: Thu Jan 18 15:31:40 2024 +0800

    Core: Close the MetricsReporter when Catalog is closed (#9353)
---
 .../org/apache/iceberg/metrics/MetricsReporter.java   |  6 +++++-
 .../apache/iceberg/aws/dynamodb/DynamoDbCatalog.java  |  4 ++--
 .../java/org/apache/iceberg/aws/glue/GlueCatalog.java |  4 ++--
 .../java/org/apache/iceberg/BaseMetastoreCatalog.java | 13 +++++++++++--
 .../java/org/apache/iceberg/hadoop/HadoopCatalog.java |  4 ++--
 .../org/apache/iceberg/inmemory/InMemoryCatalog.java  |  6 ++++++
 .../java/org/apache/iceberg/jdbc/JdbcCatalog.java     | 19 ++++++++++++++++---
 .../java/org/apache/iceberg/dell/ecs/EcsCatalog.java  |  4 ++--
 .../java/org/apache/iceberg/nessie/NessieCatalog.java |  3 ++-
 .../apache/iceberg/snowflake/SnowflakeCatalog.java    |  4 ++--
 10 files changed, 50 insertions(+), 17 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
index 365f7f99d6..9958b75ca3 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
@@ -18,11 +18,12 @@
  */
 package org.apache.iceberg.metrics;
 
+import java.io.Closeable;
 import java.util.Map;
 
 /** This interface defines the basic API for reporting metrics for operations to a Table. */
 @FunctionalInterface
-public interface MetricsReporter {
+public interface MetricsReporter extends Closeable {
 
   /**
    * A custom MetricsReporter implementation must have a no-arg constructor, which will be called
@@ -40,4 +41,7 @@ public interface MetricsReporter {
    * @param report The {@link MetricsReport} to report.
    */
   void report(MetricsReport report);
+
+  @Override
+  default void close() {}
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
index fc1479c3a0..0c991af750 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.aws.dynamodb;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -87,7 +86,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
 
 /** DynamoDB implementation of Iceberg catalog */
 public class DynamoDbCatalog extends BaseMetastoreCatalog
-    implements Closeable, SupportsNamespaces, Configurable {
+    implements SupportsNamespaces, Configurable {
 
   private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
   private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
@@ -143,6 +142,7 @@ public class DynamoDbCatalog extends BaseMetastoreCatalog
     this.closeableGroup = new CloseableGroup();
     closeableGroup.addCloseable(dynamo);
     closeableGroup.addCloseable(fileIO);
+    closeableGroup.addCloseable(metricsReporter());
     closeableGroup.setSuppressCloseFailure(true);
 
     ensureCatalogTableExistsOrCreate();
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
index 6e95379c1d..bdc2452731 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.aws.glue;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.RemovalListener;
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -85,7 +84,7 @@ import software.amazon.awssdk.services.glue.model.TableInput;
 import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
 
 public class GlueCatalog extends BaseMetastoreCatalog
-    implements Closeable, SupportsNamespaces, Configurable<Configuration> {
+    implements SupportsNamespaces, Configurable<Configuration> {
 
   private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
 
@@ -197,6 +196,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
     this.closeableGroup = new CloseableGroup();
     closeableGroup.addCloseable(glue);
     closeableGroup.addCloseable(lockManager);
+    closeableGroup.addCloseable(metricsReporter());
     closeableGroup.setSuppressCloseFailure(true);
     this.fileIOCloser = newFileIOCloser();
   }
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index a683533473..bb7d5a0ffd 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,7 +36,7 @@ import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class BaseMetastoreCatalog implements Catalog {
+public abstract class BaseMetastoreCatalog implements Catalog, Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreCatalog.class);
 
   private MetricsReporter metricsReporter;
@@ -305,11 +307,18 @@ public abstract class BaseMetastoreCatalog implements Catalog {
     return sb.toString();
   }
 
-  private MetricsReporter metricsReporter() {
+  protected MetricsReporter metricsReporter() {
     if (metricsReporter == null) {
       metricsReporter = CatalogUtil.loadMetricsReporter(properties());
     }
 
     return metricsReporter;
   }
+
+  @Override
+  public void close() throws IOException {
+    if (metricsReporter != null) {
+      metricsReporter.close();
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
index e9ed4dcd28..92ba25af0f 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.hadoop;
 
-import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -78,7 +77,7 @@ import org.slf4j.LoggerFactory;
  * <p>Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
  */
 public class HadoopCatalog extends BaseMetastoreCatalog
-    implements Closeable, SupportsNamespaces, Configurable {
+    implements SupportsNamespaces, Configurable {
 
   private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalog.class);
 
@@ -122,6 +121,7 @@ public class HadoopCatalog extends BaseMetastoreCatalog
 
     this.closeableGroup = new CloseableGroup();
     closeableGroup.addCloseable(lockManager);
+    closeableGroup.addCloseable(metricsReporter());
     closeableGroup.setSuppressCloseFailure(true);
 
     this.suppressPermissionError =
diff --git a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
index 51d242f934..a880f94f43 100644
--- a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
@@ -68,6 +69,7 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog
   private FileIO io;
   private String catalogName;
   private String warehouseLocation;
+  private CloseableGroup closeableGroup;
 
   public InMemoryCatalog() {
     this.namespaces = Maps.newConcurrentMap();
@@ -87,6 +89,9 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog
     String warehouse = properties.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, "");
     this.warehouseLocation = warehouse.replaceAll("/*$", "");
     this.io = new InMemoryFileIO();
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(metricsReporter());
+    closeableGroup.setSuppressCloseFailure(true);
   }
 
   @Override
@@ -302,6 +307,7 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog
 
   @Override
   public void close() throws IOException {
+    closeableGroup.close();
     namespaces.clear();
     tables.clear();
     views.clear();
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
index 314595dd02..0bab6ade4c 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
@@ -18,7 +18,8 @@
  */
 package org.apache.iceberg.jdbc;
 
-import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -50,6 +51,7 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -62,7 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class JdbcCatalog extends BaseMetastoreCatalog
-    implements Configurable<Object>, SupportsNamespaces, Closeable {
+    implements Configurable<Object>, SupportsNamespaces {
 
   public static final String PROPERTY_PREFIX = "jdbc.";
   private static final String NAMESPACE_EXISTS_PROPERTY = "exists";
@@ -78,6 +80,7 @@ public class JdbcCatalog extends BaseMetastoreCatalog
   private final Function<Map<String, String>, FileIO> ioBuilder;
   private final Function<Map<String, String>, JdbcClientPool> clientPoolBuilder;
   private final boolean initializeCatalogTables;
+  private CloseableGroup closeableGroup;
 
   public JdbcCatalog() {
     this(null, null, true);
@@ -140,6 +143,10 @@ public class JdbcCatalog extends BaseMetastoreCatalog
       Thread.currentThread().interrupt();
       throw new UncheckedInterruptedException(e, "Interrupted in call to initialize");
     }
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(metricsReporter());
+    closeableGroup.addCloseable(connections);
+    closeableGroup.setSuppressCloseFailure(true);
   }
 
   private void initializeCatalogTables() throws InterruptedException, SQLException {
@@ -482,7 +489,13 @@ public class JdbcCatalog extends BaseMetastoreCatalog
 
   @Override
   public void close() {
-    connections.close();
+    if (closeableGroup != null) {
+      try {
+        closeableGroup.close();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
   }
 
   @Override
diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
index f951c8c937..07ad683658 100644
--- a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
+++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
@@ -26,7 +26,6 @@ import com.emc.object.s3.bean.ListObjectsResult;
 import com.emc.object.s3.bean.S3Object;
 import com.emc.object.s3.request.ListObjectsRequest;
 import com.emc.object.s3.request.PutObjectRequest;
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
@@ -64,7 +63,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class EcsCatalog extends BaseMetastoreCatalog
-    implements Closeable, SupportsNamespaces, Configurable<Object> {
+    implements SupportsNamespaces, Configurable<Object> {
 
   /** Suffix of table metadata object */
   private static final String TABLE_OBJECT_SUFFIX = ".table";
@@ -111,6 +110,7 @@ public class EcsCatalog extends BaseMetastoreCatalog
     this.closeableGroup = new CloseableGroup();
     closeableGroup.addCloseable(client::destroy);
     closeableGroup.addCloseable(fileIO);
+    closeableGroup.addCloseable(metricsReporter());
     closeableGroup.setSuppressCloseFailure(true);
   }
 
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
index 6a877893ef..cce6fcf144 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
 
 /** Nessie implementation of Iceberg Catalog. */
 public class NessieCatalog extends BaseMetastoreViewCatalog
-    implements AutoCloseable, SupportsNamespaces, Configurable<Object> {
+    implements SupportsNamespaces, Configurable<Object> {
 
   private static final Logger LOG = LoggerFactory.getLogger(NessieCatalog.class);
   private static final Joiner SLASH = Joiner.on("/");
@@ -176,6 +176,7 @@ public class NessieCatalog extends BaseMetastoreViewCatalog
     this.closeableGroup = new CloseableGroup();
     closeableGroup.addCloseable(client);
     closeableGroup.addCloseable(fileIO);
+    closeableGroup.addCloseable(metricsReporter());
     closeableGroup.setSuppressCloseFailure(true);
   }
 
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
index dd20c8ded9..06dacad185 100644
--- a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.snowflake;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -45,7 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SnowflakeCatalog extends BaseMetastoreCatalog
-    implements Closeable, SupportsNamespaces, Configurable<Object> {
+    implements SupportsNamespaces, Configurable<Object> {
   private static final String DEFAULT_CATALOG_NAME = "snowflake_catalog";
   private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";
   // Specifies the name of a Snowflake's partner application to connect through JDBC.
@@ -157,6 +156,7 @@ public class SnowflakeCatalog extends BaseMetastoreCatalog
     this.catalogProperties = properties;
     this.closeableGroup = new CloseableGroup();
     closeableGroup.addCloseable(snowflakeClient);
+    closeableGroup.addCloseable(metricsReporter());
     closeableGroup.setSuppressCloseFailure(true);
   }
 

Reply via email to