This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 20fe8fea58061f5da7d5c0e7d26755712d02ef79
Author: Riza Suminto <[email protected]>
AuthorDate: Mon Jan 9 14:55:05 2023 -0800

    IMPALA-11658: Implement Iceberg manifest caching config for Impala
    
    Impala needs to supply Iceberg's catalog properties to enable manifest
    caching feature. This commit implements the necessary config reading.
    Iceberg related config is read from hadoop-conf.xml and supplied as a
    Map in catalog instantiation.
    
    Additionally, this patch also replaces the deprecated RuntimeIOException with
    its superclass, UncheckedIOException.
    
    Testing:
    - Pass core tests.
    - Checked that manifest caching works through debug logging.
    
    Change-Id: I5a60a700d2ae6302dfe395d1ef602e6b1d821888
    Reviewed-on: http://gerrit.cloudera.org:8080/19423
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/iceberg/IcebergCatalog.java     | 14 +++++++++
 .../impala/catalog/iceberg/IcebergCatalogs.java    | 15 ----------
 .../catalog/iceberg/IcebergHadoopCatalog.java      |  9 +++---
 .../catalog/iceberg/IcebergHadoopTables.java       |  5 ++--
 .../impala/catalog/iceberg/IcebergHiveCatalog.java |  4 ++-
 .../java/org/apache/impala/util/IcebergUtil.java   | 34 +++++++++++++++++++++-
 .../common/etc/hadoop/conf/core-site.xml.py        |  7 +++++
 7 files changed, 65 insertions(+), 23 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
index 9dd693d15..1aa239ba2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
@@ -72,4 +72,18 @@ public interface IcebergCatalog {
    * For HadoopCatalog, Iceberg implement 'renameTable' method with Exception 
threw
    */
   void renameTable(FeIcebergTable feTable, TableIdentifier newTableId);
+
+  /**
+   * Some of the implementation methods might be running on native threads as 
they might
+   * be invoked via JNI. In that case the context class loader for those 
threads is
+   * null. 'Catalogs' uses JNDI to load the catalog implementations, e.g. 
HadoopCatalog
+   * or HiveCatalog. JNDI uses the context class loader, but as it is null it 
falls back
+   * to the bootstrap class loader that doesn't have the Iceberg classes on 
its classpath.
+   * To avoid ClassNotFoundException we set the context class loader to the 
class loader
+   * that loaded this class.
+   */
+  default void setContextClassLoader() {
+    if (Thread.currentThread().getContextClassLoader() != null) return;
+    
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+  }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
index 2c1620a60..488cf348c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
@@ -30,7 +30,6 @@ import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.ConfigProperties;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
@@ -160,18 +159,4 @@ public class IcebergCatalogs implements IcebergCatalog {
                            tableProps.get(IcebergTable.ICEBERG_CATALOG));
     return properties;
   }
-
-  /**
-   * Some of the above methods might be running on native threads as they 
might be invoked
-   * via JNI. In that case the context class loader for those threads are 
null. 'Catalogs'
-   * uses JNDI to load the catalog implementations, e.g. HadoopCatalog or 
HiveCatalog.
-   * JNDI uses the context class loader, but as it is null it falls back to 
the bootstrap
-   * class loader that doesn't have the Iceberg classes on its classpath.
-   * To avoid ClassNotFoundException we set the context class loader to the 
class loader
-   * that loaded this class.
-   */
-  private void setContextClassLoader() {
-    if (Thread.currentThread().getContextClassLoader() != null) return;
-    
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-  }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
index 287b004b3..80d542dee 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
@@ -17,16 +17,16 @@
 
 package org.apache.impala.catalog.iceberg;
 
+import java.io.UncheckedIOException;
 import java.lang.NullPointerException;
-import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
@@ -47,8 +47,9 @@ public class IcebergHadoopCatalog implements IcebergCatalog {
   private HadoopCatalog hadoopCatalog;
 
   public IcebergHadoopCatalog(String catalogLocation) {
+    setContextClassLoader();
     hadoopCatalog = new HadoopCatalog();
-    Map<String, String> props = new HashMap<>();
+    Map<String, String> props = IcebergUtil.composeCatalogProperties();
     props.put(CatalogProperties.WAREHOUSE_LOCATION, catalogLocation);
     hadoopCatalog.setConf(FileSystemUtil.getConfiguration());
     hadoopCatalog.initialize("", props);
@@ -86,7 +87,7 @@ public class IcebergHadoopCatalog implements IcebergCatalog {
         return hadoopCatalog.loadTable(tableId);
       } catch (NoSuchTableException e) {
         throw new TableLoadingException(e.getMessage());
-      } catch (NullPointerException | RuntimeIOException e) {
+      } catch (NullPointerException | UncheckedIOException e) {
         if (attempt == MAX_ATTEMPTS - 1) {
           // Throw exception on last attempt.
           throw new TableLoadingException(String.format(
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
index 42017f367..5a99c0b56 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
@@ -17,14 +17,15 @@
 
 package org.apache.impala.catalog.iceberg;
 
+import java.io.UncheckedIOException;
 import java.lang.NullPointerException;
 import java.util.Map;
+
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
@@ -85,7 +86,7 @@ public class IcebergHadoopTables implements IcebergCatalog {
         return hadoopTables.load(tableLocation);
       } catch (NoSuchTableException e) {
         throw new TableLoadingException(e.getMessage());
-      } catch (NullPointerException | RuntimeIOException e) {
+      } catch (NullPointerException | UncheckedIOException e) {
         if (attempt == MAX_ATTEMPTS - 1) {
           // Throw exception on last attempt.
           throw new TableLoadingException(String.format(
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
index 9f7e5b719..b4b5f4b32 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
@@ -51,11 +51,13 @@ public class IcebergHiveCatalog implements IcebergCatalog {
   private HiveCatalog hiveCatalog_;
 
   private IcebergHiveCatalog() {
+    setContextClassLoader();
     HiveConf conf = new HiveConf(IcebergHiveCatalog.class);
     conf.setBoolean(ConfigProperties.ENGINE_HIVE_ENABLED, true);
     hiveCatalog_ = new HiveCatalog();
     hiveCatalog_.setConf(conf);
-    hiveCatalog_.initialize("ImpalaHiveCatalog", new HashMap<>());
+    Map<String, String> properties = IcebergUtil.composeCatalogProperties();
+    hiveCatalog_.initialize("ImpalaHiveCatalog", properties);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java 
b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 72859527e..2c304b00c 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -34,12 +34,14 @@ import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -56,6 +58,7 @@ import org.apache.iceberg.TableScan;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.transforms.PartitionSpecVisitor;
@@ -79,6 +82,7 @@ import org.apache.impala.catalog.iceberg.IcebergCatalogs;
 import org.apache.impala.catalog.iceberg.IcebergHadoopCatalog;
 import org.apache.impala.catalog.iceberg.IcebergHadoopTables;
 import org.apache.impala.catalog.iceberg.IcebergHiveCatalog;
+import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.fb.FbFileMetadata;
@@ -1033,4 +1037,32 @@ public class IcebergUtil {
       IcebergPartitionSpec spec) {
     return getPartitionTransformType(column, spec) != 
TIcebergPartitionTransformType.VOID;
   }
+
+  /**
+   * Compose Iceberg catalog properties from Hadoop Configuration.
+   */
+  public static Map<String, String> composeCatalogProperties() {
+    Configuration conf = FileSystemUtil.getConfiguration();
+    Map<String, String> props = new HashMap<>();
+    List<String> configKeys = new ArrayList<>(Arrays.asList(
+        CatalogProperties.FILE_IO_IMPL, 
CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
+        CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS,
+        CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES,
+        CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH));
+
+    for (String key : configKeys) {
+      String val = conf.get("iceberg." + key);
+      if (val != null) {
+        props.put(key, val);
+      }
+    }
+
+    if (!props.containsKey(CatalogProperties.FILE_IO_IMPL)) {
+      // Manifest caching is only enabled if "io-impl" is specified. Default 
to 
+      // if non-existent.
+      props.put(CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName());
+    }
+
+    return props;
+  }
 }
diff --git 
a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py 
b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
index 499cba249..bcb17edfa 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
@@ -118,6 +118,13 @@ CONFIG = {
    'fs.oss.endpoint': '${OSS_ACCESS_ENDPOINT}',
    'fs.oss.impl': 'org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem',
    'fs.AbstractFileSystem.oss.impl': 'org.apache.hadoop.fs.aliyun.oss.OSS',
+
+   # Manifest caching configuration for Iceberg.
+   'iceberg.io-impl': 'org.apache.iceberg.hadoop.HadoopFileIO',
+   'iceberg.io.manifest.cache-enabled': 'true',
+   'iceberg.io.manifest.cache.expiration-interval-ms': '60000',
+   'iceberg.io.manifest.cache.max-total-bytes': '104857600',
+   'iceberg.io.manifest.cache.max-content-length': '8388608',
 }
 
 if target_filesystem == 's3':

Reply via email to