This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5835f2dfbe5d610d3c444da3d67406caf296c4f1
Author: stiga-huang <[email protected]>
AuthorDate: Sat Feb 22 20:07:18 2025 +0800

    IMPALA-13783: Fix huge temp list created in catalog-update thread
    
    In catalogd, the catalog-update thread collects new/deleted catalog
    objects since the last update. When collecting new partitions of a
    HdfsTable, a list of TCatalogObject is created. Each TCatalogObject
    has a THdfsPartition corresponding to a new partition. See
    HdfsTable#getNewPartitionsSinceLastUpdate().
    
    This temp list could be huge, occupying a memory footprint comparable
    to that of the table metadata. E.g. for an HdfsTable that has 6M partitions and
    6M files (one file per partition), the HdfsTable object itself takes
    7.3GB in memory. This temp list takes 8.9GB. See more details in the
    JIRA description.
    
    This patch fixes the issue by iterating the partition map directly to
    avoid creating this temp list. For each partition, we collect its thrift
    representation and add it into the topic update directly.
    
    In local-catalog mode, the thrift representation of a partition can be
    simplified to contain just the names of the db, table and partition, plus
    the partition ids. This simplification also reduces the topic update size
    in local-catalog mode.
    
    Change-Id: I41dfd48278cb87bdc6f251a13919466d15f8d52d
    Reviewed-on: http://gerrit.cloudera.org:8080/22538
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/catalog/CatalogObject.java   | 10 ++++++----
 .../impala/catalog/CatalogServiceCatalog.java      | 21 +++++++++++++++++++--
 .../org/apache/impala/catalog/HdfsPartition.java   |  3 +++
 .../java/org/apache/impala/catalog/HdfsTable.java  | 22 ----------------------
 4 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
index 23b970092..d550ec8c0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
@@ -28,10 +28,12 @@ public interface CatalogObject extends HasName {
    * Catalog objects are often serialized to Thrift. When doing so, many of 
the objects
    * have a minimal "descriptor" form used in query execution as well as a 
more complete
    * "full" form with all information, used in catalog topic updates and DDL 
responses to
-   * coordinators. When sending incremental update for a hdfs table, its 
"descriptor" form
-   * is used with no partitions. Its incremental partition updates will follow 
it in the
-   * same topic update. "invalidation" form means only the name will be 
included. "none"
-   * form means return nothing, i.e. null.
+   * coordinators in the legacy catalog mode. When sending incremental update 
for a hdfs
+   * table, its "descriptor" form is used with no partitions. Its incremental 
partition
+   * updates will follow it in the same topic update. "invalidation" form 
means only
+   * collecting information that is enough for local-catalog mode coordinators 
to
+   * invalidate the stale cache item. Usually only names and ids will be 
included.
+   * "none" form means return nothing, i.e. null.
    */
   enum ThriftObjectType {
     FULL,
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 9bbd0e915..a5f68f6ab 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -1795,10 +1795,27 @@ public class CatalogServiceCatalog extends Catalog {
 
     // Add updates for new partitions.
     long maxSentId = hdfsTable.getMaxSentPartitionId();
-    for (TCatalogObject catalogPart : 
hdfsTable.getNewPartitionsSinceLastUpdate()) {
-      maxSentId = Math.max(maxSentId, catalogPart.getHdfs_partition().getId());
+    int numSkippedParts = 0;
+    for (PrunablePartition p: hdfsTable.getPartitions()) {
+      HdfsPartition partition = (HdfsPartition) p;
+      if (partition.getId() <= hdfsTable.getMaxSentPartitionId()) {
+        numSkippedParts++;
+        continue;
+      }
+      maxSentId = Math.max(maxSentId, partition.getId());
+      TCatalogObject catalogPart;
+      if (topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED) {
+        catalogPart = new TCatalogObject(
+            TCatalogObjectType.HDFS_PARTITION, hdfsTable.getCatalogVersion());
+        partition.setTCatalogObject(catalogPart);
+      } else {
+        Preconditions.checkState(topicMode_ == TopicMode.MINIMAL);
+        catalogPart = partition.toMinimalTCatalogObject();
+      }
       ctx.addCatalogObject(catalogPart, false, updateSummary);
     }
+    LOG.info("Skipped {} partitions of table {} in the incremental update",
+        numSkippedParts, hdfsTable.getFullName());
     hdfsTable.setMaxSentPartitionId(maxSentId);
 
     for (HdfsPartition part : hdfsTable.getDroppedPartitions()) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index d967d050b..3d337beec 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -792,6 +792,9 @@ public class HdfsPartition extends CatalogObjectImpl
     part.setTbl_name(table_.getName());
     part.setPartition_name(partName_);
     part.setId(id_);
+    if (prevId_ != INITIAL_PARTITION_ID - 1) {
+      part.setPrev_id(prevId_);
+    }
     return part;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 141f38286..b68d21eb4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -2318,28 +2318,6 @@ public class HdfsTable extends Table implements 
FeFsTable {
    */
   public void resetDroppedPartitions() { droppedPartitions_.clear(); }
 
-  /**
-   * Gets catalog objects of new partitions since last catalog update. They 
are partitions
-   * that coordinators are not aware of.
-   */
-  public List<TCatalogObject> getNewPartitionsSinceLastUpdate() {
-    List<TCatalogObject> result = new ArrayList<>();
-    int numSkippedParts = 0;
-    for (HdfsPartition partition: partitionMap_.values()) {
-      if (partition.getId() <= maxSentPartitionId_) {
-        numSkippedParts++;
-        continue;
-      }
-      TCatalogObject catalogPart =
-          new TCatalogObject(TCatalogObjectType.HDFS_PARTITION, 
getCatalogVersion());
-      partition.setTCatalogObject(catalogPart);
-      result.add(catalogPart);
-    }
-    LOG.info("Skipped {} partitions of table {} in the incremental update",
-        numSkippedParts, getFullName());
-    return result;
-  }
-
   public TGetPartialCatalogObjectResponse getPartialInfo(
       TGetPartialCatalogObjectRequest req,
       Map<HdfsPartition, TPartialPartitionInfo> missingPartitionInfos)

Reply via email to