This is an automated email from the ASF dual-hosted git repository. bikram pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 61964882d152edd0d369c9a912a52d5c982f3523 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Fri Sep 17 15:44:42 2021 +0200 IMPALA-10914: Consistently schedule scan ranges for Iceberg tables Before this patch Impala inconsistently scheduled scan ranges for Iceberg tables on HDFS, in local catalog mode. It did so because LocalIcebergTable reloaded all the file descriptors, and the HDFS block locations were not consistent across the reloads. Impala's scheduler uses the block location list for scan range assignment, hence the assignments were inconsistent between queries. This has a negative effect on caching and hence hurt performance quite badly. It is redundant and expensive to reload file descriptors for each query in local catalog mode. This patch extends the GetPartialInfo() RPC with Iceberg-specific snapshot information. It means that the coordinator is now able to fetch Iceberg data file descriptors from the CatalogD. This way scan range assignment becomes consistent because we reuse the same file descriptors with the same block location information. Fixing the above revealed another bug. Before this patch we didn't handle self-events of Iceberg tables. When an Iceberg table is stored in the HiveCatalog it means that Iceberg will update the HMS table on modifications because it needs to update table property 'metadata_location' (this points to the new snapshot file). Then Catalogd processes these modifications again when they arrive via the event notification mechanism. I fixed this by creating Iceberg transactions in which I set the catalog service ID and new catalog version for the Iceberg table. Since we are using transactions now Iceberg has to embed all table modifications in a single ALTER TABLE request to HMS, and detect the corresponding alter event later via the aforementioned catalog service ID and version. 
Testing: * added e2e test for the scan range assignment * added e2e test for detecting self-events Change-Id: Ibb8216b37d350469b573dad7fcefdd0ee0599ed5 Reviewed-on: http://gerrit.cloudera.org:8080/17857 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Qifan Chen <[email protected]> --- common/thrift/CatalogService.thrift | 12 ++ .../impala/catalog/CatalogServiceCatalog.java | 5 +- .../org/apache/impala/catalog/FeIcebergTable.java | 38 ++++- .../org/apache/impala/catalog/IcebergTable.java | 29 ++-- .../main/java/org/apache/impala/catalog/Table.java | 1 + .../impala/catalog/iceberg/IcebergCatalogs.java | 25 +-- .../impala/catalog/iceberg/IcebergCtasTarget.java | 16 +- .../impala/catalog/iceberg/IcebergHiveCatalog.java | 1 + .../impala/catalog/local/CatalogdMetaProvider.java | 11 ++ .../impala/catalog/local/DirectMetaProvider.java | 8 + .../impala/catalog/local/LocalIcebergTable.java | 24 +-- .../apache/impala/catalog/local/MetaProvider.java | 6 + .../apache/impala/service/CatalogOpExecutor.java | 168 +++++++++++++++------ .../impala/service/IcebergCatalogOpExecutor.java | 105 ++++++++----- .../java/org/apache/impala/util/IcebergUtil.java | 8 +- .../queries/QueryTest/iceberg-catalogs.test | 2 - .../queries/QueryTest/show-create-table.test | 2 +- tests/custom_cluster/test_events_custom_configs.py | 58 +++++++ tests/metadata/test_show_create_table.py | 4 +- tests/query_test/test_iceberg.py | 23 +++ tests/stress/test_insert_stress.py | 26 +++- 21 files changed, 432 insertions(+), 140 deletions(-) diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 058ab8a..1b6ab30 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -389,6 +389,10 @@ struct TTableInfoSelector { // Note that want_hms_partition=true will consume more space (IMPALA-7501), so only use // it in cases the clients do need HMS partition structs. 
12: bool want_hms_partition + + // The response should contain information about the Iceberg snapshot, i.e. the snapshot + // id and the file descriptors. + 13: bool want_iceberg_snapshot } // Returned information about a particular partition. @@ -435,6 +439,11 @@ struct TPartialPartitionInfo { 13: optional CatalogObjects.THdfsPartitionLocation location } +struct TIcebergSnapshot { + 1: required i64 snapshot_id + 2: optional map<string, CatalogObjects.THdfsFileDesc> iceberg_file_desc_map +} + // Returned information about a Table, as selected by TTableInfoSelector. struct TPartialTableInfo { 1: optional hive_metastore.Table hms_table @@ -474,6 +483,9 @@ struct TPartialTableInfo { // The prefixes of locations of partitions in this table. See THdfsPartitionLocation for // the description of how a prefix is computed. 11: optional list<string> partition_prefixes + + // Iceberg snapshot information + 12: optional TIcebergSnapshot iceberg_snapshot } struct TBriefTableMeta { diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 34eea1c..942698c 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -3363,9 +3363,8 @@ public class CatalogServiceCatalog extends Catalog { TGetPartialCatalogObjectResponse resp; table.takeReadLock(); try { - if (table instanceof HdfsTable || table instanceof IcebergTable) { - HdfsTable hdfsTable = table instanceof HdfsTable ? 
(HdfsTable) table : - ((IcebergTable) table).getHdfsTable(); + if (table instanceof HdfsTable) { + HdfsTable hdfsTable = (HdfsTable)table; missingPartialInfos = Maps.newHashMap(); resp = hdfsTable.getPartialInfo(req, missingPartialInfos); if (missingPartialInfos.isEmpty()) return resp; diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java index bb06002..c25664d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java @@ -39,16 +39,19 @@ import org.apache.iceberg.TableMetadata; import org.apache.impala.analysis.IcebergPartitionField; import org.apache.impala.analysis.IcebergPartitionSpec; import org.apache.impala.analysis.LiteralExpr; +import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.Reference; import org.apache.impala.compat.HdfsShim; import org.apache.impala.thrift.TColumn; import org.apache.impala.thrift.TCompressionCodec; import org.apache.impala.thrift.THdfsCompression; +import org.apache.impala.thrift.THdfsFileDesc; import org.apache.impala.thrift.THdfsTable; import org.apache.impala.thrift.THdfsPartition; import org.apache.impala.thrift.TIcebergCatalog; import org.apache.impala.thrift.TIcebergFileFormat; +import org.apache.impala.thrift.TIcebergSnapshot; import org.apache.impala.thrift.TIcebergTable; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TResultSet; @@ -382,11 +385,9 @@ public interface FeIcebergTable extends FeFsTable { tIcebergTable.setDefault_partition_spec_id( icebergTable.getDefaultPartitionSpecId()); - for (Map.Entry<String, HdfsPartition.FileDescriptor> entry : - icebergTable.getPathHashToFileDescMap().entrySet()) { - tIcebergTable.putToPath_hash_to_file_descriptor(entry.getKey(), - entry.getValue().toThrift()); - } + 
tIcebergTable.setPath_hash_to_file_descriptor( + convertPathHashToFileDescMap(icebergTable)); + tIcebergTable.setSnapshot_id(icebergTable.snapshotId()); tIcebergTable.setParquet_compression_codec( icebergTable.getIcebergParquetCompressionCodec()); @@ -399,6 +400,33 @@ public interface FeIcebergTable extends FeFsTable { return tIcebergTable; } + public static Map<String, THdfsFileDesc> convertPathHashToFileDescMap( + FeIcebergTable icebergTable) { + Map<String, THdfsFileDesc> ret = new HashMap<>(); + for (Map.Entry<String, HdfsPartition.FileDescriptor> entry : + icebergTable.getPathHashToFileDescMap().entrySet()) { + ret.put(entry.getKey(), entry.getValue().toThrift()); + } + return ret; + } + + public static Map<String, FileDescriptor> loadFileDescMapFromThrift( + Map<String, THdfsFileDesc> tFileDescMap) { + Map<String, FileDescriptor> fileDescMap = new HashMap<>(); + if (tFileDescMap == null) return fileDescMap; + for (Map.Entry<String, THdfsFileDesc> entry : tFileDescMap.entrySet()) { + fileDescMap.put(entry.getKey(), FileDescriptor.fromThrift(entry.getValue())); + } + return fileDescMap; + } + + public static TIcebergSnapshot createTIcebergSnapshot(FeIcebergTable icebergTable) { + TIcebergSnapshot snapshot = new TIcebergSnapshot(); + snapshot.setSnapshot_id(icebergTable.snapshotId()); + snapshot.setIceberg_file_desc_map(convertPathHashToFileDescMap(icebergTable)); + return snapshot; + } + /** * Get FileDescriptor by data file location */ diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index 851cd5c..91af691 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -36,6 +36,8 @@ import org.apache.impala.analysis.IcebergPartitionTransform; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.thrift.TCatalogObjectType; import 
org.apache.impala.thrift.TCompressionCodec; +import org.apache.impala.thrift.TGetPartialCatalogObjectRequest; +import org.apache.impala.thrift.TGetPartialCatalogObjectResponse; import org.apache.impala.thrift.THdfsCompression; import org.apache.impala.thrift.THdfsFileDesc; import org.apache.impala.thrift.THdfsTable; @@ -44,6 +46,7 @@ import org.apache.impala.thrift.TIcebergFileFormat; import org.apache.impala.thrift.TIcebergPartitionField; import org.apache.impala.thrift.TIcebergPartitionSpec; import org.apache.impala.thrift.TIcebergTable; +import org.apache.impala.thrift.TPartialPartitionInfo; import org.apache.impala.thrift.TTable; import org.apache.impala.thrift.TTableDescriptor; import org.apache.impala.thrift.TTableType; @@ -415,7 +418,7 @@ public class IcebergTable extends Table implements FeIcebergTable { icebergParquetDictPageSize_ = ticeberg.getParquet_dict_page_size(); partitionSpecs_ = loadPartitionBySpecsFromThrift(ticeberg.getPartition_spec()); defaultPartitionSpecId_ = ticeberg.getDefault_partition_spec_id(); - pathHashToFileDescMap_ = loadFileDescFromThrift( + pathHashToFileDescMap_ = FeIcebergTable.Utils.loadFileDescMapFromThrift( ticeberg.getPath_hash_to_file_descriptor()); snapshotId_ = ticeberg.getSnapshot_id(); hdfsTable_.loadFromThrift(thriftTable); @@ -450,16 +453,6 @@ public class IcebergTable extends Table implements FeIcebergTable { return ret; } - private Map<String, FileDescriptor> loadFileDescFromThrift( - Map<String, THdfsFileDesc> tFileDescMap) { - Map<String, FileDescriptor> fileDescMap = new HashMap<>(); - if (tFileDescMap == null) return fileDescMap; - for (Map.Entry<String, THdfsFileDesc> entry : tFileDescMap.entrySet()) { - fileDescMap.put(entry.getKey(), FileDescriptor.fromThrift(entry.getValue())); - } - return fileDescMap; - } - @Override public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) { @@ -479,4 +472,18 @@ public class IcebergTable extends Table implements FeIcebergTable { } return 
hdfsTable; } + + @Override + public TGetPartialCatalogObjectResponse getPartialInfo( + TGetPartialCatalogObjectRequest req) throws CatalogException { + Preconditions.checkState(isLoaded(), "unloaded table: %s", getFullName()); + Map<HdfsPartition, TPartialPartitionInfo> missingPartialInfos = new HashMap<>(); + TGetPartialCatalogObjectResponse resp = + getHdfsTable().getPartialInfo(req, missingPartialInfos); + if (req.table_info_selector.want_iceberg_snapshot) { + resp.table_info.setIceberg_snapshot( + FeIcebergTable.Utils.createTIcebergSnapshot(this)); + } + return resp; + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index 0b6db14..e1524d6 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -707,6 +707,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { } return resp; } + /** * @see FeCatalogUtils#parseColumnType(FieldSchema, String) */ diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java index 56133dd..2c1620a 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java @@ -66,8 +66,8 @@ public class IcebergCatalogs implements IcebergCatalog { } public TIcebergCatalog getUnderlyingCatalogType(String catalogName) { - String catalogType = configuration_.get(catalogPropertyConfigKey( - catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE)); + String catalogType = getCatalogProperty( + catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE); if (catalogType == null || CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType)) { return TIcebergCatalog.HIVE_CATALOG; @@ -91,8 +91,7 @@ public class IcebergCatalogs implements IcebergCatalog { setContextClassLoader(); String catName = 
tableProps.get(IcebergTable.ICEBERG_CATALOG); Preconditions.checkState(catName != null); - String catalogType = configuration_.get(catalogPropertyConfigKey( - catName, CatalogUtil.ICEBERG_CATALOG_TYPE)); + String catalogType = getCatalogProperty(catName, CatalogUtil.ICEBERG_CATALOG_TYPE); if (catalogType == null) { throw new ImpalaRuntimeException( String.format("Unknown catalog name: %s", catName)); @@ -101,6 +100,7 @@ public class IcebergCatalogs implements IcebergCatalog { properties.setProperty(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema)); properties.setProperty(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec)); + properties.setProperty("external.table.purge", "TRUE"); return Catalogs.createTable(configuration_, properties); } @@ -138,6 +138,15 @@ public class IcebergCatalogs implements IcebergCatalog { "Cannot rename Iceberg tables that use 'Catalogs'."); } + /** + * Returns the value of 'catalogPropertyKey' for the given catalog. + */ + public String getCatalogProperty(String catalogName, String catalogPropertyKey) { + String propKey = String.format("%s%s.%s", InputFormatConfig.CATALOG_CONFIG_PREFIX, + catalogName, catalogPropertyKey); + return configuration_.get(propKey); + } + private Properties createPropsForCatalogs(TableIdentifier tableId, String location, Map<String, String> tableProps) { Properties properties = new Properties(); @@ -147,15 +156,11 @@ public class IcebergCatalogs implements IcebergCatalog { } else if (location != null) { properties.setProperty(Catalogs.LOCATION, location); } + properties.setProperty(IcebergTable.ICEBERG_CATALOG, + tableProps.get(IcebergTable.ICEBERG_CATALOG)); return properties; } - private static String catalogPropertyConfigKey(String catalogName, - String catalogProperty) { - return String.format("%s%s.%s", InputFormatConfig.CATALOG_CONFIG_PREFIX, - catalogName, catalogProperty); - } - /** * Some of the above methods might be running on native threads as they might be invoked * via 
JNI. In that case the context class loader for those threads are null. 'Catalogs' diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java index 5e122e9..e0c986f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java @@ -26,6 +26,7 @@ import java.util.Set; import com.google.common.base.Preconditions; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Namespace; @@ -46,6 +47,7 @@ import org.apache.impala.catalog.HdfsStorageDescriptor; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.IcebergColumn; import org.apache.impala.catalog.IcebergStructField; +import org.apache.impala.catalog.IcebergTable; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.local.LocalDb; import org.apache.impala.catalog.local.LocalFsTable; @@ -143,8 +145,15 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable private void setLocations() { Preconditions.checkState(msTable_ != null); Preconditions.checkState(icebergCatalog_ != null); - if (icebergCatalog_ == TIcebergCatalog.HADOOP_CATALOG) { - icebergCatalogLocation_ = IcebergUtil.getIcebergCatalogLocation(msTable_); + TIcebergCatalog underlyingCatalog = IcebergUtil.getUnderlyingCatalog(msTable_); + if (underlyingCatalog == TIcebergCatalog.HADOOP_CATALOG) { + if (icebergCatalog_ == TIcebergCatalog.CATALOGS) { + String catName = msTable_.getParameters().get(IcebergTable.ICEBERG_CATALOG); + icebergCatalogLocation_ = IcebergCatalogs.getInstance().getCatalogProperty( + catName, CatalogProperties.WAREHOUSE_LOCATION); + } else { + icebergCatalogLocation_ = IcebergUtil.getIcebergCatalogLocation(msTable_); + } TableIdentifier tId = 
IcebergUtil.getIcebergTableIdentifier(msTable_); Namespace ns = tId.namespace(); List<String> components = new ArrayList<>(); @@ -155,7 +164,8 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable return; } Preconditions.checkState(icebergCatalog_ == TIcebergCatalog.HADOOP_TABLES || - icebergCatalog_ == TIcebergCatalog.HIVE_CATALOG); + icebergCatalog_ == TIcebergCatalog.HIVE_CATALOG || + icebergCatalog_ == TIcebergCatalog.CATALOGS); icebergTableLocation_ = msTable_.getSd().getLocation(); icebergCatalogLocation_ = icebergTableLocation_; } diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java index 3841277..d115def 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java @@ -62,6 +62,7 @@ public class IcebergHiveCatalog implements IcebergCatalog { PartitionSpec spec, String location, Map<String, String> properties) { + properties.put("external.table.purge", "TRUE"); return hiveCatalog_.createTable(identifier, schema, spec, location, properties); } diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index af198ec..5531e31 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -80,6 +80,7 @@ import org.apache.impala.thrift.TFunctionName; import org.apache.impala.thrift.TGetPartialCatalogObjectRequest; import org.apache.impala.thrift.TGetPartialCatalogObjectResponse; import org.apache.impala.thrift.THdfsFileDesc; +import org.apache.impala.thrift.TIcebergSnapshot; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TPartialPartitionInfo; import 
org.apache.impala.thrift.TTable; @@ -1001,6 +1002,16 @@ public class CatalogdMetaProvider implements MetaProvider { return ret; } + @Override + public TIcebergSnapshot loadIcebergSnapshot(final TableMetaRef table) + throws TException { + Preconditions.checkArgument(table instanceof TableMetaRefImpl); + TGetPartialCatalogObjectRequest req = newReqForTable(table); + req.table_info_selector.want_iceberg_snapshot = true; + TGetPartialCatalogObjectResponse resp = sendRequest(req); + return resp.table_info.getIceberg_snapshot(); + } + private ImmutableList<FileDescriptor> convertThriftFdList(List<THdfsFileDesc> thriftFds, List<TNetworkAddress> networkAddresses, ListMap<TNetworkAddress> hostIndex) { List<FileDescriptor> fds = Lists.newArrayListWithCapacity(thriftFds.size()); diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java index 0745f56..001723f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java @@ -51,6 +51,7 @@ import org.apache.impala.compat.MetastoreShim; import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TBackendGflags; import org.apache.impala.thrift.TBriefTableMeta; +import org.apache.impala.thrift.TIcebergSnapshot; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TValidWriteIdList; import org.apache.impala.util.ListMap; @@ -498,4 +499,11 @@ class DirectMetaProvider implements MetaProvider { throw new NotImplementedException( "getValidWriteIdList() is not implemented for DirectMetaProvider"); } + + @Override + public TIcebergSnapshot loadIcebergSnapshot(final TableMetaRef table) + throws TException { + throw new NotImplementedException( + "loadIcebergSnapshot() is not implemented for DirectMetaProvider"); + } } diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java index 4db65d6..4c4a49a 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java @@ -42,10 +42,12 @@ import org.apache.impala.thrift.THdfsPartition; import org.apache.impala.thrift.THdfsTable; import org.apache.impala.thrift.TIcebergCatalog; import org.apache.impala.thrift.TIcebergFileFormat; +import org.apache.impala.thrift.TIcebergSnapshot; import org.apache.impala.thrift.TTableDescriptor; import org.apache.impala.thrift.TTableType; import org.apache.impala.util.IcebergSchemaConverter; import org.apache.impala.util.IcebergUtil; +import org.apache.thrift.TException; import com.google.common.base.Preconditions; import com.google.errorprone.annotations.Immutable; @@ -98,21 +100,21 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable { ColumnMap cmap, TableMetadata metadata) throws TableLoadingException { super(db, msTable, ref, cmap); - tableParams_ = new TableParams(msTable); - partitionSpecs_ = Utils.loadPartitionSpecByIceberg(metadata); - defaultPartitionSpecId_ = metadata.defaultSpecId(); localFsTable_ = LocalFsTable.load(db, msTable, ref); - if (metadata.currentSnapshot() != null) { - snapshotId_ = metadata.currentSnapshot().snapshotId(); - } - icebergSchema_ = metadata.schema(); + tableParams_ = new TableParams(msTable); + TIcebergSnapshot tSnapshot; try { - pathHashToFileDescMap_ = Utils.loadAllPartition(this); - } catch (IOException e) { + tSnapshot = db_.getCatalog().getMetaProvider().loadIcebergSnapshot(ref); + } catch (TException e) { throw new TableLoadingException(String.format( - "Failed to load table: %s.%s", msTable.getDbName(), msTable.getTableName()), - (Exception)e); + "Failed to load table: %s.%s", msTable.getDbName(), msTable.getTableName()), e); } + snapshotId_ = 
tSnapshot.getSnapshot_id(); + partitionSpecs_ = Utils.loadPartitionSpecByIceberg(metadata); + defaultPartitionSpecId_ = metadata.defaultSpecId(); + icebergSchema_ = metadata.schema(); + pathHashToFileDescMap_ = FeIcebergTable.Utils.loadFileDescMapFromThrift( + tSnapshot.getIceberg_file_desc_map()); icebergFileFormat_ = IcebergUtil.getIcebergFileFormat(msTable); icebergParquetCompressionCodec_ = Utils.getIcebergParquetCompressionCodec(msTable); icebergParquetRowGroupSize_ = Utils.getIcebergParquetRowGroupSize(msTable); diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java index fe94f51..146a6dd 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java @@ -36,6 +36,7 @@ import org.apache.impala.catalog.HdfsStorageDescriptor; import org.apache.impala.catalog.SqlConstraints; import org.apache.impala.common.Pair; import org.apache.impala.thrift.TBriefTableMeta; +import org.apache.impala.thrift.TIcebergSnapshot; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TValidWriteIdList; import org.apache.impala.util.ListMap; @@ -124,6 +125,11 @@ public interface MetaProvider { List<String> colNames) throws TException; /** + * Loads Iceberg snapshot information, i.e. snapshot id and file descriptors. + */ + public TIcebergSnapshot loadIcebergSnapshot(final TableMetaRef table) throws TException; + + /** * Reference to a table as returned by loadTable(). This reference must be passed * back to other functions to fetch more details about the table. 
Implementations * may use this reference to store internal information such as version numbers diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 5cc4bfd..5728752 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -31,6 +31,7 @@ import com.google.common.collect.Sets; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -213,6 +214,7 @@ import org.apache.impala.thrift.TSortingOrder; import org.apache.impala.thrift.TStatus; import org.apache.impala.thrift.TTable; import org.apache.impala.thrift.TTableName; +import org.apache.impala.thrift.TTablePropertyType; import org.apache.impala.thrift.TTableRowFormat; import org.apache.impala.thrift.TTableStats; import org.apache.impala.thrift.TTestCaseData; @@ -946,9 +948,9 @@ public class CatalogOpExecutor { return; } else if (tbl instanceof IcebergTable && altersIcebergTable(params.getAlter_type())) { - alterIcebergTable(params, response, (IcebergTable) tbl, newCatalogVersion, - wantMinimalResult); - return; + boolean needToUpdateHms = alterIcebergTable(params, response, (IcebergTable)tbl, + newCatalogVersion, wantMinimalResult); + if (!needToUpdateHms) return; } switch (params.getAlter_type()) { case ADD_COLUMNS: @@ -1216,41 +1218,59 @@ public class CatalogOpExecutor { || type == TAlterTableType.REPLACE_COLUMNS || type == TAlterTableType.DROP_COLUMN || type == TAlterTableType.ALTER_COLUMN - || type == TAlterTableType.SET_PARTITION_SPEC; + || type == TAlterTableType.SET_PARTITION_SPEC + || type == TAlterTableType.SET_TBL_PROPERTIES + || type == TAlterTableType.UNSET_TBL_PROPERTIES; } /** - * Executes the ALTER TABLE command for a Iceberg table and 
reloads its metadata. + * Executes the ALTER TABLE command for an Iceberg table and reloads its metadata. */ - private void alterIcebergTable(TAlterTableParams params, TDdlExecResponse response, + private boolean alterIcebergTable(TAlterTableParams params, TDdlExecResponse response, IcebergTable tbl, long newCatalogVersion, boolean wantMinimalResult) throws ImpalaException { Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); + boolean needsToUpdateHms = !isIcebergHmsIntegrationEnabled(tbl.getMetaStoreTable()); try { + boolean needsTxn = true; + org.apache.iceberg.Transaction iceTxn = IcebergUtil.getIcebergTransaction(tbl); switch (params.getAlter_type()) { case ADD_COLUMNS: TAlterTableAddColsParams addColParams = params.getAdd_cols_params(); - IcebergCatalogOpExecutor.addColumn(tbl, addColParams.getColumns()); + IcebergCatalogOpExecutor.addColumns(iceTxn, addColParams.getColumns()); addSummary(response, "Column(s) have been added."); break; case DROP_COLUMN: TAlterTableDropColParams dropColParams = params.getDrop_col_params(); - IcebergCatalogOpExecutor.dropColumn(tbl, dropColParams.getCol_name()); + IcebergCatalogOpExecutor.dropColumn(iceTxn, dropColParams.getCol_name()); addSummary(response, "Column has been dropped."); break; case ALTER_COLUMN: TAlterTableAlterColParams alterColParams = params.getAlter_col_params(); - IcebergCatalogOpExecutor.alterColumn(tbl, alterColParams.getCol_name(), - alterColParams.getNew_col_def()); + IcebergCatalogOpExecutor.alterColumn(iceTxn, alterColParams.getCol_name(), + alterColParams.getNew_col_def()); addSummary(response, "Column has been altered."); break; case SET_PARTITION_SPEC: + // Set partition spec uses 'TableOperations', not transactions. + needsTxn = false; + // Partition spec is not stored in HMS. 
+ needsToUpdateHms = false; TAlterTableSetPartitionSpecParams setPartSpecParams = params.getSet_partition_spec_params(); IcebergCatalogOpExecutor.alterTableSetPartitionSpec(tbl, - setPartSpecParams.getPartition_spec()); + setPartSpecParams.getPartition_spec(), + catalog_.getCatalogServiceId(), newCatalogVersion); addSummary(response, "Updated partition spec."); break; + case SET_TBL_PROPERTIES: + needsToUpdateHms |= !setIcebergTblProperties(tbl, params, iceTxn); + addSummary(response, "Updated table."); + break; + case UNSET_TBL_PROPERTIES: + needsToUpdateHms |= !unsetIcebergTblProperties(tbl, params, iceTxn); + addSummary(response, "Updated table."); + break; case REPLACE_COLUMNS: // It doesn't make sense to replace all the columns of an Iceberg table as it // would basically make all existing data unaccessible. @@ -1259,15 +1279,57 @@ public class CatalogOpExecutor { "Unsupported ALTER TABLE operation for Iceberg tables: " + params.getAlter_type()); } + if (needsTxn) { + if (!needsToUpdateHms) { + IcebergCatalogOpExecutor.addCatalogVersionToTxn(iceTxn, + catalog_.getCatalogServiceId(), newCatalogVersion); + } + iceTxn.commitTransaction(); + } } catch (IllegalArgumentException ex) { throw new ImpalaRuntimeException(String.format( "Failed to ALTER table '%s': %s", params.getTable_name().table_name, ex.getMessage())); } - loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER Iceberg TABLE " + - params.getAlter_type().name()); - addTableToCatalogUpdate(tbl, wantMinimalResult, response.result); + if (!needsToUpdateHms) { + // We don't need to update HMS because either it is already done by Iceberg's + // HiveCatalog, or we modified the PARTITION SPEC which is not stored in HMS. 
+ loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER Iceberg TABLE " + + params.getAlter_type().name()); + catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion); + addTableToCatalogUpdate(tbl, wantMinimalResult, response.result); + return false; + } + return true; + } + + /** + * Sets table properties for an Iceberg table. Returns true on success, returns false + * if the operation is not applicable at the Iceberg table level, e.g. setting SERDE + * properties. + */ + private boolean setIcebergTblProperties(IcebergTable tbl, TAlterTableParams params, + org.apache.iceberg.Transaction iceTxn) throws ImpalaException { + TAlterTableSetTblPropertiesParams setPropsParams = + params.getSet_tbl_properties_params(); + if (setPropsParams.getTarget() != TTablePropertyType.TBL_PROPERTY) return false; + IcebergCatalogOpExecutor.setTblProperties(iceTxn, setPropsParams.getProperties()); + return true; + } + + /** + * Unsets table properties for an Iceberg table. Returns true on success, returns false + * if the operation is not applicable at the Iceberg table level, e.g. setting SERDE + * properties. 
+ */ + private boolean unsetIcebergTblProperties(IcebergTable tbl, TAlterTableParams params, + org.apache.iceberg.Transaction iceTxn) throws ImpalaException { + TAlterTableUnSetTblPropertiesParams unsetParams = + params.getUnset_tbl_properties_params(); + if (unsetParams.getTarget() != TTablePropertyType.TBL_PROPERTY) return false; + IcebergCatalogOpExecutor.unsetTblProperties(iceTxn, unsetParams.getProperty_keys()); + return true; } /** @@ -2841,12 +2903,21 @@ public class CatalogOpExecutor { Preconditions.checkState(table instanceof FeIcebergTable); long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); - FeIcebergTable iceTable = (FeIcebergTable)table; + addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(), + newCatalogVersion); + FeIcebergTable iceTbl = (FeIcebergTable)table; if (params.isDelete_stats()) { dropColumnStats(table); dropTableStats(table); } - IcebergCatalogOpExecutor.truncateTable(iceTable); + org.apache.iceberg.Transaction iceTxn = IcebergUtil.getIcebergTransaction(iceTbl); + IcebergCatalogOpExecutor.truncateTable(iceTxn); + if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) { + catalog_.addVersionsForInflightEvents(false, table, newCatalogVersion); + IcebergCatalogOpExecutor.addCatalogVersionToTxn(iceTxn, + catalog_.getCatalogServiceId(), newCatalogVersion); + } + iceTxn.commitTransaction(); return newCatalogVersion; } @@ -3405,24 +3476,34 @@ public class CatalogOpExecutor { .location(); newTable.getSd().setLocation(tableLoc); } else { - if (location == null) { - if (IcebergUtil.getUnderlyingCatalog(newTable) != - TIcebergCatalog.HADOOP_TABLES) { - // When creating external Iceberg table we load - // the Iceberg table using catalog and table identifier to get - // the actual location of the table. This way we can also get the - // correct location for tables stored in nested namespaces. 
- TableIdentifier identifier = - IcebergUtil.getIcebergTableIdentifier(newTable); - newTable.getSd().setLocation(IcebergUtil.loadTable( - catalog, identifier, - IcebergUtil.getIcebergCatalogLocation(newTable), - newTable.getParameters()).location()); - } else { + // If this is not a synchronized table, we assume that the table must be + // existing in an Iceberg Catalog. + TIcebergCatalog underlyingCatalog = + IcebergUtil.getUnderlyingCatalog(newTable); + String locationToLoadFrom; + if (underlyingCatalog == TIcebergCatalog.HADOOP_TABLES) { + if (location == null) { addSummary(response, "Location is necessary for external iceberg table."); return false; } + locationToLoadFrom = location; + } else { + // For HadoopCatalog tables 'locationToLoadFrom' is the location of the + // hadoop catalog. For HiveCatalog tables it remains null. + locationToLoadFrom = IcebergUtil.getIcebergCatalogLocation(newTable); + } + TableIdentifier identifier = IcebergUtil.getIcebergTableIdentifier(newTable); + org.apache.iceberg.Table iceTable = IcebergUtil.loadTable( + catalog, identifier, locationToLoadFrom, newTable.getParameters()); + // Populate the HMS table schema based on the Iceberg table's schema because + // the Iceberg metadata is the source of truth. This also avoids an + // unnecessary ALTER TABLE. + IcebergCatalogOpExecutor.populateExternalTableCols(newTable, iceTable); + if (location == null) { + // Using the location of the loaded Iceberg table we can also get the + // correct location for tables stored in nested namespaces. + newTable.getSd().setLocation(iceTable.location()); } } @@ -3433,20 +3514,6 @@ public class CatalogOpExecutor { newTable.getPartitionKeys().isEmpty()); if (!isIcebergHmsIntegrationEnabled(newTable)) { msClient.getHiveClient().createTable(newTable); - } else { - // Currently HiveCatalog doesn't set the table property - // 'external.table.purge' during createTable(). 
- org.apache.hadoop.hive.metastore.api.Table msTbl = - msClient.getHiveClient().getTable( - newTable.getDbName(), newTable.getTableName()); - msTbl.putToParameters("external.table.purge", "TRUE"); - // HiveCatalog also doesn't set the table properties either. - for (Map.Entry<String, String> entry : - params.getTable_properties().entrySet()) { - msTbl.putToParameters(entry.getKey(), entry.getValue()); - } - msClient.getHiveClient().alter_table( - newTable.getDbName(), newTable.getTableName(), msTbl); } events = getNextMetastoreEventsIfEnabled(eventId, event -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE.equals(event.getEventType()) @@ -6002,8 +6069,19 @@ public class CatalogOpExecutor { } if (table instanceof FeIcebergTable && update.isSetIceberg_operation()) { - IcebergCatalogOpExecutor.appendFiles((FeIcebergTable)table, + FeIcebergTable iceTbl = (FeIcebergTable)table; + org.apache.iceberg.Transaction iceTxn = IcebergUtil.getIcebergTransaction(iceTbl); + IcebergCatalogOpExecutor.appendFiles(iceTbl, iceTxn, update.getIceberg_operation()); + if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) { + // Add catalog service id and the 'newCatalogVersion' to the table parameters. + // This way we can avoid reloading the table on self-events (Iceberg generates + // an ALTER TABLE statement to set the new metadata_location). 
+ IcebergCatalogOpExecutor.addCatalogVersionToTxn(iceTxn, + catalog_.getCatalogServiceId(), newCatalogVersion); + catalog_.addVersionsForInflightEvents(false, table, newCatalogVersion); + } + iceTxn.commitTransaction(); } loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata, diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java index 33a47df..5de7ef4 100644 --- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java @@ -35,12 +35,15 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateProperties; import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.types.Types; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.IcebergTable; import org.apache.impala.catalog.TableLoadingException; @@ -80,12 +83,23 @@ public class IcebergCatalogOpExecutor { params.getPartition_spec()); IcebergCatalog icebergCatalog = IcebergUtil.getIcebergCatalog(catalog, location); Table iceTable = icebergCatalog.createTable(identifier, schema, spec, location, - excludeHmsOnlyProps(params.getTable_properties())); + params.getTable_properties()); LOG.info("Create iceberg table successful."); return iceTable; } /** + * Populates HMS table schema based on the Iceberg table's schema. 
+ */ + public static void populateExternalTableCols( + org.apache.hadoop.hive.metastore.api.Table msTbl, Table iceTbl) + throws TableLoadingException { + TableMetadata metadata = ((BaseTable)iceTbl).operations().current(); + Schema schema = metadata.schema(); + msTbl.getSd().setCols(IcebergSchemaConverter.convertToHiveSchema(schema)); + } + + /** * Drops Iceberg table from Iceberg's catalog. * Throws TableNotFoundException if table is not found and 'ifExists' is false. */ @@ -107,9 +121,9 @@ public class IcebergCatalogOpExecutor { /** * Adds a column to an existing Iceberg table. */ - public static void addColumn(FeIcebergTable feTable, List<TColumn> columns) + public static void addColumns(Transaction txn, List<TColumn> columns) throws TableLoadingException, ImpalaRuntimeException { - UpdateSchema schema = IcebergUtil.getIcebergUpdateSchema(feTable); + UpdateSchema schema = txn.updateSchema(); for (TColumn column : columns) { org.apache.iceberg.types.Type type = IcebergSchemaConverter.fromImpalaColumnType(column.getColumnType()); @@ -125,9 +139,9 @@ public class IcebergCatalogOpExecutor { * FLOAT -> DOUBLE * DECIMAL(p1,s1) -> DECIMAL(p1,s2), same scale, p1<=p2 */ - public static void alterColumn(FeIcebergTable feTable, String colName, TColumn newCol) + public static void alterColumn(Transaction txn, String colName, TColumn newCol) throws TableLoadingException, ImpalaRuntimeException { - UpdateSchema schema = IcebergUtil.getIcebergUpdateSchema(feTable); + UpdateSchema schema = txn.updateSchema(); org.apache.iceberg.types.Type type = IcebergSchemaConverter.fromImpalaColumnType(newCol.getColumnType()); // Cannot change a column to complex type @@ -150,8 +164,8 @@ public class IcebergCatalogOpExecutor { * Sets new default partition spec for an Iceberg table. 
*/ public static void alterTableSetPartitionSpec(FeIcebergTable feTable, - TIcebergPartitionSpec partSpec) throws TableLoadingException, - ImpalaRuntimeException { + TIcebergPartitionSpec partSpec, String catalogServiceId, long catalogVersion) + throws TableLoadingException, ImpalaRuntimeException { BaseTable iceTable = (BaseTable)IcebergUtil.loadTable(feTable); TableOperations tableOp = iceTable.operations(); TableMetadata metadata = tableOp.current(); @@ -159,16 +173,21 @@ public class IcebergCatalogOpExecutor { Schema schema = metadata.schema(); PartitionSpec newPartSpec = IcebergUtil.createIcebergPartition(schema, partSpec); TableMetadata newMetadata = metadata.updatePartitionSpec(newPartSpec); - + Map<String, String> properties = new HashMap<>(newMetadata.properties()); + properties.put(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), + catalogServiceId); + properties.put(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), + String.valueOf(catalogVersion)); + newMetadata = newMetadata.replaceProperties(properties); tableOp.commit(metadata, newMetadata); } /** * Drops a column from a Iceberg table. */ - public static void dropColumn(FeIcebergTable feTable, String colName) + public static void dropColumn(Transaction txn, String colName) throws TableLoadingException, ImpalaRuntimeException { - UpdateSchema schema = IcebergUtil.getIcebergUpdateSchema(feTable); + UpdateSchema schema = txn.updateSchema(); schema.deleteColumn(colName); schema.commit(); } @@ -177,34 +196,31 @@ public class IcebergCatalogOpExecutor { * Rename Iceberg table */ public static void renameTable(FeIcebergTable feTable, TableIdentifier tableId) - throws ImpalaRuntimeException{ + throws ImpalaRuntimeException { IcebergCatalog catalog = IcebergUtil.getIcebergCatalog(feTable); catalog.renameTable(feTable, tableId); } /** - * Returns a new Map without the properties that only need to be stored at the - * HMS level, not at the Iceberg table level. + * Set TBLPROPERTIES. 
*/ - private static Map<String, String> excludeHmsOnlyProps(Map<String, String> props) { - Map<String, String> ret = new HashMap<>(); - for (Map.Entry<String, String> entry : props.entrySet()) { - if (isHmsOnlyProperty(entry.getKey())) continue; - ret.put(entry.getKey(), entry.getValue()); + public static void setTblProperties(Transaction txn, Map<String, String> properties) { + UpdateProperties updateProps = txn.updateProperties(); + for (Map.Entry<String, String> entry : properties.entrySet()) { + updateProps.set(entry.getKey(), entry.getValue()); } - return ret; + updateProps.commit(); } /** - * Returns true if the table property should only be stored in HMS. - * If false, the property is stored in HMS as well as iceberg. + * Unset TBLPROPERTIES */ - private static boolean isHmsOnlyProperty(String propKey) { - if (IcebergTable.ICEBERG_FILE_FORMAT.equals(propKey)) return true; - if (IcebergTable.ICEBERG_CATALOG_LOCATION.equals(propKey)) return true; - if (IcebergTable.ICEBERG_TABLE_IDENTIFIER.equals(propKey)) return true; - if (CatalogOpExecutor.CAPABILITIES_KEY.equals(propKey)) return true; - return false; + public static void unsetTblProperties(Transaction txn, List<String> removeProperties) { + UpdateProperties updateProps = txn.updateProperties(); + for (String prop : removeProperties) { + updateProps.remove(prop); + } + updateProps.commit(); } /** @@ -225,8 +241,8 @@ public class IcebergCatalogOpExecutor { private static class Append implements BatchWrite { final private AppendFiles append; - public Append(org.apache.iceberg.Table tbl) { - append = tbl.newAppend(); + public Append(Transaction txn) { + append = txn.newAppend(); } @Override @@ -242,8 +258,8 @@ public class IcebergCatalogOpExecutor { private static class DynamicOverwrite implements BatchWrite { final private ReplacePartitions replace; - public DynamicOverwrite(org.apache.iceberg.Table tbl) { - replace = tbl.newReplacePartitions(); + public DynamicOverwrite(Transaction txn) { + replace = 
txn.newReplacePartitions(); } @Override @@ -261,7 +277,7 @@ public class IcebergCatalogOpExecutor { * Append the newly inserted data files to the Iceberg table using the AppendFiles * API. */ - public static void appendFiles(FeIcebergTable feIcebergTable, + public static void appendFiles(FeIcebergTable feIcebergTable, Transaction txn, TIcebergOperationParam icebergOp) throws ImpalaRuntimeException, TableLoadingException { org.apache.iceberg.Table nativeIcebergTable = @@ -269,9 +285,9 @@ public class IcebergCatalogOpExecutor { List<ByteBuffer> dataFilesFb = icebergOp.getIceberg_data_files_fb(); BatchWrite batchWrite; if (icebergOp.isIs_overwrite()) { - batchWrite = new DynamicOverwrite(nativeIcebergTable); + batchWrite = new DynamicOverwrite(txn); } else { - batchWrite = new Append(nativeIcebergTable); + batchWrite = new Append(txn); } for (ByteBuffer buf : dataFilesFb) { FbIcebergDataFile dataFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf); @@ -323,11 +339,24 @@ public class IcebergCatalogOpExecutor { /** * Creates new snapshot for the iceberg table by deleting all data files. */ - public static void truncateTable(FeIcebergTable feIceTable) - throws ImpalaRuntimeException, TableLoadingException { - Table iceTable = IcebergUtil.loadTable(feIceTable); - DeleteFiles delete = iceTable.newDelete(); + public static void truncateTable(Transaction txn) + throws ImpalaRuntimeException { + DeleteFiles delete = txn.newDelete(); delete.deleteFromRowFilter(Expressions.alwaysTrue()); delete.commit(); } + + /** + * Sets catalog service id and the new catalog version in table properties using 'txn'. + * This way we can avoid reloading the table on self-events. 
+ */ + public static void addCatalogVersionToTxn(Transaction txn, String serviceId, + long version) { + UpdateProperties updateProps = txn.updateProperties(); + updateProps.set(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), + serviceId); + updateProps.set(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), + String.valueOf(version)); + updateProps.commit(); + } } diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java index 8bca3a6..364430f 100644 --- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java +++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java @@ -41,7 +41,7 @@ import com.google.common.primitives.Longs; import org.apache.impala.catalog.IcebergStructField; import org.apache.impala.common.Pair; import org.apache.iceberg.BaseTable; -import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; @@ -186,12 +186,12 @@ public class IcebergUtil { } /** - * Get Iceberg UpdateSchema from 'feTable', usually use UpdateSchema to update Iceberg + * Get Iceberg Transaction for 'feTable', usually use Transaction to update Iceberg * table schema. 
*/ - public static UpdateSchema getIcebergUpdateSchema(FeIcebergTable feTable) + public static Transaction getIcebergTransaction(FeIcebergTable feTable) throws TableLoadingException, ImpalaRuntimeException { - return getIcebergCatalog(feTable).loadTable(feTable).updateSchema(); + return getIcebergCatalog(feTable).loadTable(feTable).newTransaction(); } /** diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test index 3df404e..6da98c7 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test @@ -104,7 +104,6 @@ DESCRIBE FORMATTED iceberg_hive_catalogs; ---- RESULTS: VERIFY_IS_SUBSET 'Location: ','$NAMENODE/test-warehouse/$DATABASE.db/iceberg_hive_catalogs','NULL' '','write.format.default','parquet ' -'','iceberg.catalog ','ice_hive_cat ' ---- TYPES string, string, string ==== @@ -136,7 +135,6 @@ DESCRIBE FORMATTED iceberg_hive_catalogs_ext; ---- RESULTS: VERIFY_IS_SUBSET 'Location: ','$NAMENODE/test-warehouse/$DATABASE.db/iceberg_hive_catalogs','NULL' '','write.format.default','parquet ' -'','iceberg.catalog ','ice_hive_cat ' '','iceberg.table_identifier','$DATABASE.iceberg_hive_catalogs' '','name ','$DATABASE.iceberg_hive_catalogs' ---- TYPES diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test index b6edbfd..dfe435e 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test +++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test @@ -911,7 +911,7 @@ PARTITIONED BY SPEC (BUCKET(3, i)) STORED AS ICEBERG LOCATION '$$location_uri$$' TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet', -'engine.hive.enabled'='true', 'iceberg.catalog'='ice_hive_cat', 
'table_type'='ICEBERG') +'engine.hive.enabled'='true', 'table_type'='ICEBERG') ==== ---- CREATE_TABLE CREATE TABLE iceberg_catalogs_hadoop (i int) diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index c90275e..1df40b5 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -325,6 +325,64 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite): # 24 partitions inserted and hence we must refresh 24 partitions once. assert int(partitions_refreshed_after_hive) == int(partitions_refreshed_insert) + 24 + @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1") + def test_iceberg_self_events(self, unique_database): + """This test checks that Impala doesn't refresh Iceberg tables on self events.""" + tbl_name = unique_database + ".test_iceberg_events" + + def check_self_events(query, skips_events=True): + tbls_refreshed_before, partitions_refreshed_before, \ + events_skipped_before = self.__get_self_event_metrics() + self.client.execute(query) + EventProcessorUtils.wait_for_event_processing(self) + tbls_refreshed_after, partitions_refreshed_after, \ + events_skipped_after = self.__get_self_event_metrics() + assert tbls_refreshed_before == tbls_refreshed_after + assert partitions_refreshed_before == partitions_refreshed_after + if skips_events: + assert events_skipped_after > events_skipped_before + + hadoop_tables = "'iceberg.catalog'='hadoop.tables'" + hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', " + + "'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format( + unique_database)) + hive_catalog = "'iceberg.catalog'='hive.catalog'" + hive_catalogs = "'iceberg.catalog'='ice_hive_cat'" + hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'" + + all_catalogs = [hadoop_tables, hadoop_catalog, hive_catalog, hive_catalogs, + hadoop_catalogs] + + for catalog in all_catalogs: + 
is_hive_catalog = catalog == hive_catalog or catalog == hive_catalogs + self.client.execute(""" + CREATE TABLE {0} (i int) STORED AS ICEBERG + TBLPROPERTIES ({1})""".format(tbl_name, catalog)) + + check_self_events("ALTER TABLE {0} ADD COLUMN j INT".format(tbl_name)) + check_self_events("ALTER TABLE {0} DROP COLUMN i".format(tbl_name)) + check_self_events("ALTER TABLE {0} CHANGE COLUMN j j BIGINT".format(tbl_name)) + # SET PARTITION SPEC only updates HMS in case of HiveCatalog (which sets + # table property 'metadata_location') + check_self_events( + "ALTER TABLE {0} SET PARTITION SPEC (truncate(2, j))".format(tbl_name), + skips_events=is_hive_catalog) + check_self_events( + "ALTER TABLE {0} SET TBLPROPERTIES('key'='value')".format(tbl_name)) + check_self_events("ALTER TABLE {0} UNSET TBLPROPERTIES('key')".format(tbl_name)) + check_self_events("INSERT INTO {0} VALUES (1), (2), (3)".format(tbl_name), + skips_events=is_hive_catalog) + check_self_events("INSERT OVERWRITE {0} VALUES (4), (5), (6)".format(tbl_name), + skips_events=is_hive_catalog) + ctas_tbl = unique_database + ".ice_ctas" + check_self_events("""CREATE TABLE {0} STORED AS ICEBERG + TBLPROPERTIES ({1}) AS SELECT * FROM {2}""".format(ctas_tbl, catalog, tbl_name)) + check_self_events("DROP TABLE {0}".format(ctas_tbl)) + check_self_events("TRUNCATE TABLE {0}".format(tbl_name), + skips_events=is_hive_catalog) + + self.client.execute("DROP TABLE {0}".format(tbl_name)) + def __run_self_events_test(self, db_name, use_impala): recover_tbl_name = ImpalaTestSuite.get_random_name("tbl_") # create a table similar to alltypes so that we can recover the partitions on it diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py index 8e82974..07844c8 100644 --- a/tests/metadata/test_show_create_table.py +++ b/tests/metadata/test_show_create_table.py @@ -39,7 +39,9 @@ class TestShowCreateTable(ImpalaTestSuite): "STATS_GENERATED_VIA_STATS_TASK", "last_modified_by", 
"last_modified_time", "numFilesErasureCoded", "bucketing_version", "OBJCAPABILITIES", - "TRANSLATED_TO_EXTERNAL", "previous_metadata_location"] + "TRANSLATED_TO_EXTERNAL", "previous_metadata_location", + "impala.events.catalogServiceId", + "impala.events.catalogVersion"] @classmethod def get_workload(self): diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index ca75ba0..c4e23f0 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -460,3 +460,26 @@ class TestIcebergTable(ImpalaTestSuite): pytest.skip('runs only in exhaustive') self.run_test_case('QueryTest/iceberg-write-many-files-stress', vector, use_db=unique_database) + + def test_consistent_scheduling(self, vector, unique_database): + """IMPALA-10914: This test verifies that Impala schedules scan ranges consistently for + Iceberg tables.""" + def collect_split_stats(profile): + splits = [l.strip() for l in profile.splitlines() if "Hdfs split stats" in l] + splits.sort() + return splits + + with self.create_impala_client() as impalad_client: + impalad_client.execute("use " + unique_database) + impalad_client.execute("""create table line_ice stored as iceberg + as select * from tpch_parquet.lineitem""") + first_result = impalad_client.execute("""select count(*) from line_ice""") + ref_profile = first_result.runtime_profile + ref_split_stats = collect_split_stats(ref_profile) + + for i in range(0, 10): + # Subsequent executions of the same query should schedule scan ranges similarly. 
+ result = impalad_client.execute("""select count(*) from line_ice""") + profile = result.runtime_profile + split_stats = collect_split_stats(profile) + assert ref_split_stats == split_stats diff --git a/tests/stress/test_insert_stress.py b/tests/stress/test_insert_stress.py index fe43a38..5caf3db 100644 --- a/tests/stress/test_insert_stress.py +++ b/tests/stress/test_insert_stress.py @@ -48,9 +48,16 @@ class TestInsertStress(ImpalaTestSuite): try: insert_cnt = 0 while insert_cnt < num_inserts: - impalad_client.execute("insert into table %s values (%i, %i)" % ( - tbl_name, wid, insert_cnt)) - insert_cnt += 1 + try: + impalad_client.execute("insert into table %s values (%i, %i)" % ( + tbl_name, wid, insert_cnt)) + insert_cnt += 1 + except Exception as e: + # It's possible that the Iceberg table is concurrently updated in CatalogD + # during data load in local catalog. + if "InconsistentMetadataFetchException" in str(e): + continue + raise e finally: with counter.get_lock(): counter.value += 1 @@ -72,9 +79,16 @@ class TestInsertStress(ImpalaTestSuite): impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) try: while counter.value != writers: - result = impalad_client.execute("select * from %s" % tbl_name) - verify_result_set(result) - time.sleep(random.random()) + try: + result = impalad_client.execute("select * from %s" % tbl_name) + verify_result_set(result) + time.sleep(random.random()) + except Exception as e: + # It's possible that the Iceberg table is concurrently updated in CatalogD + # during data load in local catalog. + if "InconsistentMetadataFetchException" in str(e): + continue + raise e finally: impalad_client.close()
