This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5835f2dfbe5d610d3c444da3d67406caf296c4f1 Author: stiga-huang <[email protected]> AuthorDate: Sat Feb 22 20:07:18 2025 +0800 IMPALA-13783: Fix huge temp list created in catalog-update thread In catalogd, the catalog-update thread collects new/deleted catalog objects since the last update. When collecting new partitions of a HdfsTable, a list of TCatalogObject is created. Each TCatalogObject has a THdfsPartition corresponding to a new partition. See HdfsTable#getNewPartitionsSinceLastUpdate(). This temp list could be huge, occupying a memory footprint comparable to that of the table metadata itself. E.g. for a HdfsTable that has 6M partitions and 6M files (one file per partition), the HdfsTable object itself takes 7.3GB in memory. This temp list takes 8.9GB. See more details in the JIRA description. This patch fixes the issue by iterating the partition map directly to avoid creating this temp list. For each partition, we collect its thrift representation and add it into the topic update directly. In local-catalog mode, the thrift representation of the partition can be simplified to contain just the names of the db, table, and partition, plus the partition ids. This simplification also reduces the topic update size in local-catalog mode. 
Change-Id: I41dfd48278cb87bdc6f251a13919466d15f8d52d Reviewed-on: http://gerrit.cloudera.org:8080/22538 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/catalog/CatalogObject.java | 10 ++++++---- .../impala/catalog/CatalogServiceCatalog.java | 21 +++++++++++++++++++-- .../org/apache/impala/catalog/HdfsPartition.java | 3 +++ .../java/org/apache/impala/catalog/HdfsTable.java | 22 ---------------------- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java index 23b970092..d550ec8c0 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java @@ -28,10 +28,12 @@ public interface CatalogObject extends HasName { * Catalog objects are often serialized to Thrift. When doing so, many of the objects * have a minimal "descriptor" form used in query execution as well as a more complete * "full" form with all information, used in catalog topic updates and DDL responses to - * coordinators. When sending incremental update for a hdfs table, its "descriptor" form - * is used with no partitions. Its incremental partition updates will follow it in the - * same topic update. "invalidation" form means only the name will be included. "none" - * form means return nothing, i.e. null. + * coordinators in the legacy catalog mode. When sending incremental update for a hdfs + * table, its "descriptor" form is used with no partitions. Its incremental partition + * updates will follow it in the same topic update. "invalidation" form means only + * collecting information that is enough for local-catalog mode coordinators to + * invalidate the stale cache item. Usually only names and ids will be included. + * "none" form means return nothing, i.e. null. 
*/ enum ThriftObjectType { FULL, diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 9bbd0e915..a5f68f6ab 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -1795,10 +1795,27 @@ public class CatalogServiceCatalog extends Catalog { // Add updates for new partitions. long maxSentId = hdfsTable.getMaxSentPartitionId(); - for (TCatalogObject catalogPart : hdfsTable.getNewPartitionsSinceLastUpdate()) { - maxSentId = Math.max(maxSentId, catalogPart.getHdfs_partition().getId()); + int numSkippedParts = 0; + for (PrunablePartition p: hdfsTable.getPartitions()) { + HdfsPartition partition = (HdfsPartition) p; + if (partition.getId() <= hdfsTable.getMaxSentPartitionId()) { + numSkippedParts++; + continue; + } + maxSentId = Math.max(maxSentId, partition.getId()); + TCatalogObject catalogPart; + if (topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED) { + catalogPart = new TCatalogObject( + TCatalogObjectType.HDFS_PARTITION, hdfsTable.getCatalogVersion()); + partition.setTCatalogObject(catalogPart); + } else { + Preconditions.checkState(topicMode_ == TopicMode.MINIMAL); + catalogPart = partition.toMinimalTCatalogObject(); + } ctx.addCatalogObject(catalogPart, false, updateSummary); } + LOG.info("Skipped {} partitions of table {} in the incremental update", + numSkippedParts, hdfsTable.getFullName()); hdfsTable.setMaxSentPartitionId(maxSentId); for (HdfsPartition part : hdfsTable.getDroppedPartitions()) { diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index d967d050b..3d337beec 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -792,6 +792,9 @@ public class 
HdfsPartition extends CatalogObjectImpl part.setTbl_name(table_.getName()); part.setPartition_name(partName_); part.setId(id_); + if (prevId_ != INITIAL_PARTITION_ID - 1) { + part.setPrev_id(prevId_); + } return part; } diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 141f38286..b68d21eb4 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -2318,28 +2318,6 @@ public class HdfsTable extends Table implements FeFsTable { */ public void resetDroppedPartitions() { droppedPartitions_.clear(); } - /** - * Gets catalog objects of new partitions since last catalog update. They are partitions - * that coordinators are not aware of. - */ - public List<TCatalogObject> getNewPartitionsSinceLastUpdate() { - List<TCatalogObject> result = new ArrayList<>(); - int numSkippedParts = 0; - for (HdfsPartition partition: partitionMap_.values()) { - if (partition.getId() <= maxSentPartitionId_) { - numSkippedParts++; - continue; - } - TCatalogObject catalogPart = - new TCatalogObject(TCatalogObjectType.HDFS_PARTITION, getCatalogVersion()); - partition.setTCatalogObject(catalogPart); - result.add(catalogPart); - } - LOG.info("Skipped {} partitions of table {} in the incremental update", - numSkippedParts, getFullName()); - return result; - } - public TGetPartialCatalogObjectResponse getPartialInfo( TGetPartialCatalogObjectRequest req, Map<HdfsPartition, TPartialPartitionInfo> missingPartitionInfos)
