This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f55e551b IMPALA-12413: Make Iceberg tables created by Trino 
compatible with Impala
0f55e551b is described below

commit 0f55e551bc98843c79a9ec82582ddca237aa4fe9
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Sep 5 18:47:10 2023 +0200

    IMPALA-12413: Make Iceberg tables created by Trino compatible with Impala
    
    Trino creates Iceberg tables without 'engine.hive.enabled'='true'. It
    also doesn't provide a way for users to set this property. Therefore
    Trino always creates Iceberg tables with non-HiveIceberg storage
    descriptors.
    
    Impala uses the Input/Output/SerDe properties to recognize table types.
    This change relaxes this a bit for Iceberg tables, i.e. a table is also
    considered to be an Iceberg table if the table property
    'table_type'='ICEBERG' is set.
    
    During table loading Impala uses an internal HDFS table to load table
    metadata. It currently throws an exception when no proper storage
    descriptor is being set. To work around this, IcebergTable changes
    the in-memory HMS table's storage descriptor properties to the
    HiveIceberg* properties. Normally, this shouldn't persist to the
    HMS database on read operations. Though it wouldn't harm AFAICT, we
    just want to be on the safe side.
    
    Modifications to the table from Impala go through its Iceberg
    library (with 'engine.hive.enabled'='true'), which means we set
    the HiveIceberg storage descriptors. Trino is still compatible with
    such tables.
    
    Testing
     * Manually tested with Trino
     * IMPALA-12422 will add interop tests once we have Trino in the
       minicluster environment
    
    Change-Id: I18ea3858314d70a6131982a4e4d3ca90a95a311a
    Reviewed-on: http://gerrit.cloudera.org:8080/20453
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/catalog/FeIcebergTable.java  | 17 +++++++++++++++++
 .../java/org/apache/impala/catalog/IcebergTable.java    | 17 ++++++++++++++---
 .../apache/impala/catalog/local/LocalIcebergTable.java  |  1 +
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index c55c9f7fa..f23096c40 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -375,6 +375,23 @@ public interface FeIcebergTable extends FeFsTable {
     getTTableStats().setTotal_file_bytes(Utils.calculateFileSizeInBytes(this));
   }
 
+  static void setIcebergStorageDescriptor(
+      org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    hmsTable.getSd().setInputFormat(HdfsFileFormat.ICEBERG.inputFormat());
+    hmsTable.getSd().setOutputFormat(HdfsFileFormat.ICEBERG.outputFormat());
+    hmsTable.getSd().getSerdeInfo().setSerializationLib(
+        HdfsFileFormat.ICEBERG.serializationLib());
+  }
+
+  static void resetIcebergStorageDescriptor(
+      org.apache.hadoop.hive.metastore.api.Table modifiedTable,
+      org.apache.hadoop.hive.metastore.api.Table originalTable) {
+    
modifiedTable.getSd().setInputFormat(originalTable.getSd().getInputFormat());
+    
modifiedTable.getSd().setOutputFormat(originalTable.getSd().getOutputFormat());
+    modifiedTable.getSd().getSerdeInfo().setSerializationLib(
+        originalTable.getSd().getSerdeInfo().getSerializationLib());
+  }
+
   /**
    * Utility functions
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 2a9f7ade8..f30883788 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -29,8 +29,6 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.impala.analysis.IcebergPartitionField;
@@ -247,7 +245,14 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
   }
 
   public static boolean 
isIcebergTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    return 
isIcebergStorageHandler(msTbl.getParameters().get(KEY_STORAGE_HANDLER));
+    String inputFormat = msTbl.getSd().getInputFormat();
+    HdfsFileFormat hdfsFileFormat = inputFormat != null ?
+        HdfsFileFormat.fromHdfsInputFormatClass(inputFormat, null) :
+        null;
+    return 
isIcebergStorageHandler(msTbl.getParameters().get(KEY_STORAGE_HANDLER)) ||
+        hdfsFileFormat == HdfsFileFormat.ICEBERG ||
+        (hdfsFileFormat == null &&
+         "ICEBERG".equals(msTbl.getParameters().get("table_type")));
   }
 
   @Override
@@ -347,6 +352,10 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
     try {
       // Copy the table to check later if anything has changed.
       msTable_ = msTbl.deepCopy();
+      // Other engines might create Iceberg tables without setting the 
HiveIceberg*
+      // storage descriptors. Impala relies on the storage descriptors being 
set to
+      // certain classes, so we set it here for the in-memory metastore table.
+      FeIcebergTable.setIcebergStorageDescriptor(msTable_);
       setTableStats(msTable_);
       // Load metadata from Iceberg
       final Timer.Context ctxStorageLdTime =
@@ -382,6 +391,8 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
 
       refreshLastUsedTime();
 
+      // Let's reset the storage descriptors, so we don't update the table 
unnecessarily.
+      FeIcebergTable.resetIcebergStorageDescriptor(msTable_, msTbl);
       // Avoid updating HMS if the schema didn't change.
       if (msTable_.equals(msTbl)) return;
 
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 8395a4f8d..78dfbd9f0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -85,6 +85,7 @@ public class LocalIcebergTable extends LocalTable implements 
FeIcebergTable {
     Preconditions.checkNotNull(db);
     Preconditions.checkNotNull(msTable);
     try {
+      FeIcebergTable.setIcebergStorageDescriptor(msTable);
       TableParams tableParams = new TableParams(msTable);
       TPartialTableInfo tableInfo = db.getCatalog().getMetaProvider()
           .loadIcebergTable(ref);

Reply via email to