This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 0f55e551b IMPALA-12413: Make Iceberg tables created by Trino
compatible with Impala
0f55e551b is described below
commit 0f55e551bc98843c79a9ec82582ddca237aa4fe9
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Sep 5 18:47:10 2023 +0200
IMPALA-12413: Make Iceberg tables created by Trino compatible with Impala
Trino creates Iceberg tables without 'engine.hive.enabled'='true'. It
also doesn't provide a way for users to set this property. Therefore
Trino always creates Iceberg tables with non-HiveIceberg storage
descriptors.
Impala uses the Input/Output/SerDe properties to recognize table types.
This change relaxes this a bit for Iceberg tables, i.e. a table is also
considered to be an Iceberg table if the table property
'table_type'='ICEBERG' is set.
During table loading Impala uses an internal HDFS table to load table
metadata. It currently throws an exception when no proper storage
descriptor is set. To work around this, IcebergTable changes
the in-memory HMS table's storage descriptor properties to the
HiveIceberg* properties. Normally, this shouldn't persist to the
HMS database on read operations. Persisting it wouldn't do any harm
AFAICT, but we want to be on the safe side.
Modifications to the table from Impala go through its Iceberg
library (with 'engine.hive.enabled'='true'), which means we set
the HiveIceberg storage descriptors. Trino is still compatible with
such tables.
Testing
* Manually tested with Trino
* IMPALA-12422 will add interop tests once we have Trino in the
minicluster environment
Change-Id: I18ea3858314d70a6131982a4e4d3ca90a95a311a
Reviewed-on: http://gerrit.cloudera.org:8080/20453
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../java/org/apache/impala/catalog/FeIcebergTable.java | 17 +++++++++++++++++
.../java/org/apache/impala/catalog/IcebergTable.java | 17 ++++++++++++++---
.../apache/impala/catalog/local/LocalIcebergTable.java | 1 +
3 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index c55c9f7fa..f23096c40 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -375,6 +375,23 @@ public interface FeIcebergTable extends FeFsTable {
getTTableStats().setTotal_file_bytes(Utils.calculateFileSizeInBytes(this));
}
+ static void setIcebergStorageDescriptor(
+ org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ hmsTable.getSd().setInputFormat(HdfsFileFormat.ICEBERG.inputFormat());
+ hmsTable.getSd().setOutputFormat(HdfsFileFormat.ICEBERG.outputFormat());
+ hmsTable.getSd().getSerdeInfo().setSerializationLib(
+ HdfsFileFormat.ICEBERG.serializationLib());
+ }
+
+ static void resetIcebergStorageDescriptor(
+ org.apache.hadoop.hive.metastore.api.Table modifiedTable,
+ org.apache.hadoop.hive.metastore.api.Table originalTable) {
+
modifiedTable.getSd().setInputFormat(originalTable.getSd().getInputFormat());
+
modifiedTable.getSd().setOutputFormat(originalTable.getSd().getOutputFormat());
+ modifiedTable.getSd().getSerdeInfo().setSerializationLib(
+ originalTable.getSd().getSerdeInfo().getSerializationLib());
+ }
+
/**
* Utility functions
*/
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 2a9f7ade8..f30883788 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -29,8 +29,6 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.impala.analysis.IcebergPartitionField;
@@ -247,7 +245,14 @@ public class IcebergTable extends Table implements
FeIcebergTable {
}
public static boolean
isIcebergTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
- return
isIcebergStorageHandler(msTbl.getParameters().get(KEY_STORAGE_HANDLER));
+ String inputFormat = msTbl.getSd().getInputFormat();
+ HdfsFileFormat hdfsFileFormat = inputFormat != null ?
+ HdfsFileFormat.fromHdfsInputFormatClass(inputFormat, null) :
+ null;
+ return
isIcebergStorageHandler(msTbl.getParameters().get(KEY_STORAGE_HANDLER)) ||
+ hdfsFileFormat == HdfsFileFormat.ICEBERG ||
+ (hdfsFileFormat == null &&
+ "ICEBERG".equals(msTbl.getParameters().get("table_type")));
}
@Override
@@ -347,6 +352,10 @@ public class IcebergTable extends Table implements
FeIcebergTable {
try {
// Copy the table to check later if anything has changed.
msTable_ = msTbl.deepCopy();
+ // Other engines might create Iceberg tables without setting the
HiveIceberg*
+ // storage descriptors. Impala relies on the storage descriptors being
set to
+ // certain classes, so we set it here for the in-memory metastore table.
+ FeIcebergTable.setIcebergStorageDescriptor(msTable_);
setTableStats(msTable_);
// Load metadata from Iceberg
final Timer.Context ctxStorageLdTime =
@@ -382,6 +391,8 @@ public class IcebergTable extends Table implements
FeIcebergTable {
refreshLastUsedTime();
+ // Let's reset the storage descriptors, so we don't update the table
unnecessarily.
+ FeIcebergTable.resetIcebergStorageDescriptor(msTable_, msTbl);
// Avoid updating HMS if the schema didn't change.
if (msTable_.equals(msTbl)) return;
diff --git
a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 8395a4f8d..78dfbd9f0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -85,6 +85,7 @@ public class LocalIcebergTable extends LocalTable implements
FeIcebergTable {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(msTable);
try {
+ FeIcebergTable.setIcebergStorageDescriptor(msTable);
TableParams tableParams = new TableParams(msTable);
TPartialTableInfo tableInfo = db.getCatalog().getMetaProvider()
.loadIcebergTable(ref);