This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 04735598d660ef2063d3f3ddb78cfd08a7b6f6a9
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Fri Jan 31 17:37:35 2025 +0100

    IMPALA-13718: Skip reloading Iceberg tables when metadata JSON file is the same
    
    With this patch Impala skips reloading Iceberg tables when metadata
    JSON file is the same, as this means that the table is essentially
    unchanged.
    
    This can help in situations where the event processor is lagging behind
    and we have an Iceberg table that is updated frequently. Imagine the
    case when Impala gets 100 events for an Iceberg table. In this case
    after processing the first event, our internal representation of
    the Iceberg table is already up-to-date, there is no need to do the
    reload 100 times.
    
    We cannot use the internal icebergApiTable_'s metadata location,
    as the following statement might silently refresh the metadata
    in 'current()':
    
     icebergApiTable_.operations().current().metadataFileLocation()
    
    To guarantee that we check against the actual loaded metadata
    this patch introduces a new member to store the metadata location.
    
    Testing
     * added e2e tests for REFRESH, also for event processing
    
    Change-Id: I16727000cb11d1c0591875a6542d428564dce664
    Reviewed-on: http://gerrit.cloudera.org:8080/22432
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Noemi Pap-Takacs <[email protected]>
---
 .../org/apache/impala/catalog/IcebergTable.java    | 134 +++++++++++++--------
 .../iceberg-mixed-format-position-deletes.test     |   4 +
 tests/metadata/test_event_processing.py            |  17 +++
 3 files changed, 108 insertions(+), 47 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 75c026630..36799218f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -37,6 +38,7 @@ import java.util.TreeMap;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
@@ -243,6 +245,7 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
 
   // Cached Iceberg API table object.
   private org.apache.iceberg.Table icebergApiTable_;
+  private String currentMetadataLocation_ = null;
 
   // The snapshot id cached in the CatalogD, necessary to syncronize the 
caches.
   private long catalogSnapshotId_ = -1;
@@ -456,60 +459,97 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
         getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
     verifyTable(msTbl);
     try {
-      // Copy the table to check later if anything has changed.
-      msTable_ = msTbl.deepCopy();
-      // Other engines might create Iceberg tables without setting the 
HiveIceberg*
-      // storage descriptors. Impala relies on the storage descriptors being 
set to
-      // certain classes, so we set it here for the in-memory metastore table.
-      FeIcebergTable.setIcebergStorageDescriptor(msTable_);
-      setTableStats(msTable_);
-      // Load metadata from Iceberg
-      final Timer.Context ctxStorageLdTime =
-          getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA).time();
-      try {
-        icebergApiTable_ = IcebergUtil.loadTable(this);
-        catalogTimeline.markEvent("Loaded Iceberg API table");
-        catalogSnapshotId_ = FeIcebergTable.super.snapshotId();
-        loadSchemaFromIceberg();
-        catalogTimeline.markEvent("Loaded schema from Iceberg");
-        // Loading hdfs table after loaded schema from Iceberg,
-        // in case we create external Iceberg table skipping column info in 
sql.
-        icebergFileFormat_ = IcebergUtil.getIcebergFileFormat(msTbl);
-        icebergParquetCompressionCodec_ = 
Utils.getIcebergParquetCompressionCodec(msTbl);
-        icebergParquetRowGroupSize_ = 
Utils.getIcebergParquetRowGroupSize(msTbl);
-        icebergParquetPlainPageSize_ = 
Utils.getIcebergParquetPlainPageSize(msTbl);
-        icebergParquetDictPageSize_ = 
Utils.getIcebergParquetDictPageSize(msTbl);
-        GroupedContentFiles icebergFiles = IcebergUtil.getIcebergFiles(this,
-            new ArrayList<>(), /*timeTravelSpec=*/null);
-        catalogTimeline.markEvent("Loaded Iceberg files");
-        hdfsTable_.setSkipIcebergFileMetadataLoading(true);
-        hdfsTable_.load(reuseMetadata, msClient, msTable_, reason, 
catalogTimeline);
-        IcebergFileMetadataLoader loader = new IcebergFileMetadataLoader(
-            icebergApiTable_,
-            fileStore_ == null ? Collections.emptyList() : 
fileStore_.getAllFiles(),
-            getHostIndex(), Preconditions.checkNotNull(icebergFiles),
-            Utils.requiresDataFilesInTableLocation(this));
-        loader.load();
-        fileStore_ = new IcebergContentFileStore(
-            icebergApiTable_, loader.getLoadedIcebergFds(), icebergFiles);
-        partitionStats_ = Utils.loadPartitionStats(this, icebergFiles);
-        setIcebergTableStats();
-        loadAllColumnStats(msClient, catalogTimeline);
-        applyPuffinNdvStats(catalogTimeline);
-        setAvroSchema(msClient, msTbl, fileStore_, catalogTimeline);
-        updateMetrics(loader.getFileMetadataStats());
-      } catch (Exception e) {
+      loadTableMetadata(msClient, msTbl, catalogTimeline);
+      loadFileMetadata(reuseMetadata, msClient, reason, catalogTimeline);
+      setIcebergTableStats();
+      refreshLastUsedTime();
+    } catch (Exception e) {
         throw new IcebergTableLoadingException("Error loading metadata for 
Iceberg table "
             + icebergTableLocation_, e);
-      } finally {
-        storageMetadataLoadTime_ = ctxStorageLdTime.stop();
-      }
-      refreshLastUsedTime();
     } finally {
       context.stop();
     }
   }
 
+  private void loadTableMetadata(IMetaStoreClient msClient,
+      org.apache.hadoop.hive.metastore.api.Table msTbl, EventSequence 
catalogTimeline)
+      throws TableLoadingException, ImpalaRuntimeException {
+    // Copy the table to check later if anything has changed.
+    msTable_ = msTbl.deepCopy();
+    // Other engines might create Iceberg tables without setting the 
HiveIceberg*
+    // storage descriptors. Impala relies on the storage descriptors being set 
to
+    // certain classes, so we set it here for the in-memory metastore table.
+    FeIcebergTable.setIcebergStorageDescriptor(msTable_);
+    setTableStats(msTable_);
+    icebergApiTable_ = IcebergUtil.loadTable(this);
+    catalogTimeline.markEvent("Loaded Iceberg API table");
+    catalogSnapshotId_ = FeIcebergTable.super.snapshotId();
+    loadSchemaFromIceberg();
+    catalogTimeline.markEvent("Loaded schema from Iceberg");
+    icebergFileFormat_ = IcebergUtil.getIcebergFileFormat(msTbl);
+    icebergParquetCompressionCodec_ = 
Utils.getIcebergParquetCompressionCodec(msTbl);
+    icebergParquetRowGroupSize_ = Utils.getIcebergParquetRowGroupSize(msTbl);
+    icebergParquetPlainPageSize_ = Utils.getIcebergParquetPlainPageSize(msTbl);
+    icebergParquetDictPageSize_ = Utils.getIcebergParquetDictPageSize(msTbl);
+    loadAllColumnStats(msClient, catalogTimeline);
+    applyPuffinNdvStats(catalogTimeline);
+  }
+
+  /**
+   * Reloads file metadata, unless reuseMetadata is true and metadata.json 
file hasn't
+   * changed.
+   */
+  private void loadFileMetadata(boolean reuseMetadata, IMetaStoreClient 
msClient,
+      String reason, EventSequence catalogTimeline) throws 
IcebergTableLoadingException {
+    if (reuseMetadata && canSkipReload()) {
+      catalogTimeline.markEvent(
+          "Iceberg table reload skipped as no change detected");
+      return;
+    }
+    final Timer.Context ctxStorageLdTime =
+        getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA).time();
+    try {
+      currentMetadataLocation_ =
+          
((BaseTable)icebergApiTable_).operations().current().metadataFileLocation();
+      GroupedContentFiles icebergFiles = IcebergUtil.getIcebergFiles(this,
+          new ArrayList<>(), /*timeTravelSpec=*/null);
+      catalogTimeline.markEvent("Loaded Iceberg files");
+      // We use IcebergFileMetadataLoader directly to load file metadata, so 
we don't
+      // want 'hdfsTable_' to do any file loading.
+      hdfsTable_.setSkipIcebergFileMetadataLoading(true);
+      // Iceberg schema loading must always precede hdfs table loading, 
because in case we
+      // create an external Iceberg table, we have no column information in 
the SQL
+      // statement.
+      hdfsTable_.load(reuseMetadata, msClient, msTable_, reason, 
catalogTimeline);
+      IcebergFileMetadataLoader loader = new IcebergFileMetadataLoader(
+          icebergApiTable_,
+          fileStore_ == null ? Collections.emptyList() : 
fileStore_.getAllFiles(),
+          getHostIndex(), Preconditions.checkNotNull(icebergFiles),
+          Utils.requiresDataFilesInTableLocation(this));
+      loader.load();
+      fileStore_ = new IcebergContentFileStore(
+          icebergApiTable_, loader.getLoadedIcebergFds(), icebergFiles);
+      partitionStats_ = Utils.loadPartitionStats(this, icebergFiles);
+
+      setAvroSchema(msClient, msTable_, fileStore_, catalogTimeline);
+      updateMetrics(loader.getFileMetadataStats());
+    } catch (Exception e) {
+      throw new IcebergTableLoadingException("Error loading metadata for 
Iceberg table "
+          + icebergTableLocation_, e);
+    } finally {
+      storageMetadataLoadTime_ = ctxStorageLdTime.stop();
+    }
+  }
+
+  private boolean canSkipReload() {
+    if (icebergApiTable_ == null) return false;
+    Preconditions.checkState(icebergApiTable_ instanceof BaseTable);
+    BaseTable newTable = (BaseTable) icebergApiTable_;
+    return Objects.equals(
+        currentMetadataLocation_,
+        newTable.operations().current().metadataFileLocation());
+  }
+
   private void updateMetrics(FileMetadataStats stats) {
     long memUsageEstimate = stats.numFiles * PER_FD_MEM_USAGE_BYTES +
         stats.numBlocks * PER_BLOCK_MEM_USAGE_BYTES;
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-mixed-format-position-deletes.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-mixed-format-position-deletes.test
index 056030f57..eb0bec7fd 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-mixed-format-position-deletes.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-mixed-format-position-deletes.test
@@ -14,7 +14,11 @@ ALTER TABLE ice_mixed_formats SET TBLPROPERTIES 
('write.format.default'='avro');
 DELETE FROM ice_mixed_formats WHERE i = 3;
 ====
 ---- QUERY
+# Second REFRESH should not do a full reload.
 refresh ice_mixed_formats;
+refresh ice_mixed_formats;
+---- RUNTIME_PROFILE
+row_regex:.*Iceberg table reload skipped as no change detected
 ====
 ---- QUERY
 SHOW FILES IN ice_mixed_formats;
diff --git a/tests/metadata/test_event_processing.py 
b/tests/metadata/test_event_processing.py
index eb0455671..dde3a06b9 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -75,6 +75,23 @@ class TestEventProcessing(ImpalaTestSuite):
     finally:
       self.execute_query("drop database if exists {0} cascade".format(db_name))
 
+  def test_hive_impala_iceberg_reloads(self, unique_database):
+    test_tbl = unique_database + ".test_events"
+    self.run_stmt_in_hive("create table {} (value string) \
+        partitioned by (year int) stored by iceberg".format(test_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    self.execute_query("describe {}".format(test_tbl))
+
+    self.run_stmt_in_hive("insert into {} values ('1', 2025)".format(test_tbl))
+    self.run_stmt_in_hive("select * from {}".format(test_tbl))
+
+    EventProcessorUtils.wait_for_event_processing(self)
+    res = self.execute_query("select * from {}".format(test_tbl))
+
+    assert ["1\t2025"] == res.data
+    res = self.execute_query("refresh {}".format(test_tbl))
+    assert "Iceberg table reload skipped as no change detected" in 
res.runtime_profile
+
   @SkipIfHive2.acid
   def test_empty_partition_events_transactional(self, unique_database):
     self._run_test_empty_partition_events(unique_database, True)

Reply via email to