This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new a5ce97a159b branch-3.1: [opt](hive) Speed up Hive insert on partition tables using cache #58166 #58606 #58748 (#58886)
a5ce97a159b is described below
commit a5ce97a159b1347204f4a26214c419cf9b1f8207
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Dec 15 11:15:42 2025 +0800
branch-3.1: [opt](hive) Speed up Hive insert on partition tables using cache #58166 #58606 #58748 (#58886)
picked from #58166 #58606 #58748
---------
Co-authored-by: zy-kkk <[email protected]>
Co-authored-by: Calvin Kirs <[email protected]>
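
The core idea of this pick, in one picture: after an insert commits, drop only the cache entries for the partitions the insert touched, instead of invalidating the whole table. Below is a minimal, self-contained sketch of that idea, not Doris code; Caffeine is the same cache library HiveMetaStoreCache uses, and the key/value types here are illustrative only.

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.util.List;

    public class SelectiveInvalidationSketch {
        // Maps partition name -> cached file listing (illustrative types).
        private final Cache<String, List<String>> partitionFileCache =
                Caffeine.newBuilder().maximumSize(10_000).build();

        // Before this change: any insert dropped every cached entry of the table.
        void fullTableRefresh() {
            partitionFileCache.invalidateAll();
        }

        // After this change: only partitions written by the insert are dropped,
        // so unrelated partitions stay warm in the cache.
        void partitionLevelRefresh(List<String> modifiedPartNames) {
            modifiedPartNames.forEach(partitionFileCache::invalidate);
        }
    }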
---
.../org/apache/doris/catalog/RefreshManager.java | 21 ++++-
.../doris/common/profile/SummaryProfile.java | 17 ++++
.../apache/doris/datasource/ExternalObjectLog.java | 20 +++++
.../doris/datasource/hive/HMSTransaction.java | 98 +++++++++++++++++-----
.../doris/datasource/hive/HiveMetaStoreCache.java | 63 ++++++++++++++
.../insert/BaseExternalTableInsertExecutor.java | 23 +++--
.../plans/commands/insert/HiveInsertExecutor.java | 50 +++++++++++
.../org/apache/doris/planner/HiveTableSink.java | 39 +++++++--
.../apache/doris/planner/HiveTableSinkTest.java | 9 ++
.../hive/test_hive_partitions.groovy | 97 +++++++++++++++++++++
10 files changed, 401 insertions(+), 36 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 494946fd1c0..9738ee39355 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -26,7 +26,9 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.persist.OperationType;
import com.google.common.base.Strings;
@@ -183,7 +185,24 @@ public class RefreshManager {
            db.get().unregisterTable(log.getTableName());
            db.get().resetMetaCacheNames();
        } else {
-            refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
+            List<String> modifiedPartNames = log.getPartitionNames();
+            List<String> newPartNames = log.getNewPartitionNames();
+            if (catalog instanceof HMSExternalCatalog
+                    && ((modifiedPartNames != null && !modifiedPartNames.isEmpty())
+                    || (newPartNames != null && !newPartNames.isEmpty()))) {
+                // Partition-level cache invalidation, only for hive catalog
+                HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                        .getMetaStoreCache((HMSExternalCatalog) catalog);
+                cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames);
+                LOG.info("replay refresh partitions for table {}, "
+                        + "modified partitions count: {}, "
+                        + "new partitions count: {}",
+                        table.get().getName(), modifiedPartNames == null ? 0 : modifiedPartNames.size(),
+                        newPartNames == null ? 0 : newPartNames.size());
+            } else {
+                // Full table cache invalidation
+                refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
+            }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 1410f961881..87516a19a77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -83,6 +83,7 @@ public class SummaryProfile {
public static final String GET_PARTITIONS_TIME = "Get Partitions Time";
    public static final String GET_PARTITION_FILES_TIME = "Get Partition Files Time";
    public static final String CREATE_SCAN_RANGE_TIME = "Create Scan Range Time";
+    public static final String SINK_SET_PARTITION_VALUES_TIME = "Sink Set Partition Values Time";
public static final String PLAN_TIME = "Plan Time";
public static final String SCHEDULE_TIME = "Schedule Time";
public static final String ASSIGN_FRAGMENT_TIME = "Fragment Assign Time";
@@ -158,6 +159,7 @@ public class SummaryProfile {
GET_SPLITS_TIME,
GET_PARTITIONS_TIME,
GET_PARTITION_FILES_TIME,
+ SINK_SET_PARTITION_VALUES_TIME,
CREATE_SCAN_RANGE_TIME,
DISTRIBUTE_TIME,
GET_META_VERSION_TIME,
@@ -208,6 +210,7 @@ public class SummaryProfile {
.put(NEREIDS_BE_FOLD_CONST_TIME, 2)
.put(GET_PARTITIONS_TIME, 3)
.put(GET_PARTITION_FILES_TIME, 3)
+ .put(SINK_SET_PARTITION_VALUES_TIME, 3)
.put(CREATE_SCAN_RANGE_TIME, 2)
.put(GET_PARTITION_VERSION_TIME, 1)
.put(GET_PARTITION_VERSION_COUNT, 1)
@@ -277,6 +280,10 @@ public class SummaryProfile {
private long getPartitionsFinishTime = -1;
@SerializedName(value = "getPartitionFilesFinishTime")
private long getPartitionFilesFinishTime = -1;
+ @SerializedName(value = "sinkSetPartitionValuesStartTime")
+ private long sinkSetPartitionValuesStartTime = -1;
+ @SerializedName(value = "sinkSetPartitionValuesFinishTime")
+ private long sinkSetPartitionValuesFinishTime = -1;
@SerializedName(value = "getSplitsFinishTime")
private long getSplitsFinishTime = -1;
@SerializedName(value = "createScanRangeFinishTime")
@@ -463,6 +470,8 @@ public class SummaryProfile {
                getPrettyTime(getPartitionsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
        executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME,
                getPrettyTime(getPartitionFilesFinishTime, getPartitionsFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(SINK_SET_PARTITION_VALUES_TIME,
+                getPrettyTime(sinkSetPartitionValuesFinishTime, sinkSetPartitionValuesStartTime, TUnit.TIME_MS));
        executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME,
                getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
        executionSummaryProfile.addInfoString(SCHEDULE_TIME,
@@ -601,6 +610,14 @@ public class SummaryProfile {
this.getPartitionsFinishTime = TimeUtils.getStartTimeMs();
}
+ public void setSinkGetPartitionsStartTime() {
+ this.sinkSetPartitionValuesStartTime = TimeUtils.getStartTimeMs();
+ }
+
+ public void setSinkGetPartitionsFinishTime() {
+ this.sinkSetPartitionValuesFinishTime = TimeUtils.getStartTimeMs();
+ }
+
public void setGetPartitionFilesFinishTime() {
this.getPartitionFilesFinishTime = TimeUtils.getStartTimeMs();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index cf7872f8b39..43a0675d783 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -58,6 +58,9 @@ public class ExternalObjectLog implements Writable {
@SerializedName(value = "partitionNames")
private List<String> partitionNames;
+ @SerializedName(value = "newPartitionNames")
+ private List<String> newPartitionNames;
+
@SerializedName(value = "lastUpdateTime")
private long lastUpdateTime;
@@ -80,6 +83,17 @@ public class ExternalObjectLog implements Writable {
return externalObjectLog;
}
+    public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
+            List<String> modifiedPartNames, List<String> newPartNames) {
+ ExternalObjectLog externalObjectLog = new ExternalObjectLog();
+ externalObjectLog.setCatalogId(catalogId);
+ externalObjectLog.setDbName(dbName);
+ externalObjectLog.setTableName(tblName);
+ externalObjectLog.setPartitionNames(modifiedPartNames);
+ externalObjectLog.setNewPartitionNames(newPartNames);
+ return externalObjectLog;
+ }
+
    public static ExternalObjectLog createForRenameTable(long catalogId, String dbName, String tblName,
            String newTblName) {
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
@@ -124,6 +138,12 @@ public class ExternalObjectLog implements Writable {
} else {
sb.append("tableId: " + tableId + "]");
}
+ if (partitionNames != null && !partitionNames.isEmpty()) {
+ sb.append(", partitionNames: " + partitionNames);
+ }
+ if (newPartitionNames != null && !newPartitionNames.isEmpty()) {
+ sb.append(", newPartitionNames: " + newPartitionNames);
+ }
return sb.toString();
}
}
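
The new createForRefreshPartitions factory exists because the FE that commits an insert must tell follower FEs which partitions to invalidate, and this edit-log record is the carrier. A standalone sketch of the replay decision; RefreshLog is an illustrative stand-in for ExternalObjectLog:

    import java.util.List;

    public class EditLogFanoutSketch {
        // Illustrative stand-in for ExternalObjectLog.
        record RefreshLog(String tbl, List<String> modifiedPartNames, List<String> newPartNames) {}

        // Follower-side replay: partition names present -> partition-level refresh;
        // otherwise fall back to a full table refresh.
        static String replay(RefreshLog log) {
            boolean partitionLevel =
                    (log.modifiedPartNames() != null && !log.modifiedPartNames().isEmpty())
                    || (log.newPartNames() != null && !log.newPartNames().isEmpty());
            return partitionLevel
                    ? "refresh partitions of " + log.tbl()
                    : "refresh whole table " + log.tbl();
        }
    }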
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 2864f146f03..182e214b00c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -47,6 +47,7 @@ import org.apache.doris.transaction.Transaction;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -268,30 +269,48 @@ public class HMSTransaction implements Transaction {
                        insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
                        break;
                    case NEW:
+                        // Check if partition really exists in HMS (may be cache miss in Doris)
+                        String partitionName = pu.getName();
+                        if (Strings.isNullOrEmpty(partitionName)) {
+                            // This should not happen for partitioned tables
+                            LOG.warn("Partition name is null/empty for NEW mode in partitioned table, skipping");
+                            break;
+                        }
+                        List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
+                        boolean existsInHMS = false;
+                        try {
+                            Partition hmsPartition = hiveOps.getClient().getPartition(
+                                    nameMapping.getRemoteDbName(),
+                                    nameMapping.getRemoteTblName(),
+                                    partitionValues);
+                            existsInHMS = (hmsPartition != null);
+                        } catch (Exception e) {
+                            // Partition not found in HMS, treat as truly new
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Partition {} not found in HMS, will create it", pu.getName());
+                            }
+                        }
+
+                        if (existsInHMS) {
+                            // Partition exists in HMS but not in Doris cache
+                            // Treat as APPEND instead of NEW to avoid creation error
+                            LOG.info("Partition {} already exists in HMS (Doris cache miss), treating as APPEND",
+                                    pu.getName());
+                            insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
+                        } else {
+                            // Truly new partition, create it
+                            createAndAddPartition(nameMapping, table, partitionValues, writePath,
+                                    pu, hivePartitionStatistics, false);
+                        }
+                        break;
                    case OVERWRITE:
-                        StorageDescriptor sd = table.getSd();
-                        // For object storage (FILE_S3), use writePath to keep original scheme (oss://, cos://)
-                        // For HDFS, use targetPath which is the final path after rename
-                        String pathForHMS = this.fileType == TFileType.FILE_S3
-                                ? writePath
-                                : pu.getLocation().getTargetPath();
-                        HivePartition hivePartition = new HivePartition(
-                                nameMapping,
-                                false,
-                                sd.getInputFormat(),
-                                pathForHMS,
-                                HiveUtil.toPartitionValues(pu.getName()),
-                                Maps.newHashMap(),
-                                sd.getOutputFormat(),
-                                sd.getSerdeInfo().getSerializationLib(),
-                                sd.getCols()
-                        );
-                        if (updateMode == TUpdateMode.OVERWRITE) {
-                            dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
+                        String overwritePartitionName = pu.getName();
+                        if (Strings.isNullOrEmpty(overwritePartitionName)) {
+                            LOG.warn("Partition name is null/empty for OVERWRITE mode in partitioned table, skipping");
+                            break;
                        }
-                        addPartition(
-                                nameMapping, hivePartition, writePath,
-                                pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
+                        createAndAddPartition(nameMapping, table, HiveUtil.toPartitionValues(overwritePartitionName),
+                                writePath, pu, hivePartitionStatistics, true);
                        break;
                    default:
                        throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
@@ -377,6 +396,10 @@ public class HMSTransaction implements Transaction {
        return hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
}
+ public List<THivePartitionUpdate> getHivePartitionUpdates() {
+ return hivePartitionUpdates;
+ }
+
private void convertToInsertExistingPartitionAction(
NameMapping nameMapping,
            List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
@@ -1028,6 +1051,37 @@ public class HMSTransaction implements Transaction {
}
}
+ private void createAndAddPartition(
+ NameMapping nameMapping,
+ Table table,
+ List<String> partitionValues,
+ String writePath,
+ THivePartitionUpdate pu,
+ HivePartitionStatistics hivePartitionStatistics,
+ boolean dropFirst) {
+ StorageDescriptor sd = table.getSd();
+ String pathForHMS = this.fileType == TFileType.FILE_S3
+ ? writePath
+ : pu.getLocation().getTargetPath();
+ HivePartition hivePartition = new HivePartition(
+ nameMapping,
+ false,
+ sd.getInputFormat(),
+ pathForHMS,
+ partitionValues,
+ Maps.newHashMap(),
+ sd.getOutputFormat(),
+ sd.getSerdeInfo().getSerializationLib(),
+ sd.getCols()
+ );
+ if (dropFirst) {
+            dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
+ }
+ addPartition(
+ nameMapping, hivePartition, writePath,
+ pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
+ }
+
public synchronized void addPartition(
NameMapping nameMapping,
HivePartition partition,
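
The NEW branch above now double-checks the metastore before creating a partition, because a partition can exist in HMS while being absent from the Doris-side cache. A standalone sketch of that decision; MetastoreLookup is a hypothetical stand-in for the HMS client call used in HMSTransaction:

    import java.util.List;

    public class NewPartitionSketch {
        interface MetastoreLookup {
            // True if the partition exists; real clients may instead throw
            // when it is missing, which callers treat as "not found".
            boolean partitionExists(String db, String tbl, List<String> values) throws Exception;
        }

        enum Action { APPEND_TO_EXISTING, CREATE_NEW }

        static Action resolveNewPartition(MetastoreLookup hms, String db, String tbl,
                List<String> partitionValues) {
            try {
                if (hms.partitionExists(db, tbl, partitionValues)) {
                    // Exists in HMS but not in the local cache: append rather
                    // than attempt a second creation, which would fail.
                    return Action.APPEND_TO_EXISTING;
                }
            } catch (Exception ignored) {
                // Lookup failure is treated as "partition not found".
            }
            return Action.CREATE_NEW;
        }
    }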
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 883be276f6f..c0dbf68495e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -61,6 +61,7 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
@@ -90,6 +91,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -580,6 +582,67 @@ public class HiveMetaStoreCache {
}
}
+    /**
+     * Selectively refreshes cache for affected partitions based on update information from BE.
+     * For APPEND/OVERWRITE: invalidate both partition cache and file cache using existing method.
+     * For NEW: add to partition values cache.
+     *
+     * @param table The Hive table whose partitions were modified
+     * @param partitionUpdates List of partition updates from BE
+     * @param modifiedPartNames Output list to collect names of modified partitions
+     * @param newPartNames Output list to collect names of new partitions
+     */
+    public void refreshAffectedPartitions(HMSExternalTable table,
+            List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates,
+            List<String> modifiedPartNames, List<String> newPartNames) {
+        if (partitionUpdates == null || partitionUpdates.isEmpty()) {
+            return;
+        }
+
+        for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) {
+            String partitionName = update.getName();
+            // Skip if partition name is null/empty (non-partitioned table case)
+            if (Strings.isNullOrEmpty(partitionName)) {
+                continue;
+            }
+
+            switch (update.getUpdateMode()) {
+                case APPEND:
+                case OVERWRITE:
+                    modifiedPartNames.add(partitionName);
+                    break;
+                case NEW:
+                    newPartNames.add(partitionName);
+                    break;
+                default:
+                    LOG.warn("Unknown update mode {} for partition {}",
+                            update.getUpdateMode(), partitionName);
+                    break;
+            }
+        }
+
+        refreshAffectedPartitionsCache(table, modifiedPartNames, newPartNames);
+    }
+
+    public void refreshAffectedPartitionsCache(HMSExternalTable table,
+            List<String> modifiedPartNames, List<String> newPartNames) {
+
+        // Invalidate cache for modified partitions (both partition cache and file cache)
+        for (String partitionName : modifiedPartNames) {
+            invalidatePartitionCache(table, partitionName);
+        }
+
+        // Add new partitions to partition values cache
+        if (!newPartNames.isEmpty()) {
+            addPartitionsCache(table.getOrBuildNameMapping(), newPartNames,
+                    table.getPartitionColumnTypes(Optional.empty()));
+        }
+
+        // Log summary
+        LOG.info("Refreshed cache for table {}: {} modified partitions, {} new partitions",
+                table.getName(), modifiedPartNames.size(), newPartNames.size());
+    }
+
public void invalidateDbCache(String dbName) {
long start = System.currentTimeMillis();
        Set<PartitionValueCacheKey> keys = partitionValuesCache.asMap().keySet();
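
The pair of methods above splits the work: refreshAffectedPartitions classifies the BE-reported updates, then refreshAffectedPartitionsCache applies them. A runnable sketch of the classification step, with PartitionUpdate/UpdateMode as simplified stand-ins for THivePartitionUpdate/TUpdateMode:

    import java.util.ArrayList;
    import java.util.List;

    public class PartitionRefreshSketch {
        enum UpdateMode { APPEND, OVERWRITE, NEW }

        record PartitionUpdate(String name, UpdateMode mode) {}

        // APPEND/OVERWRITE -> invalidate that partition's caches;
        // NEW -> just add it to the partition values cache.
        static void classify(List<PartitionUpdate> updates,
                List<String> modified, List<String> created) {
            for (PartitionUpdate u : updates) {
                if (u.name() == null || u.name().isEmpty()) {
                    continue; // non-partitioned table case
                }
                switch (u.mode()) {
                    case APPEND, OVERWRITE -> modified.add(u.name());
                    case NEW -> created.add(u.name());
                }
            }
        }

        public static void main(String[] args) {
            List<String> modified = new ArrayList<>();
            List<String> created = new ArrayList<>();
            classify(List.of(new PartitionUpdate("pt=1", UpdateMode.APPEND),
                    new PartitionUpdate("pt=4", UpdateMode.NEW)), modified, created);
            System.out.println("modified=" + modified + ", new=" + created);
            // -> modified=[pt=1], new=[pt=4]
        }
    }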
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index 93693233e11..f82fefd89f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -18,6 +18,7 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
@@ -110,14 +111,26 @@ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExec
}
summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
txnStatus = TransactionStatus.COMMITTED;
- Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
- catalogName,
- table.getDatabase().getFullName(),
- table.getName(),
- true);
+
+ // Handle post-commit operations (e.g., cache refresh)
+ doAfterCommit();
}
}
+ /**
+ * Called after transaction commit.
+ * Subclasses can override this to customize post-commit behavior.
+ * Default: full table refresh.
+ */
+ protected void doAfterCommit() throws DdlException {
+ // Default: full table refresh
+ Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
+ catalogName,
+ table.getDatabase().getFullName(),
+ table.getName(),
+ true);
+ }
+
@Override
protected void finalizeSink(PlanFragment fragment, DataSink sink,
PhysicalSink physicalSink) {
try {
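
The refactor above is a textbook template method: the base executor keeps the commit sequence fixed and exposes doAfterCommit() as the variation point, with full-table refresh as the default. A compact sketch of the pattern; class names are illustrative:

    // Base class fixes the commit sequence; subclasses override the hook.
    abstract class InsertExecutorSketch {
        final void commitAndRefresh() {
            System.out.println("commit transaction");
            doAfterCommit(); // hook: default below, overridden by the Hive executor
        }

        // Default post-commit behavior: refresh the whole table.
        protected void doAfterCommit() {
            System.out.println("full table refresh");
        }
    }

    class HiveExecutorSketch extends InsertExecutorSketch {
        @Override
        protected void doAfterCommit() {
            System.out.println("refresh only affected partitions");
        }
    }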
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index 64f68454d84..f9345a4495c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -17,20 +17,28 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSTransaction;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionType;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Optional;
/**
@@ -39,6 +47,8 @@ import java.util.Optional;
public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
    private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
+ private List<THivePartitionUpdate> partitionUpdates;
+
/**
* constructor
*/
@@ -65,6 +75,46 @@ public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
        HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
        loadedRows = transaction.getUpdateCnt();
        transaction.finishInsertTable(((ExternalTable) table).getOrBuildNameMapping());
+
+ // Save partition updates for cache refresh after commit
+ partitionUpdates = transaction.getHivePartitionUpdates();
+ }
+
+ @Override
+ protected void doAfterCommit() throws DdlException {
+ HMSExternalTable hmsTable = (HMSExternalTable) table;
+
+        // For partitioned tables, do selective partition refresh
+        // For non-partitioned tables, do full table cache invalidation
+        List<String> modifiedPartNames = Lists.newArrayList();
+        List<String> newPartNames = Lists.newArrayList();
+        if (hmsTable.isPartitionedTable() && partitionUpdates != null && !partitionUpdates.isEmpty()) {
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+            cache.refreshAffectedPartitions(hmsTable, partitionUpdates, modifiedPartNames, newPartNames);
+        } else {
+            // Non-partitioned table or no partition updates, do full table refresh
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(hmsTable);
+        }
+ }
+
+ // Write edit log to notify other FEs
+ ExternalObjectLog log;
+ if (!modifiedPartNames.isEmpty() || !newPartNames.isEmpty()) {
+ // Partition-level refresh for other FEs
+ log = ExternalObjectLog.createForRefreshPartitions(
+ hmsTable.getCatalog().getId(),
+ table.getDatabase().getFullName(),
+ table.getName(),
+ modifiedPartNames,
+ newPartNames);
+ } else {
+ // Full table refresh for other FEs
+ log = ExternalObjectLog.createForRefreshTable(
+ hmsTable.getCatalog().getId(),
+ table.getDatabase().getFullName(),
+ table.getName());
+ }
+ Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 68a0edc430f..2ae4ae8e397 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -21,12 +21,16 @@
package org.apache.doris.planner;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.HiveProperties;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.qe.ConnectContext;
@@ -191,18 +195,32 @@ public class HiveTableSink extends BaseExternalTableDataSink {
}
    private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
+        if (ConnectContext.get().getExecutor() != null) {
+            ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsStartTime();
+        }
+
List<THivePartition> partitions = new ArrayList<>();
-        List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
-                ((HMSExternalCatalog) targetTable.getCatalog())
-                        .getClient().listPartitions(targetTable.getRemoteDbName(), targetTable.getRemoteName());
-        for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
+
+        List<HivePartition> hivePartitions = new ArrayList<>();
+        if (targetTable.isPartitionedTable()) {
+            // Get partitions from cache instead of HMS client (similar to HiveScanNode)
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) targetTable.getCatalog());
+            HiveMetaStoreCache.HivePartitionValues partitionValues =
+                    targetTable.getHivePartitionValues(MvccUtil.getSnapshotFromContext(targetTable));
+            List<List<String>> partitionValuesList =
+                    new ArrayList<>(partitionValues.getPartitionValuesMap().values());
+            hivePartitions = cache.getAllPartitionsWithCache(targetTable, partitionValuesList);
+        }
+
+        // Convert HivePartition to THivePartition (same logic as before)
+        for (HivePartition partition : hivePartitions) {
            THivePartition hivePartition = new THivePartition();
-            StorageDescriptor sd = partition.getSd();
-            hivePartition.setFileFormat(getTFileFormatType(sd.getInputFormat()));
+            hivePartition.setFileFormat(getTFileFormatType(partition.getInputFormat()));
+            hivePartition.setValues(partition.getPartitionValues());
-            hivePartition.setValues(partition.getValues());
            THiveLocationParams locationParams = new THiveLocationParams();
-            String location = sd.getLocation();
+            String location = partition.getPath();
            // pass the same of write path and target path to partition
            locationParams.setWritePath(location);
            locationParams.setTargetPath(location);
@@ -210,7 +228,12 @@ public class HiveTableSink extends BaseExternalTableDataSink {
hivePartition.setLocation(locationParams);
partitions.add(hivePartition);
}
+
tSink.setPartitions(partitions);
+
+ if (ConnectContext.get().getExecutor() != null) {
+            ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsFinishTime();
+ }
}
private void setSerDeProperties(THiveTableSink tSink) {
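
The profile hooks added here bracket setPartitionValues() with a start/finish timestamp pair that SummaryProfile later renders as SINK_SET_PARTITION_VALUES_TIME. A minimal sketch of the same pattern; names are illustrative:

    public class SinkTimingSketch {
        private long startMs = -1;
        private long finishMs = -1;

        void timedSetPartitionValues(Runnable work) {
            startMs = System.currentTimeMillis();   // setSinkGetPartitionsStartTime()
            work.run();                             // build the THivePartition list from cache
            finishMs = System.currentTimeMillis();  // setSinkGetPartitionsFinishTime()
        }

        String prettyTime() {
            return (startMs < 0 || finishMs < 0) ? "N/A" : (finishMs - startMs) + "ms";
        }
    }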
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
index c8bc82e67a1..8f4adff727c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.ThriftHMSCachedClient;
import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.qe.ConnectContext;
import mockit.Mock;
import mockit.MockUp;
@@ -48,10 +49,13 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+
public class HiveTableSinkTest {
@Test
public void testBindDataSink() throws UserException {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setThreadLocalInfo();
new MockUp<ThriftHMSCachedClient>() {
@Mock
@@ -123,6 +127,11 @@ public class HiveTableSinkTest {
private void mockDifferLocationTable(String location) {
new MockUp<HMSExternalTable>() {
+ @Mock
+ public boolean isPartitionedTable() {
+ return false;
+ }
+
@Mock
public Set<String> getPartitionColumnNames() {
return new HashSet<String>() {{
diff --git a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
index cc3425106a5..b8b3a39a2f9 100644
--- a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
@@ -77,6 +77,7 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
}
for (String hivePrefix : ["hive2", "hive3"]) {
+ setHivePrefix(hivePrefix)
try {
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
String catalog_name = "${hivePrefix}_test_partitions"
@@ -91,6 +92,102 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
q01()
+ // Test cache miss scenario: Hive adds partition, then Doris writes to it
+ def test_cache_miss = {
+ def dbName = "test_cache_miss_db"
+ def tblName = "test_cache_miss_table"
+
+ try {
+ // Clean up
+ hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
+ hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
+
+ // Create database and partitioned table in Hive
+ hive_docker """CREATE DATABASE ${dbName}"""
+ hive_docker """
+ CREATE TABLE ${dbName}.${tblName} (
+ id INT,
+ name STRING
+ )
+ PARTITIONED BY (pt INT)
+ STORED AS ORC
+ """
+
+ // Hive writes 3 partitions
+ hive_docker """
+ INSERT INTO ${dbName}.${tblName} PARTITION(pt=1)
+ VALUES (1, 'hive_pt1')
+ """
+ hive_docker """
+ INSERT INTO ${dbName}.${tblName} PARTITION(pt=2)
+ VALUES (2, 'hive_pt2')
+ """
+ hive_docker """
+ INSERT INTO ${dbName}.${tblName} PARTITION(pt=3)
+ VALUES (3, 'hive_pt3')
+ """
+
+ sql """refresh catalog `${catalog_name}`"""
+ // Doris reads data to populate cache (only knows about 3 partitions)
+ def result1 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
+ assertEquals(3, result1[0][0])
+ logger.info("Doris cache populated with 3 partitions")
+
+ // Hive writes 4th partition (Doris cache doesn't know about it)
+ hive_docker """
+ INSERT INTO ${dbName}.${tblName} PARTITION(pt=4)
+ VALUES (4, 'hive_pt4')
+ """
+ logger.info("Hive added 4th partition (pt=4)")
+
+ // Doris writes to the 4th partition
+ // This should trigger cache miss detection and treat as APPEND instead of NEW
+ sql """
+ INSERT INTO `${catalog_name}`.`${dbName}`.`${tblName}`
+ VALUES (40, 'doris_pt4', 4)
+ """
+ logger.info("Doris wrote to 4th partition (should handle
cache miss)")
+
+ // Verify: should have 5 rows total (3 from hive + 1 from hive pt4 + 1 from doris pt4)
+ def result2 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
+ assertEquals(5, result2[0][0])
+
+ // Verify partition 4 has 2 rows
+ def result3 = sql """
+ SELECT COUNT(*) as cnt
+ FROM `${catalog_name}`.`${dbName}`.`${tblName}`
+ WHERE pt = 4
+ """
+ assertEquals(2, result3[0][0])
+
+ // Verify data content
+ def result4 = sql """
+ SELECT id, name
+ FROM `${catalog_name}`.`${dbName}`.`${tblName}`
+ WHERE pt = 4
+ ORDER BY id
+ """
+ assertEquals(2, result4.size())
+ assertEquals(4, result4[0][0])
+ assertEquals("hive_pt4", result4[0][1])
+ assertEquals(40, result4[1][0])
+ assertEquals("doris_pt4", result4[1][1])
+
+ logger.info("Cache miss test passed!")
+
+ } finally {
+ // Clean up
+ try {
+ hive_docker """DROP TABLE IF EXISTS
${dbName}.${tblName}"""
+ hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
+ } catch (Exception e) {
+ logger.warn("Cleanup failed: ${e.message}")
+ }
+ }
+ }
+
+ test_cache_miss()
+
qt_string_partition_table_with_comma """
select * from partition_tables.string_partition_table_with_comma order by id;
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]