This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new a5ce97a159b branch-3.1: [opt](hive) Speed up Hive insert on partition tables using cache #58166 #58606 #58748 (#58886)
a5ce97a159b is described below

commit a5ce97a159b1347204f4a26214c419cf9b1f8207
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Dec 15 11:15:42 2025 +0800

    branch-3.1: [opt](hive) Speed up Hive insert on partition tables using cache #58166 #58606 #58748 (#58886)
    
    picked from #58166 #58606 #58748
    
    ---------
    
    Co-authored-by: zy-kkk <[email protected]>
    Co-authored-by: Calvin Kirs <[email protected]>
---
 .../org/apache/doris/catalog/RefreshManager.java   | 21 ++++-
 .../doris/common/profile/SummaryProfile.java       | 17 ++++
 .../apache/doris/datasource/ExternalObjectLog.java | 20 +++++
 .../doris/datasource/hive/HMSTransaction.java      | 98 +++++++++++++++++-----
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 63 ++++++++++++++
 .../insert/BaseExternalTableInsertExecutor.java    | 23 +++--
 .../plans/commands/insert/HiveInsertExecutor.java  | 50 +++++++++++
 .../org/apache/doris/planner/HiveTableSink.java    | 39 +++++++--
 .../apache/doris/planner/HiveTableSinkTest.java    |  9 ++
 .../hive/test_hive_partitions.groovy               | 97 +++++++++++++++++++++
 10 files changed, 401 insertions(+), 36 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 494946fd1c0..9738ee39355 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -26,7 +26,9 @@ import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.persist.OperationType;
 
 import com.google.common.base.Strings;
@@ -183,7 +185,24 @@ public class RefreshManager {
             db.get().unregisterTable(log.getTableName());
             db.get().resetMetaCacheNames();
         } else {
-            refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
+            List<String> modifiedPartNames = log.getPartitionNames();
+            List<String> newPartNames = log.getNewPartitionNames();
+            if (catalog instanceof HMSExternalCatalog
+                    && ((modifiedPartNames != null && !modifiedPartNames.isEmpty())
+                    || (newPartNames != null && !newPartNames.isEmpty()))) {
+                // Partition-level cache invalidation, only for hive catalog
+                HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                        .getMetaStoreCache((HMSExternalCatalog) catalog);
+                cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames);
+                LOG.info("replay refresh partitions for table {}, "
+                                + "modified partitions count: {}, "
+                                + "new partitions count: {}",
+                        table.get().getName(), modifiedPartNames == null ? 0 : modifiedPartNames.size(),
+                        newPartNames == null ? 0 : newPartNames.size());
+            } else {
+                // Full table cache invalidation
+                refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 1410f961881..87516a19a77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -83,6 +83,7 @@ public class SummaryProfile {
     public static final String GET_PARTITIONS_TIME = "Get Partitions Time";
     public static final String GET_PARTITION_FILES_TIME = "Get Partition Files Time";
     public static final String CREATE_SCAN_RANGE_TIME = "Create Scan Range Time";
+    public static final String SINK_SET_PARTITION_VALUES_TIME = "Sink Set Partition Values Time";
     public static final String PLAN_TIME = "Plan Time";
     public static final String SCHEDULE_TIME = "Schedule Time";
     public static final String ASSIGN_FRAGMENT_TIME = "Fragment Assign Time";
@@ -158,6 +159,7 @@ public class SummaryProfile {
             GET_SPLITS_TIME,
             GET_PARTITIONS_TIME,
             GET_PARTITION_FILES_TIME,
+            SINK_SET_PARTITION_VALUES_TIME,
             CREATE_SCAN_RANGE_TIME,
             DISTRIBUTE_TIME,
             GET_META_VERSION_TIME,
@@ -208,6 +210,7 @@ public class SummaryProfile {
             .put(NEREIDS_BE_FOLD_CONST_TIME, 2)
             .put(GET_PARTITIONS_TIME, 3)
             .put(GET_PARTITION_FILES_TIME, 3)
+            .put(SINK_SET_PARTITION_VALUES_TIME, 3)
             .put(CREATE_SCAN_RANGE_TIME, 2)
             .put(GET_PARTITION_VERSION_TIME, 1)
             .put(GET_PARTITION_VERSION_COUNT, 1)
@@ -277,6 +280,10 @@ public class SummaryProfile {
     private long getPartitionsFinishTime = -1;
     @SerializedName(value = "getPartitionFilesFinishTime")
     private long getPartitionFilesFinishTime = -1;
+    @SerializedName(value = "sinkSetPartitionValuesStartTime")
+    private long sinkSetPartitionValuesStartTime = -1;
+    @SerializedName(value = "sinkSetPartitionValuesFinishTime")
+    private long sinkSetPartitionValuesFinishTime = -1;
     @SerializedName(value = "getSplitsFinishTime")
     private long getSplitsFinishTime = -1;
     @SerializedName(value = "createScanRangeFinishTime")
@@ -463,6 +470,8 @@ public class SummaryProfile {
                 getPrettyTime(getPartitionsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME,
                 getPrettyTime(getPartitionFilesFinishTime, getPartitionsFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(SINK_SET_PARTITION_VALUES_TIME,
+                getPrettyTime(sinkSetPartitionValuesFinishTime, sinkSetPartitionValuesStartTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME,
                 getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(SCHEDULE_TIME,
@@ -601,6 +610,14 @@ public class SummaryProfile {
         this.getPartitionsFinishTime = TimeUtils.getStartTimeMs();
     }
 
+    public void setSinkGetPartitionsStartTime() {
+        this.sinkSetPartitionValuesStartTime = TimeUtils.getStartTimeMs();
+    }
+
+    public void setSinkGetPartitionsFinishTime() {
+        this.sinkSetPartitionValuesFinishTime = TimeUtils.getStartTimeMs();
+    }
+
     public void setGetPartitionFilesFinishTime() {
         this.getPartitionFilesFinishTime = TimeUtils.getStartTimeMs();
     }
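
The new markers are consumed in pairs; a minimal sketch of the intended bracketing (the real call sites are in the HiveTableSink hunk further below, and the ConnectContext wiring shown here follows that hunk):

    // Bracket the sink's partition-value resolution with the new markers.
    if (ConnectContext.get().getExecutor() != null) {
        ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsStartTime();
    }
    // ... resolve partition values for the sink ...
    if (ConnectContext.get().getExecutor() != null) {
        ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsFinishTime();
    }
    // The delta is reported in the query profile as "Sink Set Partition Values Time".
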
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index cf7872f8b39..43a0675d783 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -58,6 +58,9 @@ public class ExternalObjectLog implements Writable {
     @SerializedName(value = "partitionNames")
     private List<String> partitionNames;
 
+    @SerializedName(value = "newPartitionNames")
+    private List<String> newPartitionNames;
+
     @SerializedName(value = "lastUpdateTime")
     private long lastUpdateTime;
 
@@ -80,6 +83,17 @@ public class ExternalObjectLog implements Writable {
         return externalObjectLog;
     }
 
+    public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
+            List<String> modifiedPartNames, List<String> newPartNames) {
+        ExternalObjectLog externalObjectLog = new ExternalObjectLog();
+        externalObjectLog.setCatalogId(catalogId);
+        externalObjectLog.setDbName(dbName);
+        externalObjectLog.setTableName(tblName);
+        externalObjectLog.setPartitionNames(modifiedPartNames);
+        externalObjectLog.setNewPartitionNames(newPartNames);
+        return externalObjectLog;
+    }
+
     public static ExternalObjectLog createForRenameTable(long catalogId, String dbName, String tblName,
             String newTblName) {
         ExternalObjectLog externalObjectLog = new ExternalObjectLog();
@@ -124,6 +138,12 @@ public class ExternalObjectLog implements Writable {
         } else {
             sb.append("tableId: " + tableId + "]");
         }
+        if (partitionNames != null && !partitionNames.isEmpty()) {
+            sb.append(", partitionNames: " + partitionNames);
+        }
+        if (newPartitionNames != null && !newPartitionNames.isEmpty()) {
+            sb.append(", newPartitionNames: " + newPartitionNames);
+        }
         return sb.toString();
     }
 }
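
A minimal sketch of how the new factory fans a partition-level refresh out to other FEs (this is the pattern the HiveInsertExecutor hunk below follows; logRefreshExternalTable is the existing edit-log entry point, and the variable names here are illustrative):

    // Committing FE: record which partitions changed so follower FEs can
    // invalidate selectively instead of dropping the whole table cache.
    ExternalObjectLog log = ExternalObjectLog.createForRefreshPartitions(
            catalog.getId(), dbName, tblName, modifiedPartNames, newPartNames);
    Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
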
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 2864f146f03..182e214b00c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -47,6 +47,7 @@ import org.apache.doris.transaction.Transaction;
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -268,30 +269,48 @@ public class HMSTransaction implements Transaction {
                         insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
                         break;
                     case NEW:
+                        // Check if partition really exists in HMS (may be cache miss in Doris)
+                        String partitionName = pu.getName();
+                        if (Strings.isNullOrEmpty(partitionName)) {
+                            // This should not happen for partitioned tables
+                            LOG.warn("Partition name is null/empty for NEW mode in partitioned table, skipping");
+                            break;
+                        }
+                        List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
+                        boolean existsInHMS = false;
+                        try {
+                            Partition hmsPartition = hiveOps.getClient().getPartition(
+                                    nameMapping.getRemoteDbName(),
+                                    nameMapping.getRemoteTblName(),
+                                    partitionValues);
+                            existsInHMS = (hmsPartition != null);
+                        } catch (Exception e) {
+                            // Partition not found in HMS, treat as truly new
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Partition {} not found in HMS, will create it", pu.getName());
+                            }
+                        }
+
+                        if (existsInHMS) {
+                            // Partition exists in HMS but not in Doris cache
+                            // Treat as APPEND instead of NEW to avoid creation error
+                            LOG.info("Partition {} already exists in HMS (Doris cache miss), treating as APPEND",
+                                    pu.getName());
+                            insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
+                        } else {
+                            // Truly new partition, create it
+                            createAndAddPartition(nameMapping, table, partitionValues, writePath,
+                                    pu, hivePartitionStatistics, false);
+                        }
+                        break;
                     case OVERWRITE:
-                        StorageDescriptor sd = table.getSd();
-                        // For object storage (FILE_S3), use writePath to keep original scheme (oss://, cos://)
-                        // For HDFS, use targetPath which is the final path after rename
-                        String pathForHMS = this.fileType == TFileType.FILE_S3
-                                ? writePath
-                                : pu.getLocation().getTargetPath();
-                        HivePartition hivePartition = new HivePartition(
-                                nameMapping,
-                                false,
-                                sd.getInputFormat(),
-                                pathForHMS,
-                                HiveUtil.toPartitionValues(pu.getName()),
-                                Maps.newHashMap(),
-                                sd.getOutputFormat(),
-                                sd.getSerdeInfo().getSerializationLib(),
-                                sd.getCols()
-                        );
-                        if (updateMode == TUpdateMode.OVERWRITE) {
-                            dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
+                        String overwritePartitionName = pu.getName();
+                        if (Strings.isNullOrEmpty(overwritePartitionName)) {
+                            LOG.warn("Partition name is null/empty for OVERWRITE mode in partitioned table, skipping");
+                            break;
                         }
-                        addPartition(
-                                nameMapping, hivePartition, writePath,
-                                pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
+                        createAndAddPartition(nameMapping, table, HiveUtil.toPartitionValues(overwritePartitionName),
+                                writePath, pu, hivePartitionStatistics, true);
                         break;
                     default:
                         throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
@@ -377,6 +396,10 @@ public class HMSTransaction implements Transaction {
         return hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
     }
 
+    public List<THivePartitionUpdate> getHivePartitionUpdates() {
+        return hivePartitionUpdates;
+    }
+
     private void convertToInsertExistingPartitionAction(
             NameMapping nameMapping,
             List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
@@ -1028,6 +1051,37 @@ public class HMSTransaction implements Transaction {
         }
     }
 
+    private void createAndAddPartition(
+            NameMapping nameMapping,
+            Table table,
+            List<String> partitionValues,
+            String writePath,
+            THivePartitionUpdate pu,
+            HivePartitionStatistics hivePartitionStatistics,
+            boolean dropFirst) {
+        StorageDescriptor sd = table.getSd();
+        String pathForHMS = this.fileType == TFileType.FILE_S3
+                ? writePath
+                : pu.getLocation().getTargetPath();
+        HivePartition hivePartition = new HivePartition(
+                nameMapping,
+                false,
+                sd.getInputFormat(),
+                pathForHMS,
+                partitionValues,
+                Maps.newHashMap(),
+                sd.getOutputFormat(),
+                sd.getSerdeInfo().getSerializationLib(),
+                sd.getCols()
+        );
+        if (dropFirst) {
+            dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
+        }
+        addPartition(
+                nameMapping, hivePartition, writePath,
+                pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
+    }
+
     public synchronized void addPartition(
             NameMapping nameMapping,
             HivePartition partition,
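
Condensed, the NEW-mode handling above is an existence probe against HMS before creating the partition, so a stale Doris cache cannot turn a Hive-side ADD PARTITION into a creation failure (a sketch only, with the db/table lookups abbreviated as remoteDb/remoteTbl; see the hunk for the real control flow):

    boolean existsInHMS;
    try {
        // Probe HMS directly; the Doris-side partition cache may be stale.
        existsInHMS = hiveOps.getClient().getPartition(remoteDb, remoteTbl, partitionValues) != null;
    } catch (Exception e) {
        existsInHMS = false; // not found in HMS: genuinely new
    }
    if (existsInHMS) {
        // Cache miss on the Doris side: fall back to the APPEND path.
    } else {
        // Genuinely new: create the partition, then register the written files.
    }
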
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 883be276f6f..c0dbf68495e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -61,6 +61,7 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Iterables;
@@ -90,6 +91,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -580,6 +582,67 @@ public class HiveMetaStoreCache {
         }
     }
 
+    /**
+     * Selectively refreshes cache for affected partitions based on update information from BE.
+     * For APPEND/OVERWRITE: invalidate both partition cache and file cache using existing method.
+     * For NEW: add to partition values cache.
+     *
+     * @param table The Hive table whose partitions were modified
+     * @param partitionUpdates List of partition updates from BE
+     * @param modifiedPartNames Output list to collect names of modified partitions
+     * @param newPartNames Output list to collect names of new partitions
+     */
+    public void refreshAffectedPartitions(HMSExternalTable table,
+            List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates,
+            List<String> modifiedPartNames, List<String> newPartNames) {
+        if (partitionUpdates == null || partitionUpdates.isEmpty()) {
+            return;
+        }
+
+        for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) {
+            String partitionName = update.getName();
+            // Skip if partition name is null/empty (non-partitioned table case)
+            if (Strings.isNullOrEmpty(partitionName)) {
+                continue;
+            }
+
+            switch (update.getUpdateMode()) {
+                case APPEND:
+                case OVERWRITE:
+                    modifiedPartNames.add(partitionName);
+                    break;
+                case NEW:
+                    newPartNames.add(partitionName);
+                    break;
+                default:
+                    LOG.warn("Unknown update mode {} for partition {}",
+                            update.getUpdateMode(), partitionName);
+                    break;
+            }
+        }
+
+        refreshAffectedPartitionsCache(table, modifiedPartNames, newPartNames);
+    }
+
+    public void refreshAffectedPartitionsCache(HMSExternalTable table,
+            List<String> modifiedPartNames, List<String> newPartNames) {
+
+        // Invalidate cache for modified partitions (both partition cache and file cache)
+        for (String partitionName : modifiedPartNames) {
+            invalidatePartitionCache(table, partitionName);
+        }
+
+        // Add new partitions to partition values cache
+        if (!newPartNames.isEmpty()) {
+            addPartitionsCache(table.getOrBuildNameMapping(), newPartNames,
+                    table.getPartitionColumnTypes(Optional.empty()));
+        }
+
+        // Log summary
+        LOG.info("Refreshed cache for table {}: {} modified partitions, {} new partitions",
+                table.getName(), modifiedPartNames.size(), newPartNames.size());
+    }
+
     public void invalidateDbCache(String dbName) {
         long start = System.currentTimeMillis();
         Set<PartitionValueCacheKey> keys = partitionValuesCache.asMap().keySet();
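
The two entry points split along the edit-log boundary: the committing FE calls refreshAffectedPartitions() to classify the BE-reported updates (filling the two output lists it will later log), while replaying FEs call refreshAffectedPartitionsCache() directly with the already-classified name lists, as the RefreshManager hunk above shows. A minimal sketch of the committing side (the cache/table variables are assumed from context):

    List<String> modifiedPartNames = Lists.newArrayList();
    List<String> newPartNames = Lists.newArrayList();
    // Classify the BE-reported updates and refresh the local cache in one pass.
    cache.refreshAffectedPartitions(hmsTable, partitionUpdates, modifiedPartNames, newPartNames);
    // modifiedPartNames / newPartNames are now ready to be written to the edit log.
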
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index 93693233e11..f82fefd89f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.SummaryProfile;
@@ -110,14 +111,26 @@ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExec
             }
             summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
             txnStatus = TransactionStatus.COMMITTED;
-            Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
-                    catalogName,
-                    table.getDatabase().getFullName(),
-                    table.getName(),
-                    true);
+
+            // Handle post-commit operations (e.g., cache refresh)
+            doAfterCommit();
         }
     }
 
+    /**
+     * Called after transaction commit.
+     * Subclasses can override this to customize post-commit behavior.
+     * Default: full table refresh.
+     */
+    protected void doAfterCommit() throws DdlException {
+        // Default: full table refresh
+        Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
+                catalogName,
+                table.getDatabase().getFullName(),
+                table.getName(),
+                true);
+    }
+
     @Override
     protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
         try {
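
doAfterCommit() is a plain template-method hook: subclasses that want something narrower than the default full-table refresh override it, as HiveInsertExecutor does in the next hunk. A minimal sketch (class name illustrative):

    public class MyExternalInsertExecutor extends BaseExternalTableInsertExecutor {
        @Override
        protected void doAfterCommit() throws DdlException {
            // Custom post-commit refresh instead of the default
            // full-table handleRefreshTable(...) invalidation.
        }
    }
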
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index 64f68454d84..f9345a4495c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -17,20 +17,28 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HMSTransaction;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionType;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -39,6 +47,8 @@ import java.util.Optional;
 public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
     private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
 
+    private List<THivePartitionUpdate> partitionUpdates;
+
     /**
      * constructor
      */
@@ -65,6 +75,46 @@ public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
         HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
         loadedRows = transaction.getUpdateCnt();
         transaction.finishInsertTable(((ExternalTable) table).getOrBuildNameMapping());
+
+        // Save partition updates for cache refresh after commit
+        partitionUpdates = transaction.getHivePartitionUpdates();
+    }
+
+    @Override
+    protected void doAfterCommit() throws DdlException {
+        HMSExternalTable hmsTable = (HMSExternalTable) table;
+
+        // For partitioned tables, do selective partition refresh
+        // For non-partitioned tables, do full table cache invalidation
+        List<String> modifiedPartNames = Lists.newArrayList();
+        List<String> newPartNames = Lists.newArrayList();
+        if (hmsTable.isPartitionedTable() && partitionUpdates != null && !partitionUpdates.isEmpty()) {
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+            cache.refreshAffectedPartitions(hmsTable, partitionUpdates, modifiedPartNames, newPartNames);
+        } else {
+            // Non-partitioned table or no partition updates, do full table refresh
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(hmsTable);
+        }
+
+        // Write edit log to notify other FEs
+        ExternalObjectLog log;
+        if (!modifiedPartNames.isEmpty() || !newPartNames.isEmpty()) {
+            // Partition-level refresh for other FEs
+            log = ExternalObjectLog.createForRefreshPartitions(
+                    hmsTable.getCatalog().getId(),
+                    table.getDatabase().getFullName(),
+                    table.getName(),
+                    modifiedPartNames,
+                    newPartNames);
+        } else {
+            // Full table refresh for other FEs
+            log = ExternalObjectLog.createForRefreshTable(
+                    hmsTable.getCatalog().getId(),
+                    table.getDatabase().getFullName(),
+                    table.getName());
+        }
+        Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 68a0edc430f..2ae4ae8e397 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -21,12 +21,16 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.HiveProperties;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
@@ -191,18 +195,32 @@ public class HiveTableSink extends BaseExternalTableDataSink {
     }
 
     private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
+        if (ConnectContext.get().getExecutor() != null) {
+            ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsStartTime();
+        }
+
         List<THivePartition> partitions = new ArrayList<>();
-        List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
-                ((HMSExternalCatalog) targetTable.getCatalog())
-                        .getClient().listPartitions(targetTable.getRemoteDbName(), targetTable.getRemoteName());
-        for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
+
+        List<HivePartition> hivePartitions = new ArrayList<>();
+        if (targetTable.isPartitionedTable()) {
+            // Get partitions from cache instead of HMS client (similar to HiveScanNode)
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) targetTable.getCatalog());
+            HiveMetaStoreCache.HivePartitionValues partitionValues =
+                    targetTable.getHivePartitionValues(MvccUtil.getSnapshotFromContext(targetTable));
+            List<List<String>> partitionValuesList =
+                    new ArrayList<>(partitionValues.getPartitionValuesMap().values());
+            hivePartitions = cache.getAllPartitionsWithCache(targetTable, partitionValuesList);
+        }
+
+        // Convert HivePartition to THivePartition (same logic as before)
+        for (HivePartition partition : hivePartitions) {
             THivePartition hivePartition = new THivePartition();
-            StorageDescriptor sd = partition.getSd();
-            hivePartition.setFileFormat(getTFileFormatType(sd.getInputFormat()));
+            hivePartition.setFileFormat(getTFileFormatType(partition.getInputFormat()));
+            hivePartition.setValues(partition.getPartitionValues());
 
-            hivePartition.setValues(partition.getValues());
             THiveLocationParams locationParams = new THiveLocationParams();
-            String location = sd.getLocation();
+            String location = partition.getPath();
             // pass the same of write path and target path to partition
             locationParams.setWritePath(location);
             locationParams.setTargetPath(location);
@@ -210,7 +228,12 @@ public class HiveTableSink extends BaseExternalTableDataSink {
             hivePartition.setLocation(locationParams);
             partitions.add(hivePartition);
         }
+
         tSink.setPartitions(partitions);
+
+        if (ConnectContext.get().getExecutor() != null) {
+            ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsFinishTime();
+        }
     }
 
     private void setSerDeProperties(THiveTableSink tSink) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
index c8bc82e67a1..8f4adff727c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.hive.HMSExternalDatabase;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.ThriftHMSCachedClient;
 import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.qe.ConnectContext;
 
 import mockit.Mock;
 import mockit.MockUp;
@@ -48,10 +49,13 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+
 public class HiveTableSinkTest {
 
     @Test
     public void testBindDataSink() throws UserException {
+        ConnectContext ctx = new ConnectContext();
+        ctx.setThreadLocalInfo();
 
         new MockUp<ThriftHMSCachedClient>() {
             @Mock
@@ -123,6 +127,11 @@ public class HiveTableSinkTest {
 
     private void mockDifferLocationTable(String location) {
         new MockUp<HMSExternalTable>() {
+            @Mock
+            public boolean isPartitionedTable() {
+                return false;
+            }
+
             @Mock
             public Set<String> getPartitionColumnNames() {
                 return new HashSet<String>() {{
diff --git a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
index cc3425106a5..b8b3a39a2f9 100644
--- a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
@@ -77,6 +77,7 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
     }
 
     for (String hivePrefix : ["hive2", "hive3"]) {
+        setHivePrefix(hivePrefix)
         try {
             String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
             String catalog_name = "${hivePrefix}_test_partitions"
@@ -91,6 +92,102 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
 
             q01()
 
+            // Test cache miss scenario: Hive adds partition, then Doris writes to it
+            def test_cache_miss = {
+                def dbName = "test_cache_miss_db"
+                def tblName = "test_cache_miss_table"
+
+                try {
+                    // Clean up
+                    hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
+                    hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
+
+                    // Create database and partitioned table in Hive
+                    hive_docker """CREATE DATABASE ${dbName}"""
+                    hive_docker """
+                        CREATE TABLE ${dbName}.${tblName} (
+                            id INT,
+                            name STRING
+                        )
+                        PARTITIONED BY (pt INT)
+                        STORED AS ORC
+                    """
+
+                    // Hive writes 3 partitions
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=1)
+                        VALUES (1, 'hive_pt1')
+                    """
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=2)
+                        VALUES (2, 'hive_pt2')
+                    """
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=3)
+                        VALUES (3, 'hive_pt3')
+                    """
+
+                    sql """refresh catalog `${catalog_name}`"""      
+                    // Doris reads data to populate cache (only knows about 3 partitions)
+                    def result1 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
+                    assertEquals(3, result1[0][0])
+                    logger.info("Doris cache populated with 3 partitions")
+
+                    // Hive writes 4th partition (Doris cache doesn't know about it)
+                    hive_docker """
+                        INSERT INTO ${dbName}.${tblName} PARTITION(pt=4)
+                        VALUES (4, 'hive_pt4')
+                    """
+                    logger.info("Hive added 4th partition (pt=4)")
+
+                    // Doris writes to the 4th partition
+                    // This should trigger cache miss detection and treat as APPEND instead of NEW
+                    sql """
+                        INSERT INTO `${catalog_name}`.`${dbName}`.`${tblName}`
+                        VALUES (40, 'doris_pt4', 4)
+                    """
+                    logger.info("Doris wrote to 4th partition (should handle 
cache miss)")
+
+                    // Verify: should have 5 rows total (3 from hive + 1 from hive pt4 + 1 from doris pt4)
+                    def result2 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
+                    assertEquals(5, result2[0][0])
+
+                    // Verify partition 4 has 2 rows
+                    def result3 = sql """
+                        SELECT COUNT(*) as cnt
+                        FROM `${catalog_name}`.`${dbName}`.`${tblName}`
+                        WHERE pt = 4
+                    """
+                    assertEquals(2, result3[0][0])
+
+                    // Verify data content
+                    def result4 = sql """
+                        SELECT id, name
+                        FROM `${catalog_name}`.`${dbName}`.`${tblName}`
+                        WHERE pt = 4
+                        ORDER BY id
+                    """
+                    assertEquals(2, result4.size())
+                    assertEquals(4, result4[0][0])
+                    assertEquals("hive_pt4", result4[0][1])
+                    assertEquals(40, result4[1][0])
+                    assertEquals("doris_pt4", result4[1][1])
+
+                    logger.info("Cache miss test passed!")
+
+                } finally {
+                    // Clean up
+                    try {
+                        hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
+                        hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
+                    } catch (Exception e) {
+                        logger.warn("Cleanup failed: ${e.message}")
+                    }
+                }
+            }
+
+            test_cache_miss()
+
             qt_string_partition_table_with_comma """
                select * from partition_tables.string_partition_table_with_comma order by id;
             """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

