This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bb4122a9066 [opt](hive) Speed up Hive insert on partition tables using cache (#58166)
bb4122a9066 is described below

commit bb4122a9066684904ed540f528c0b363d0b78fd7
Author: zy-kkk <[email protected]>
AuthorDate: Mon Dec 1 09:56:40 2025 +0800

    [opt](hive) Speed up Hive insert on partition tables using cache (#58166)
    
    ### What problem does this PR solve?
    
    For Hive tables with a massive number of partitions (10K+), INSERT
    operations are extremely slow for two reasons:
      - FE fetches all partition metadata directly from HMS (expensive RPC calls)
      - The full table cache is invalidated after each insert (unnecessarily)
    
    
    Problem Summary:
    
    1. **Use cache for partition metadata in INSERT**
      - FE now fetches partition info from the cache instead of querying HMS
        directly when preparing an INSERT (see the sketch after this list)
      - Avoids expensive HMS RPC calls for every INSERT operation
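
    A minimal sketch of the read-through pattern, assuming Caffeine (which the
    FE already uses for its metastore caches); `PartitionKey`, `PartitionMeta`,
    and `fetchFromHms` are hypothetical stand-ins for the real cache types and
    the HMS RPC:

    ```java
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.LoadingCache;

    import java.util.List;

    public class PartitionMetaCacheSketch {
        // Hypothetical key/value types standing in for the real cache entries.
        record PartitionKey(String db, String tbl, List<String> values) {}
        record PartitionMeta(String location, String inputFormat) {}

        private final LoadingCache<PartitionKey, PartitionMeta> cache = Caffeine.newBuilder()
                .maximumSize(100_000)
                .build(this::fetchFromHms); // loader runs only on a cache miss

        // Hot path for INSERT planning: served from cache, no HMS RPC per call.
        public PartitionMeta getPartition(PartitionKey key) {
            return cache.get(key);
        }

        // Stand-in for the expensive get_partition RPC kept off the hot path.
        private PartitionMeta fetchFromHms(PartitionKey key) {
            return new PartitionMeta("hdfs://warehouse/" + key.tbl(), "orc");
        }
    }
    ```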
    
    2. **Selective cache refresh after commit**
      - Only invalidate the affected partitions instead of the full table cache
      - Based on partition update info reported by BE (NEW/APPEND/OVERWRITE)
      - Significantly reduces cache invalidation overhead (sketched below)
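
    In miniature, the post-commit refresh splits the BE-reported updates by
    mode and touches only those cache entries (a hedged sketch; `invalidateOne`
    and `registerNew` stand in for the real cache calls):

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    enum UpdateMode { NEW, APPEND, OVERWRITE }

    record PartitionUpdate(String name, UpdateMode mode) {}

    class SelectiveRefreshSketch {
        // Existing partitions whose data changed are invalidated one by one;
        // brand-new partitions are registered without reloading the whole table.
        static void refreshAffected(List<PartitionUpdate> updates,
                                    Consumer<String> invalidateOne,
                                    Consumer<List<String>> registerNew) {
            List<String> added = new ArrayList<>();
            for (PartitionUpdate u : updates) {
                switch (u.mode()) {
                    case APPEND, OVERWRITE -> invalidateOne.accept(u.name());
                    case NEW -> added.add(u.name());
                }
            }
            if (!added.isEmpty()) {
                registerNew.accept(added);
            }
        }
    }
    ```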
    
    3. **Handle cache inconsistency gracefully**
      - When BE marks a partition as NEW but it already exists in HMS (cache miss)
      - FE detects this by checking HMS and treats it as APPEND instead of failing
      - Prevents `AlreadyExistsException` errors (decision logic sketched below)
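
    The fallback, reduced to its core (sketch only; `MetastoreClient` and the
    two handlers are placeholders, the lookup mirrors the metastore
    `getPartition` call made at commit time):

    ```java
    import java.util.List;

    class NewPartitionFallbackSketch {
        interface MetastoreClient {
            Object getPartition(String db, String tbl, List<String> values) throws Exception;
        }

        // On commit, double-check HMS before creating a partition that BE marked NEW.
        static void commitNewPartition(MetastoreClient hms, String db, String tbl,
                                       List<String> values, Runnable append, Runnable create) {
            boolean existsInHms;
            try {
                existsInHms = hms.getPartition(db, tbl, values) != null;
            } catch (Exception notFound) {
                existsInHms = false; // not in HMS: the partition really is new
            }
            if (existsInHms) {
                append.run(); // stale Doris cache: fall back to APPEND, no AlreadyExistsException
            } else {
                create.run(); // truly new: safe to create
            }
        }
    }
    ```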
    
    For partitioned tables:
      - **Before**: HMS calls per INSERT + full cache invalidation
      - **After**: cache lookup + selective partition refresh
      - Expected speedup: 10x-100x for the partition metadata fetching phase
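
    The new "Sink Set Partition Values Time" entry added to the query summary
    profile makes this phase observable; conceptually it is just a start/finish
    bracket around the sink's partition lookup (hypothetical minimal form):

    ```java
    class SinkPhaseTimerSketch {
        private long startMs = -1;
        private long finishMs = -1;

        void start()  { startMs = System.currentTimeMillis(); }
        void finish() { finishMs = System.currentTimeMillis(); }

        // Rendered in the profile as "Sink Set Partition Values Time".
        String pretty() {
            return (startMs < 0 || finishMs < 0) ? "N/A" : (finishMs - startMs) + "ms";
        }
    }
    ```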
---
 .../org/apache/doris/catalog/RefreshManager.java   | 12 ++-
 .../doris/common/profile/SummaryProfile.java       | 17 ++++
 .../apache/doris/datasource/ExternalObjectLog.java | 10 +++
 .../doris/datasource/hive/HMSTransaction.java      | 98 +++++++++++++++++-----
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 57 +++++++++++++
 .../insert/BaseExternalTableInsertExecutor.java    | 23 +++--
 .../plans/commands/insert/HiveInsertExecutor.java  | 57 +++++++++++++
 .../org/apache/doris/planner/HiveTableSink.java    | 49 +++++++++--
 .../apache/doris/planner/HiveTableSinkTest.java    |  9 ++
 .../hive/test_hive_partitions.groovy               | 97 +++++++++++++++++++++
 10 files changed, 393 insertions(+), 36 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 494946fd1c0..67c87688994 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -183,7 +183,17 @@ public class RefreshManager {
             db.get().unregisterTable(log.getTableName());
             db.get().resetMetaCacheNames();
         } else {
-            refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
+            List<String> partitionNames = log.getPartitionNames();
+            if (partitionNames != null && !partitionNames.isEmpty()) {
+                // Partition-level cache invalidation
+                Env.getCurrentEnv().getExtMetaCacheMgr()
+                        .invalidatePartitionsCache(table.get(), partitionNames);
+                LOG.info("replay refresh partitions for table {}, partitions count: {}",
+                        table.get().getName(), partitionNames.size());
+            } else {
+                // Full table cache invalidation
+                refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 4e99ee14cf7..da86a3cef55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -85,6 +85,7 @@ public class SummaryProfile {
     public static final String GET_PARTITIONS_TIME = "Get Partitions Time";
     public static final String GET_PARTITION_FILES_TIME = "Get Partition Files Time";
     public static final String CREATE_SCAN_RANGE_TIME = "Create Scan Range Time";
+    public static final String SINK_SET_PARTITION_VALUES_TIME = "Sink Set Partition Values Time";
     public static final String PLAN_TIME = "Plan Time";
     public static final String SCHEDULE_TIME = "Schedule Time";
     public static final String ASSIGN_FRAGMENT_TIME = "Fragment Assign Time";
@@ -161,6 +162,7 @@ public class SummaryProfile {
             GET_SPLITS_TIME,
             GET_PARTITIONS_TIME,
             GET_PARTITION_FILES_TIME,
+            SINK_SET_PARTITION_VALUES_TIME,
             CREATE_SCAN_RANGE_TIME,
             NEREIDS_DISTRIBUTE_TIME,
             GET_META_VERSION_TIME,
@@ -211,6 +213,7 @@ public class SummaryProfile {
             .put(NEREIDS_BE_FOLD_CONST_TIME, 2)
             .put(GET_PARTITIONS_TIME, 3)
             .put(GET_PARTITION_FILES_TIME, 3)
+            .put(SINK_SET_PARTITION_VALUES_TIME, 3)
             .put(CREATE_SCAN_RANGE_TIME, 2)
             .put(GET_PARTITION_VERSION_TIME, 1)
             .put(GET_PARTITION_VERSION_COUNT, 1)
@@ -281,6 +284,10 @@ public class SummaryProfile {
     private long getPartitionsFinishTime = -1;
     @SerializedName(value = "getPartitionFilesFinishTime")
     private long getPartitionFilesFinishTime = -1;
+    @SerializedName(value = "sinkSetPartitionValuesStartTime")
+    private long sinkSetPartitionValuesStartTime = -1;
+    @SerializedName(value = "sinkSetPartitionValuesFinishTime")
+    private long sinkSetPartitionValuesFinishTime = -1;
     @SerializedName(value = "getSplitsFinishTime")
     private long getSplitsFinishTime = -1;
     @SerializedName(value = "createScanRangeFinishTime")
@@ -477,6 +484,8 @@ public class SummaryProfile {
                 getPrettyTime(getPartitionsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME,
                 getPrettyTime(getPartitionFilesFinishTime, getPartitionsFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(SINK_SET_PARTITION_VALUES_TIME,
+                getPrettyTime(sinkSetPartitionValuesFinishTime, sinkSetPartitionValuesStartTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME,
                 getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(SCHEDULE_TIME,
@@ -619,6 +628,14 @@ public class SummaryProfile {
         this.getPartitionsFinishTime = TimeUtils.getStartTimeMs();
     }
 
+    public void setSinkGetPartitionsStartTime() {
+        this.sinkSetPartitionValuesStartTime = TimeUtils.getStartTimeMs();
+    }
+
+    public void setSinkGetPartitionsFinishTime() {
+        this.sinkSetPartitionValuesFinishTime = TimeUtils.getStartTimeMs();
+    }
+
     public void setGetPartitionFilesFinishTime() {
         this.getPartitionFilesFinishTime = TimeUtils.getStartTimeMs();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index cf7872f8b39..313f6fc403f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -80,6 +80,16 @@ public class ExternalObjectLog implements Writable {
         return externalObjectLog;
     }
 
+    public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
+            List<String> partitionNames) {
+        ExternalObjectLog externalObjectLog = new ExternalObjectLog();
+        externalObjectLog.setCatalogId(catalogId);
+        externalObjectLog.setDbName(dbName);
+        externalObjectLog.setTableName(tblName);
+        externalObjectLog.setPartitionNames(partitionNames);
+        return externalObjectLog;
+    }
+
     public static ExternalObjectLog createForRenameTable(long catalogId, String dbName, String tblName,
             String newTblName) {
         ExternalObjectLog externalObjectLog = new ExternalObjectLog();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 7141bdd9874..54ee9f46ec5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -47,6 +47,7 @@ import org.apache.doris.transaction.Transaction;
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -268,30 +269,48 @@ public class HMSTransaction implements Transaction {
                         insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
                         break;
                     case NEW:
+                        // Check if partition really exists in HMS (may be cache miss in Doris)
+                        String partitionName = pu.getName();
+                        if (Strings.isNullOrEmpty(partitionName)) {
+                            // This should not happen for partitioned tables
+                            LOG.warn("Partition name is null/empty for NEW mode in partitioned table, skipping");
+                            break;
+                        }
+                        List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
+                        boolean existsInHMS = false;
+                        try {
+                            Partition hmsPartition = hiveOps.getClient().getPartition(
+                                    nameMapping.getRemoteDbName(),
+                                    nameMapping.getRemoteTblName(),
+                                    partitionValues);
+                            existsInHMS = (hmsPartition != null);
+                        } catch (Exception e) {
+                            // Partition not found in HMS, treat as truly new
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Partition {} not found in HMS, will create it", pu.getName());
+                            }
+                        }
+
+                        if (existsInHMS) {
+                            // Partition exists in HMS but not in Doris cache
+                            // Treat as APPEND instead of NEW to avoid creation error
+                            LOG.info("Partition {} already exists in HMS (Doris cache miss), treating as APPEND",
+                                    pu.getName());
+                            insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
+                        } else {
+                            // Truly new partition, create it
+                            createAndAddPartition(nameMapping, table, partitionValues, writePath,
+                                    pu, hivePartitionStatistics, false);
+                        }
+                        break;
                     case OVERWRITE:
-                        StorageDescriptor sd = table.getSd();
-                        // For object storage (FILE_S3), use writePath to keep original scheme (oss://, cos://)
-                        // For HDFS, use targetPath which is the final path after rename
-                        String pathForHMS = this.fileType == TFileType.FILE_S3
-                                ? writePath
-                                : pu.getLocation().getTargetPath();
-                        HivePartition hivePartition = new HivePartition(
-                                nameMapping,
-                                false,
-                                sd.getInputFormat(),
-                                pathForHMS,
-                                HiveUtil.toPartitionValues(pu.getName()),
-                                Maps.newHashMap(),
-                                sd.getOutputFormat(),
-                                sd.getSerdeInfo().getSerializationLib(),
-                                sd.getCols()
-                        );
-                        if (updateMode == TUpdateMode.OVERWRITE) {
-                            dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
+                        String overwritePartitionName = pu.getName();
+                        if (Strings.isNullOrEmpty(overwritePartitionName)) {
+                            LOG.warn("Partition name is null/empty for OVERWRITE mode in partitioned table, skipping");
+                            break;
                         }
-                        addPartition(
-                                nameMapping, hivePartition, writePath,
-                                pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
+                        createAndAddPartition(nameMapping, table, HiveUtil.toPartitionValues(overwritePartitionName),
+                                writePath, pu, hivePartitionStatistics, true);
                         break;
                     default:
                         throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
@@ -377,6 +396,10 @@ public class HMSTransaction implements Transaction {
         return hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
     }
 
+    public List<THivePartitionUpdate> getHivePartitionUpdates() {
+        return hivePartitionUpdates;
+    }
+
     private void convertToInsertExistingPartitionAction(
             NameMapping nameMapping,
             List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
@@ -1029,6 +1052,37 @@ public class HMSTransaction implements Transaction {
         }
     }
 
+    private void createAndAddPartition(
+            NameMapping nameMapping,
+            Table table,
+            List<String> partitionValues,
+            String writePath,
+            THivePartitionUpdate pu,
+            HivePartitionStatistics hivePartitionStatistics,
+            boolean dropFirst) {
+        StorageDescriptor sd = table.getSd();
+        String pathForHMS = this.fileType == TFileType.FILE_S3
+                ? writePath
+                : pu.getLocation().getTargetPath();
+        HivePartition hivePartition = new HivePartition(
+                nameMapping,
+                false,
+                sd.getInputFormat(),
+                pathForHMS,
+                partitionValues,
+                Maps.newHashMap(),
+                sd.getOutputFormat(),
+                sd.getSerdeInfo().getSerializationLib(),
+                sd.getCols()
+        );
+        if (dropFirst) {
+            dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
+        }
+        addPartition(
+                nameMapping, hivePartition, writePath,
+                pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
+    }
+
     public synchronized void addPartition(
             NameMapping nameMapping,
             HivePartition partition,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 4fbda7c19f1..82504111e34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -60,6 +60,7 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Iterables;
@@ -85,6 +86,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -564,6 +566,61 @@ public class HiveMetaStoreCache {
         }
     }
 
+    /**
+     * Selectively refreshes cache for affected partitions based on update information from BE.
+     * For APPEND/OVERWRITE: invalidate both partition cache and file cache using existing method.
+     * For NEW: add to partition values cache.
+     *
+     * @param table The Hive table whose partitions were modified
+     * @param partitionUpdates List of partition updates from BE
+     */
+    public void refreshAffectedPartitions(HMSExternalTable table,
+            List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates) {
+        if (partitionUpdates == null || partitionUpdates.isEmpty()) {
+            return;
+        }
+
+        List<String> modifiedPartitionNames = new ArrayList<>();
+        List<String> newPartitionNames = new ArrayList<>();
+
+        for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) {
+            String partitionName = update.getName();
+            // Skip if partition name is null/empty (non-partitioned table case)
+            if (Strings.isNullOrEmpty(partitionName)) {
+                continue;
+            }
+
+            switch (update.getUpdateMode()) {
+                case APPEND:
+                case OVERWRITE:
+                    modifiedPartitionNames.add(partitionName);
+                    break;
+                case NEW:
+                    newPartitionNames.add(partitionName);
+                    break;
+                default:
+                    LOG.warn("Unknown update mode {} for partition {}",
+                            update.getUpdateMode(), partitionName);
+                    break;
+            }
+        }
+
+        // Invalidate cache for modified partitions (both partition cache and file cache)
+        for (String partitionName : modifiedPartitionNames) {
+            invalidatePartitionCache(table, partitionName);
+        }
+
+        // Add new partitions to partition values cache
+        if (!newPartitionNames.isEmpty()) {
+            addPartitionsCache(table.getOrBuildNameMapping(), newPartitionNames,
+                    table.getPartitionColumnTypes(Optional.empty()));
+        }
+
+        // Log summary
+        LOG.info("Refreshed cache for table {}: {} modified partitions, {} new partitions",
+                table.getName(), modifiedPartitionNames.size(), newPartitionNames.size());
+    }
+
     public void invalidateDbCache(String dbName) {
         long start = System.currentTimeMillis();
         Set<PartitionValueCacheKey> keys = partitionValuesCache.asMap().keySet();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index b1f2856b767..1caef21851e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.SummaryProfile;
@@ -112,14 +113,26 @@ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExec
             }
             summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
             txnStatus = TransactionStatus.COMMITTED;
-            Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
-                    catalogName,
-                    table.getDatabase().getFullName(),
-                    table.getName(),
-                    true);
+
+            // Handle post-commit operations (e.g., cache refresh)
+            doAfterCommit();
         }
     }
 
+    /**
+     * Called after transaction commit.
+     * Subclasses can override this to customize post-commit behavior.
+     * Default: full table refresh.
+     */
+    protected void doAfterCommit() throws DdlException {
+        // Default: full table refresh
+        Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
+                catalogName,
+                table.getDatabase().getFullName(),
+                table.getName(),
+                true);
+    }
+
     @Override
     protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index 2a423b8a26b..fed495dbb35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -17,13 +17,19 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HMSTransaction;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionType;
 
@@ -31,6 +37,8 @@ import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -39,6 +47,8 @@ import java.util.Optional;
 public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
     private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
 
+    private List<THivePartitionUpdate> partitionUpdates;
+
     /**
      * constructor
      */
@@ -65,6 +75,53 @@ public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
         HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
         loadedRows = transaction.getUpdateCnt();
         transaction.finishInsertTable(((ExternalTable) table).getOrBuildNameMapping());
+
+        // Save partition updates for cache refresh after commit
+        partitionUpdates = transaction.getHivePartitionUpdates();
+    }
+
+    @Override
+    protected void doAfterCommit() throws DdlException {
+        HMSExternalTable hmsTable = (HMSExternalTable) table;
+
+        // For partitioned tables, do selective partition refresh
+        // For non-partitioned tables, do full table cache invalidation
+        List<String> affectedPartitionNames = null;
+        if (hmsTable.isPartitionedTable() && partitionUpdates != null && !partitionUpdates.isEmpty()) {
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+            cache.refreshAffectedPartitions(hmsTable, partitionUpdates);
+
+            // Collect partition names for edit log
+            affectedPartitionNames = new ArrayList<>();
+            for (THivePartitionUpdate update : partitionUpdates) {
+                String partitionName = update.getName();
+                if (partitionName != null && !partitionName.isEmpty()) {
+                    affectedPartitionNames.add(partitionName);
+                }
+            }
+        } else {
+            // Non-partitioned table or no partition updates, do full table refresh
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(hmsTable);
+        }
+
+        // Write edit log to notify other FEs
+        ExternalObjectLog log;
+        if (affectedPartitionNames != null && !affectedPartitionNames.isEmpty()) {
+            // Partition-level refresh for other FEs
+            log = ExternalObjectLog.createForRefreshPartitions(
+                    hmsTable.getCatalog().getId(),
+                    table.getDatabase().getFullName(),
+                    table.getName(),
+                    affectedPartitionNames);
+        } else {
+            // Full table refresh for other FEs
+            log = ExternalObjectLog.createForRefreshTable(
+                    hmsTable.getCatalog().getId(),
+                    table.getDatabase().getFullName(),
+                    table.getName());
+        }
+        Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index b9ad7ced616..54dae1df135 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -21,12 +21,16 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.HiveProperties;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
@@ -44,10 +48,12 @@ import org.apache.doris.thrift.THiveSerDeProperties;
 import org.apache.doris.thrift.THiveTableSink;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -191,18 +197,40 @@ public class HiveTableSink extends BaseExternalTableDataSink {
     }
 
     private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
+        if (ConnectContext.get().getExecutor() != null) {
+            ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsStartTime();
+        }
+
         List<THivePartition> partitions = new ArrayList<>();
-        List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
-                ((HMSExternalCatalog) targetTable.getCatalog())
-                        .getClient().listPartitions(targetTable.getRemoteDbName(), targetTable.getRemoteName());
-        for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
+
+        List<HivePartition> hivePartitions;
+        if (targetTable.isPartitionedTable()) {
+            // Get partitions from cache instead of HMS client (similar to HiveScanNode)
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) targetTable.getCatalog());
+            HiveMetaStoreCache.HivePartitionValues partitionValues =
+                    targetTable.getHivePartitionValues(MvccUtil.getSnapshotFromContext(targetTable));
+            List<List<String>> partitionValuesList =
+                    new ArrayList<>(partitionValues.getPartitionValuesMap().values());
+            hivePartitions = cache.getAllPartitionsWithCache(targetTable, partitionValuesList);
+        } else {
+            // Non-partitioned table, create dummy partition
+            hivePartitions = Lists.newArrayList();
+            StorageDescriptor sd = targetTable.getRemoteTable().getSd();
+            HivePartition dummyPartition = new HivePartition(targetTable.getOrBuildNameMapping(), true,
+                    sd.getInputFormat(), sd.getLocation(), Lists.newArrayList(),
+                    sd.getParameters() != null ? sd.getParameters() : new HashMap<>());
+            hivePartitions.add(dummyPartition);
+        }
+
+        // Convert HivePartition to THivePartition (same logic as before)
+        for (HivePartition partition : hivePartitions) {
             THivePartition hivePartition = new THivePartition();
-            StorageDescriptor sd = partition.getSd();
-            hivePartition.setFileFormat(getTFileFormatType(sd.getInputFormat()));
+            hivePartition.setFileFormat(getTFileFormatType(partition.getInputFormat()));
+            hivePartition.setValues(partition.getPartitionValues());
 
-            hivePartition.setValues(partition.getValues());
             THiveLocationParams locationParams = new THiveLocationParams();
-            String location = sd.getLocation();
+            String location = partition.getPath();
             // pass the same of write path and target path to partition
             locationParams.setWritePath(location);
             locationParams.setTargetPath(location);
@@ -210,7 +238,12 @@ public class HiveTableSink extends BaseExternalTableDataSink {
             hivePartition.setLocation(locationParams);
             partitions.add(hivePartition);
         }
+
         tSink.setPartitions(partitions);
+
+        if (ConnectContext.get().getExecutor() != null) {
+            ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsFinishTime();
+        }
     }
 
     private void setSerDeProperties(THiveTableSink tSink) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
index c8bc82e67a1..8f4adff727c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.hive.HMSExternalDatabase;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.ThriftHMSCachedClient;
 import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.qe.ConnectContext;
 
 import mockit.Mock;
 import mockit.MockUp;
@@ -48,10 +49,13 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+
 public class HiveTableSinkTest {
 
     @Test
     public void testBindDataSink() throws UserException {
+        ConnectContext ctx = new ConnectContext();
+        ctx.setThreadLocalInfo();
 
         new MockUp<ThriftHMSCachedClient>() {
             @Mock
@@ -123,6 +127,11 @@ public class HiveTableSinkTest {
 
     private void mockDifferLocationTable(String location) {
         new MockUp<HMSExternalTable>() {
+            @Mock
+            public boolean isPartitionedTable() {
+                return false;
+            }
+
             @Mock
             public Set<String> getPartitionColumnNames() {
                 return new HashSet<String>() {{
diff --git a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
index cc3425106a5..b8b3a39a2f9 100644
--- a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
@@ -77,6 +77,7 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
     }
 
     for (String hivePrefix : ["hive2", "hive3"]) {
+        setHivePrefix(hivePrefix)
         try {
             String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
             String catalog_name = "${hivePrefix}_test_partitions"
@@ -91,6 +92,102 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
 
             q01()
 
+            // Test cache miss scenario: Hive adds partition, then Doris writes to it
+            def test_cache_miss = {
+                def dbName = "test_cache_miss_db"
+                def tblName = "test_cache_miss_table"
+
+                try {
+                    // Clean up
+                    hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
+                    hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
+
+                    // Create database and partitioned table in Hive
+                    hive_docker """CREATE DATABASE ${dbName}"""
+                    hive_docker """
+                        CREATE TABLE ${dbName}.${tblName} (
+                            id INT,
+                            name STRING
+                        )
+                        PARTITIONED BY (pt INT)
+                        STORED AS ORC
+                    """
+
+                    // Hive writes 3 partitions
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=1)
+                        VALUES (1, 'hive_pt1')
+                    """
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=2)
+                        VALUES (2, 'hive_pt2')
+                    """
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=3)
+                        VALUES (3, 'hive_pt3')
+                    """
+
+                    sql """refresh catalog `${catalog_name}`"""
+                    // Doris reads data to populate cache (only knows about 3 partitions)
+                    def result1 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
+                    assertEquals(3, result1[0][0])
+                    logger.info("Doris cache populated with 3 partitions")
+
+                    // Hive writes 4th partition (Doris cache doesn't know about it)
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=4)
+                        VALUES (4, 'hive_pt4')
+                    """
+                    logger.info("Hive added 4th partition (pt=4)")
+
+                    // Doris writes to the 4th partition
+                    // This should trigger cache miss detection and treat as APPEND instead of NEW
+                    sql """
+                        INSERT INTO `${catalog_name}`.`${dbName}`.`${tblName}`
+                        VALUES (40, 'doris_pt4', 4)
+                    """
+                    logger.info("Doris wrote to 4th partition (should handle 
cache miss)")
+
+                    // Verify: should have 5 rows total (3 from hive + 1 from hive pt4 + 1 from doris pt4)
+                    def result2 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
+                    assertEquals(5, result2[0][0])
+
+                    // Verify partition 4 has 2 rows
+                    def result3 = sql """
+                        SELECT COUNT(*) as cnt
+                        FROM `${catalog_name}`.`${dbName}`.`${tblName}`
+                        WHERE pt = 4
+                    """
+                    assertEquals(2, result3[0][0])
+
+                    // Verify data content
+                    def result4 = sql """
+                        SELECT id, name
+                        FROM `${catalog_name}`.`${dbName}`.`${tblName}`
+                        WHERE pt = 4
+                        ORDER BY id
+                    """
+                    assertEquals(2, result4.size())
+                    assertEquals(4, result4[0][0])
+                    assertEquals("hive_pt4", result4[0][1])
+                    assertEquals(40, result4[1][0])
+                    assertEquals("doris_pt4", result4[1][1])
+
+                    logger.info("Cache miss test passed!")
+
+                } finally {
+                    // Clean up
+                    try {
+                        hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
+                        hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
+                    } catch (Exception e) {
+                        logger.warn("Cleanup failed: ${e.message}")
+                    }
+                }
+            }
+
+            test_cache_miss()
+
             qt_string_partition_table_with_comma """
                 select * from partition_tables.string_partition_table_with_comma order by id;
             """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

