This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 04735598d660ef2063d3f3ddb78cfd08a7b6f6a9 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Fri Jan 31 17:37:35 2025 +0100 IMPALA-13718: Skip reloading Iceberg tables when metadata JSON file is the same With this patch Impala skips reloading Iceberg tables when the metadata JSON file is the same, as this means that the table is essentially unchanged. This can help in situations where the event processor is lagging behind and we have an Iceberg table that is updated frequently. Imagine the case when Impala gets 100 events for an Iceberg table. In this case after processing the first event, our internal representation of the Iceberg table is already up-to-date, so there is no need to do the reload 100 times. We cannot use the internal icebergApiTable_'s metadata location, as the following statement might silently refresh the metadata in 'current()': icebergApiTable_.operations().current().metadataFileLocation() To guarantee that we check against the actual loaded metadata, this patch introduces a new member to store the metadata location. 
Testing * added e2e tests for REFRESH, also for event processing Change-Id: I16727000cb11d1c0591875a6542d428564dce664 Reviewed-on: http://gerrit.cloudera.org:8080/22432 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Noemi Pap-Takacs <[email protected]> --- .../org/apache/impala/catalog/IcebergTable.java | 134 +++++++++++++-------- .../iceberg-mixed-format-position-deletes.test | 4 + tests/metadata/test_event_processing.py | 17 +++ 3 files changed, 108 insertions(+), 47 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index 75c026630..36799218f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; @@ -37,6 +38,7 @@ import java.util.TreeMap; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.Snapshot; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; @@ -243,6 +245,7 @@ public class IcebergTable extends Table implements FeIcebergTable { // Cached Iceberg API table object. private org.apache.iceberg.Table icebergApiTable_; + private String currentMetadataLocation_ = null; // The snapshot id cached in the CatalogD, necessary to syncronize the caches. private long catalogSnapshotId_ = -1; @@ -456,60 +459,97 @@ public class IcebergTable extends Table implements FeIcebergTable { getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time(); verifyTable(msTbl); try { - // Copy the table to check later if anything has changed. 
- msTable_ = msTbl.deepCopy(); - // Other engines might create Iceberg tables without setting the HiveIceberg* - // storage descriptors. Impala relies on the storage descriptors being set to - // certain classes, so we set it here for the in-memory metastore table. - FeIcebergTable.setIcebergStorageDescriptor(msTable_); - setTableStats(msTable_); - // Load metadata from Iceberg - final Timer.Context ctxStorageLdTime = - getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA).time(); - try { - icebergApiTable_ = IcebergUtil.loadTable(this); - catalogTimeline.markEvent("Loaded Iceberg API table"); - catalogSnapshotId_ = FeIcebergTable.super.snapshotId(); - loadSchemaFromIceberg(); - catalogTimeline.markEvent("Loaded schema from Iceberg"); - // Loading hdfs table after loaded schema from Iceberg, - // in case we create external Iceberg table skipping column info in sql. - icebergFileFormat_ = IcebergUtil.getIcebergFileFormat(msTbl); - icebergParquetCompressionCodec_ = Utils.getIcebergParquetCompressionCodec(msTbl); - icebergParquetRowGroupSize_ = Utils.getIcebergParquetRowGroupSize(msTbl); - icebergParquetPlainPageSize_ = Utils.getIcebergParquetPlainPageSize(msTbl); - icebergParquetDictPageSize_ = Utils.getIcebergParquetDictPageSize(msTbl); - GroupedContentFiles icebergFiles = IcebergUtil.getIcebergFiles(this, - new ArrayList<>(), /*timeTravelSpec=*/null); - catalogTimeline.markEvent("Loaded Iceberg files"); - hdfsTable_.setSkipIcebergFileMetadataLoading(true); - hdfsTable_.load(reuseMetadata, msClient, msTable_, reason, catalogTimeline); - IcebergFileMetadataLoader loader = new IcebergFileMetadataLoader( - icebergApiTable_, - fileStore_ == null ? 
Collections.emptyList() : fileStore_.getAllFiles(), - getHostIndex(), Preconditions.checkNotNull(icebergFiles), - Utils.requiresDataFilesInTableLocation(this)); - loader.load(); - fileStore_ = new IcebergContentFileStore( - icebergApiTable_, loader.getLoadedIcebergFds(), icebergFiles); - partitionStats_ = Utils.loadPartitionStats(this, icebergFiles); - setIcebergTableStats(); - loadAllColumnStats(msClient, catalogTimeline); - applyPuffinNdvStats(catalogTimeline); - setAvroSchema(msClient, msTbl, fileStore_, catalogTimeline); - updateMetrics(loader.getFileMetadataStats()); - } catch (Exception e) { + loadTableMetadata(msClient, msTbl, catalogTimeline); + loadFileMetadata(reuseMetadata, msClient, reason, catalogTimeline); + setIcebergTableStats(); + refreshLastUsedTime(); + } catch (Exception e) { throw new IcebergTableLoadingException("Error loading metadata for Iceberg table " + icebergTableLocation_, e); - } finally { - storageMetadataLoadTime_ = ctxStorageLdTime.stop(); - } - refreshLastUsedTime(); } finally { context.stop(); } } + private void loadTableMetadata(IMetaStoreClient msClient, + org.apache.hadoop.hive.metastore.api.Table msTbl, EventSequence catalogTimeline) + throws TableLoadingException, ImpalaRuntimeException { + // Copy the table to check later if anything has changed. + msTable_ = msTbl.deepCopy(); + // Other engines might create Iceberg tables without setting the HiveIceberg* + // storage descriptors. Impala relies on the storage descriptors being set to + // certain classes, so we set it here for the in-memory metastore table. 
+ FeIcebergTable.setIcebergStorageDescriptor(msTable_); + setTableStats(msTable_); + icebergApiTable_ = IcebergUtil.loadTable(this); + catalogTimeline.markEvent("Loaded Iceberg API table"); + catalogSnapshotId_ = FeIcebergTable.super.snapshotId(); + loadSchemaFromIceberg(); + catalogTimeline.markEvent("Loaded schema from Iceberg"); + icebergFileFormat_ = IcebergUtil.getIcebergFileFormat(msTbl); + icebergParquetCompressionCodec_ = Utils.getIcebergParquetCompressionCodec(msTbl); + icebergParquetRowGroupSize_ = Utils.getIcebergParquetRowGroupSize(msTbl); + icebergParquetPlainPageSize_ = Utils.getIcebergParquetPlainPageSize(msTbl); + icebergParquetDictPageSize_ = Utils.getIcebergParquetDictPageSize(msTbl); + loadAllColumnStats(msClient, catalogTimeline); + applyPuffinNdvStats(catalogTimeline); + } + + /** + * Reloads file metadata, unless reuseMetadata is true and metadata.json file hasn't + * changed. + */ + private void loadFileMetadata(boolean reuseMetadata, IMetaStoreClient msClient, + String reason, EventSequence catalogTimeline) throws IcebergTableLoadingException { + if (reuseMetadata && canSkipReload()) { + catalogTimeline.markEvent( + "Iceberg table reload skipped as no change detected"); + return; + } + final Timer.Context ctxStorageLdTime = + getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA).time(); + try { + currentMetadataLocation_ = + ((BaseTable)icebergApiTable_).operations().current().metadataFileLocation(); + GroupedContentFiles icebergFiles = IcebergUtil.getIcebergFiles(this, + new ArrayList<>(), /*timeTravelSpec=*/null); + catalogTimeline.markEvent("Loaded Iceberg files"); + // We use IcebergFileMetadataLoader directly to load file metadata, so we don't + // want 'hdfsTable_' to do any file loading. + hdfsTable_.setSkipIcebergFileMetadataLoading(true); + // Iceberg schema loading must always precede hdfs table loading, because in case we + // create an external Iceberg table, we have no column information in the SQL + // statement. 
+ hdfsTable_.load(reuseMetadata, msClient, msTable_, reason, catalogTimeline); + IcebergFileMetadataLoader loader = new IcebergFileMetadataLoader( + icebergApiTable_, + fileStore_ == null ? Collections.emptyList() : fileStore_.getAllFiles(), + getHostIndex(), Preconditions.checkNotNull(icebergFiles), + Utils.requiresDataFilesInTableLocation(this)); + loader.load(); + fileStore_ = new IcebergContentFileStore( + icebergApiTable_, loader.getLoadedIcebergFds(), icebergFiles); + partitionStats_ = Utils.loadPartitionStats(this, icebergFiles); + + setAvroSchema(msClient, msTable_, fileStore_, catalogTimeline); + updateMetrics(loader.getFileMetadataStats()); + } catch (Exception e) { + throw new IcebergTableLoadingException("Error loading metadata for Iceberg table " + + icebergTableLocation_, e); + } finally { + storageMetadataLoadTime_ = ctxStorageLdTime.stop(); + } + } + + private boolean canSkipReload() { + if (icebergApiTable_ == null) return false; + Preconditions.checkState(icebergApiTable_ instanceof BaseTable); + BaseTable newTable = (BaseTable) icebergApiTable_; + return Objects.equals( + currentMetadataLocation_, + newTable.operations().current().metadataFileLocation()); + } + private void updateMetrics(FileMetadataStats stats) { long memUsageEstimate = stats.numFiles * PER_FD_MEM_USAGE_BYTES + stats.numBlocks * PER_BLOCK_MEM_USAGE_BYTES; diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-mixed-format-position-deletes.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-mixed-format-position-deletes.test index 056030f57..eb0bec7fd 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-mixed-format-position-deletes.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-mixed-format-position-deletes.test @@ -14,7 +14,11 @@ ALTER TABLE ice_mixed_formats SET TBLPROPERTIES ('write.format.default'='avro'); DELETE FROM ice_mixed_formats WHERE i = 3; ==== ---- QUERY +# Second REFRESH should 
not do a full reload. refresh ice_mixed_formats; +refresh ice_mixed_formats; +---- RUNTIME_PROFILE +row_regex:.*Iceberg table reload skipped as no change detected ==== ---- QUERY SHOW FILES IN ice_mixed_formats; diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py index eb0455671..dde3a06b9 100644 --- a/tests/metadata/test_event_processing.py +++ b/tests/metadata/test_event_processing.py @@ -75,6 +75,23 @@ class TestEventProcessing(ImpalaTestSuite): finally: self.execute_query("drop database if exists {0} cascade".format(db_name)) + def test_hive_impala_iceberg_reloads(self, unique_database): + test_tbl = unique_database + ".test_events" + self.run_stmt_in_hive("create table {} (value string) \ + partitioned by (year int) stored by iceberg".format(test_tbl)) + EventProcessorUtils.wait_for_event_processing(self) + self.execute_query("describe {}".format(test_tbl)) + + self.run_stmt_in_hive("insert into {} values ('1', 2025)".format(test_tbl)) + self.run_stmt_in_hive("select * from {}".format(test_tbl)) + + EventProcessorUtils.wait_for_event_processing(self) + res = self.execute_query("select * from {}".format(test_tbl)) + + assert ["1\t2025"] == res.data + res = self.execute_query("refresh {}".format(test_tbl)) + assert "Iceberg table reload skipped as no change detected" in res.runtime_profile + @SkipIfHive2.acid def test_empty_partition_events_transactional(self, unique_database): self._run_test_empty_partition_events(unique_database, True)
