This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bb4122a9066 [opt](hive) Speed up Hive insert on partition tables using cache (#58166)
bb4122a9066 is described below
commit bb4122a9066684904ed540f528c0b363d0b78fd7
Author: zy-kkk <[email protected]>
AuthorDate: Mon Dec 1 09:56:40 2025 +0800
[opt](hive) Speed up Hive insert on partition tables using cache (#58166)
### What problem does this PR solve?
For Hive tables with massive partitions (10K+), INSERT operations are extremely slow because:
- FE fetches all partition metadata directly from HMS (expensive RPC calls)
- The full table cache is invalidated after each insert (unnecessarily)
Problem Summary:
1. **Use cache for partition metadata in INSERT**
- FE now fetches partition info from the cache instead of querying HMS directly when preparing an INSERT
- Avoids expensive HMS RPC calls for every INSERT operation
2. **Selective cache refresh after commit** (see the first sketch below)
- Only affected partitions are invalidated instead of the full table cache
- Based on the partition update info reported by BE (NEW/APPEND/OVERWRITE)
- Significantly reduces cache invalidation overhead
3. **Handle cache inconsistency gracefully** (see the second sketch below)
- When BE marks a partition as NEW but it already exists in HMS (cache miss), FE detects this by checking HMS and treats it as APPEND instead of failing
- Prevents `AlreadyExistsException` errors
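A minimal sketch of the selective refresh dispatch in item 2 above, not the actual Doris code: `PartitionUpdate` and the two callback parameters are hypothetical stand-ins for `THivePartitionUpdate` and the `HiveMetaStoreCache` invalidate/add-partition methods; only the dispatch logic mirrors this patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public final class SelectiveRefreshSketch {

    enum UpdateMode { NEW, APPEND, OVERWRITE }

    // Hypothetical stand-in for THivePartitionUpdate as reported by BE.
    record PartitionUpdate(String name, UpdateMode mode) { }

    static void refreshAffectedPartitions(List<PartitionUpdate> updates,
                                          Consumer<String> invalidatePartition,
                                          Consumer<List<String>> addNewPartitions) {
        if (updates == null || updates.isEmpty()) {
            return; // nothing was written, keep the cache untouched
        }
        List<String> newPartitions = new ArrayList<>();
        for (PartitionUpdate update : updates) {
            if (update.name() == null || update.name().isEmpty()) {
                continue; // non-partitioned table: handled by a full table refresh instead
            }
            switch (update.mode()) {
                case APPEND, OVERWRITE -> invalidatePartition.accept(update.name()); // drop only the stale entry
                case NEW -> newPartitions.add(update.name());                        // register; nothing to invalidate
            }
        }
        if (!newPartitions.isEmpty()) {
            addNewPartitions.accept(newPartitions); // add to the partition values cache
        }
    }
}
```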
For tables with partitions:
- **Before**: direct HMS calls for every INSERT + full cache invalidation
- **After**: cache lookup + selective partition refresh
- Expected speedup: 10x-100x for the partition metadata fetching phase
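A similar sketch of the cache-miss fallback in item 3 above: `HmsClient` is a hypothetical stand-in for the real metastore client, and the exception handling is left generic; the point is only the decision to downgrade NEW to APPEND when the partition already exists in HMS.

```java
import java.util.List;

public final class NewPartitionFallbackSketch {

    // Hypothetical stand-in for the HMS client; returns null or throws when the partition is absent.
    interface HmsClient {
        Object getPartition(String db, String table, List<String> partitionValues) throws Exception;
    }

    enum Action { CREATE_NEW, TREAT_AS_APPEND }

    /**
     * BE reported this partition as NEW, but Doris' cache may simply have missed it.
     * If HMS already has the partition, fall back to APPEND so the commit does not
     * fail with AlreadyExistsException when trying to create it again.
     */
    static Action resolveNewPartition(HmsClient client, String db, String table,
                                      List<String> partitionValues) {
        boolean existsInHms = false;
        try {
            existsInHms = client.getPartition(db, table, partitionValues) != null;
        } catch (Exception e) {
            // partition not found in HMS: it is truly new
        }
        return existsInHms ? Action.TREAT_AS_APPEND : Action.CREATE_NEW;
    }
}
```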
---
.../org/apache/doris/catalog/RefreshManager.java | 12 ++-
.../doris/common/profile/SummaryProfile.java | 17 ++++
.../apache/doris/datasource/ExternalObjectLog.java | 10 +++
.../doris/datasource/hive/HMSTransaction.java | 98 +++++++++++++++++-----
.../doris/datasource/hive/HiveMetaStoreCache.java | 57 +++++++++++++
.../insert/BaseExternalTableInsertExecutor.java | 23 +++--
.../plans/commands/insert/HiveInsertExecutor.java | 57 +++++++++++++
.../org/apache/doris/planner/HiveTableSink.java | 49 +++++++++--
.../apache/doris/planner/HiveTableSinkTest.java | 9 ++
.../hive/test_hive_partitions.groovy | 97 +++++++++++++++++++++
10 files changed, 393 insertions(+), 36 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 494946fd1c0..67c87688994 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -183,7 +183,17 @@ public class RefreshManager {
db.get().unregisterTable(log.getTableName());
db.get().resetMetaCacheNames();
} else {
- refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
+ List<String> partitionNames = log.getPartitionNames();
+ if (partitionNames != null && !partitionNames.isEmpty()) {
+ // Partition-level cache invalidation
+ Env.getCurrentEnv().getExtMetaCacheMgr()
+ .invalidatePartitionsCache(table.get(), partitionNames);
+ LOG.info("replay refresh partitions for table {}, partitions count: {}", table.get().getName(), partitionNames.size());
+ } else {
+ // Full table cache invalidation
+ refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 4e99ee14cf7..da86a3cef55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -85,6 +85,7 @@ public class SummaryProfile {
public static final String GET_PARTITIONS_TIME = "Get Partitions Time";
public static final String GET_PARTITION_FILES_TIME = "Get Partition Files Time";
public static final String CREATE_SCAN_RANGE_TIME = "Create Scan Range Time";
+ public static final String SINK_SET_PARTITION_VALUES_TIME = "Sink Set Partition Values Time";
public static final String PLAN_TIME = "Plan Time";
public static final String SCHEDULE_TIME = "Schedule Time";
public static final String ASSIGN_FRAGMENT_TIME = "Fragment Assign Time";
@@ -161,6 +162,7 @@ public class SummaryProfile {
GET_SPLITS_TIME,
GET_PARTITIONS_TIME,
GET_PARTITION_FILES_TIME,
+ SINK_SET_PARTITION_VALUES_TIME,
CREATE_SCAN_RANGE_TIME,
NEREIDS_DISTRIBUTE_TIME,
GET_META_VERSION_TIME,
@@ -211,6 +213,7 @@ public class SummaryProfile {
.put(NEREIDS_BE_FOLD_CONST_TIME, 2)
.put(GET_PARTITIONS_TIME, 3)
.put(GET_PARTITION_FILES_TIME, 3)
+ .put(SINK_SET_PARTITION_VALUES_TIME, 3)
.put(CREATE_SCAN_RANGE_TIME, 2)
.put(GET_PARTITION_VERSION_TIME, 1)
.put(GET_PARTITION_VERSION_COUNT, 1)
@@ -281,6 +284,10 @@ public class SummaryProfile {
private long getPartitionsFinishTime = -1;
@SerializedName(value = "getPartitionFilesFinishTime")
private long getPartitionFilesFinishTime = -1;
+ @SerializedName(value = "sinkSetPartitionValuesStartTime")
+ private long sinkSetPartitionValuesStartTime = -1;
+ @SerializedName(value = "sinkSetPartitionValuesFinishTime")
+ private long sinkSetPartitionValuesFinishTime = -1;
@SerializedName(value = "getSplitsFinishTime")
private long getSplitsFinishTime = -1;
@SerializedName(value = "createScanRangeFinishTime")
@@ -477,6 +484,8 @@ public class SummaryProfile {
getPrettyTime(getPartitionsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME,
getPrettyTime(getPartitionFilesFinishTime, getPartitionsFinishTime, TUnit.TIME_MS));
+ executionSummaryProfile.addInfoString(SINK_SET_PARTITION_VALUES_TIME,
+ getPrettyTime(sinkSetPartitionValuesFinishTime, sinkSetPartitionValuesStartTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME,
getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(SCHEDULE_TIME,
@@ -619,6 +628,14 @@ public class SummaryProfile {
this.getPartitionsFinishTime = TimeUtils.getStartTimeMs();
}
+ public void setSinkGetPartitionsStartTime() {
+ this.sinkSetPartitionValuesStartTime = TimeUtils.getStartTimeMs();
+ }
+
+ public void setSinkGetPartitionsFinishTime() {
+ this.sinkSetPartitionValuesFinishTime = TimeUtils.getStartTimeMs();
+ }
+
public void setGetPartitionFilesFinishTime() {
this.getPartitionFilesFinishTime = TimeUtils.getStartTimeMs();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index cf7872f8b39..313f6fc403f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -80,6 +80,16 @@ public class ExternalObjectLog implements Writable {
return externalObjectLog;
}
+ public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
+ List<String> partitionNames) {
+ ExternalObjectLog externalObjectLog = new ExternalObjectLog();
+ externalObjectLog.setCatalogId(catalogId);
+ externalObjectLog.setDbName(dbName);
+ externalObjectLog.setTableName(tblName);
+ externalObjectLog.setPartitionNames(partitionNames);
+ return externalObjectLog;
+ }
+
public static ExternalObjectLog createForRenameTable(long catalogId, String dbName, String tblName,
String newTblName) {
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 7141bdd9874..54ee9f46ec5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -47,6 +47,7 @@ import org.apache.doris.transaction.Transaction;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -268,30 +269,48 @@ public class HMSTransaction implements Transaction {
insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
break;
case NEW:
+ // Check if partition really exists in HMS (may be cache miss in Doris)
+ String partitionName = pu.getName();
+ if (Strings.isNullOrEmpty(partitionName)) {
+ // This should not happen for partitioned tables
+ LOG.warn("Partition name is null/empty for NEW mode in partitioned table, skipping");
+ break;
+ }
+ List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
+ boolean existsInHMS = false;
+ try {
+ Partition hmsPartition = hiveOps.getClient().getPartition(
+ nameMapping.getRemoteDbName(),
+ nameMapping.getRemoteTblName(),
+ partitionValues);
+ existsInHMS = (hmsPartition != null);
+ } catch (Exception e) {
+ // Partition not found in HMS, treat as truly new
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partition {} not found in HMS, will create it", pu.getName());
+ }
+ }
+
+ if (existsInHMS) {
+ // Partition exists in HMS but not in Doris cache
+ // Treat as APPEND instead of NEW to avoid creation error
+ LOG.info("Partition {} already exists in HMS (Doris cache miss), treating as APPEND",
+ pu.getName());
+ insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
+ } else {
+ // Truly new partition, create it
+ createAndAddPartition(nameMapping, table, partitionValues, writePath,
+ pu, hivePartitionStatistics, false);
+ }
+ break;
case OVERWRITE:
- StorageDescriptor sd = table.getSd();
- // For object storage (FILE_S3), use writePath to keep original scheme (oss://, cos://)
- // For HDFS, use targetPath which is the final path after rename
- String pathForHMS = this.fileType == TFileType.FILE_S3
- ? writePath
- : pu.getLocation().getTargetPath();
- HivePartition hivePartition = new HivePartition(
- nameMapping,
- false,
- sd.getInputFormat(),
- pathForHMS,
- HiveUtil.toPartitionValues(pu.getName()),
- Maps.newHashMap(),
- sd.getOutputFormat(),
- sd.getSerdeInfo().getSerializationLib(),
- sd.getCols()
- );
- if (updateMode == TUpdateMode.OVERWRITE) {
- dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
+ String overwritePartitionName = pu.getName();
+ if (Strings.isNullOrEmpty(overwritePartitionName)) {
+ LOG.warn("Partition name is null/empty for OVERWRITE mode in partitioned table, skipping");
+ break;
}
- addPartition(
- nameMapping, hivePartition, writePath,
- pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
+ createAndAddPartition(nameMapping, table, HiveUtil.toPartitionValues(overwritePartitionName),
+ writePath, pu, hivePartitionStatistics, true);
break;
default:
throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
@@ -377,6 +396,10 @@ public class HMSTransaction implements Transaction {
return hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
}
+ public List<THivePartitionUpdate> getHivePartitionUpdates() {
+ return hivePartitionUpdates;
+ }
+
private void convertToInsertExistingPartitionAction(
NameMapping nameMapping,
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
@@ -1029,6 +1052,37 @@ public class HMSTransaction implements Transaction {
}
}
+ private void createAndAddPartition(
+ NameMapping nameMapping,
+ Table table,
+ List<String> partitionValues,
+ String writePath,
+ THivePartitionUpdate pu,
+ HivePartitionStatistics hivePartitionStatistics,
+ boolean dropFirst) {
+ StorageDescriptor sd = table.getSd();
+ String pathForHMS = this.fileType == TFileType.FILE_S3
+ ? writePath
+ : pu.getLocation().getTargetPath();
+ HivePartition hivePartition = new HivePartition(
+ nameMapping,
+ false,
+ sd.getInputFormat(),
+ pathForHMS,
+ partitionValues,
+ Maps.newHashMap(),
+ sd.getOutputFormat(),
+ sd.getSerdeInfo().getSerializationLib(),
+ sd.getCols()
+ );
+ if (dropFirst) {
+ dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
+ }
+ addPartition(
+ nameMapping, hivePartition, writePath,
+ pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
+ }
+
public synchronized void addPartition(
NameMapping nameMapping,
HivePartition partition,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 4fbda7c19f1..82504111e34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -60,6 +60,7 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
@@ -85,6 +86,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -564,6 +566,61 @@ public class HiveMetaStoreCache {
}
}
+ /**
+ * Selectively refreshes cache for affected partitions based on update information from BE.
+ * For APPEND/OVERWRITE: invalidate both partition cache and file cache using existing method.
+ * For NEW: add to partition values cache.
+ *
+ * @param table The Hive table whose partitions were modified
+ * @param partitionUpdates List of partition updates from BE
+ */
+ public void refreshAffectedPartitions(HMSExternalTable table,
+ List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates) {
+ if (partitionUpdates == null || partitionUpdates.isEmpty()) {
+ return;
+ }
+
+ List<String> modifiedPartitionNames = new ArrayList<>();
+ List<String> newPartitionNames = new ArrayList<>();
+
+ for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) {
+ String partitionName = update.getName();
+ // Skip if partition name is null/empty (non-partitioned table case)
+ if (Strings.isNullOrEmpty(partitionName)) {
+ continue;
+ }
+
+ switch (update.getUpdateMode()) {
+ case APPEND:
+ case OVERWRITE:
+ modifiedPartitionNames.add(partitionName);
+ break;
+ case NEW:
+ newPartitionNames.add(partitionName);
+ break;
+ default:
+ LOG.warn("Unknown update mode {} for partition {}",
+ update.getUpdateMode(), partitionName);
+ break;
+ }
+ }
+
+ // Invalidate cache for modified partitions (both partition cache and file cache)
+ for (String partitionName : modifiedPartitionNames) {
+ invalidatePartitionCache(table, partitionName);
+ }
+
+ // Add new partitions to partition values cache
+ if (!newPartitionNames.isEmpty()) {
+ addPartitionsCache(table.getOrBuildNameMapping(), newPartitionNames,
+ table.getPartitionColumnTypes(Optional.empty()));
+ }
+
+ // Log summary
+ LOG.info("Refreshed cache for table {}: {} modified partitions, {} new partitions",
+ table.getName(), modifiedPartitionNames.size(), newPartitionNames.size());
+ }
+
public void invalidateDbCache(String dbName) {
long start = System.currentTimeMillis();
Set<PartitionValueCacheKey> keys = partitionValuesCache.asMap().keySet();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index b1f2856b767..1caef21851e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
@@ -112,14 +113,26 @@ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExec
}
summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
txnStatus = TransactionStatus.COMMITTED;
- Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
- catalogName,
- table.getDatabase().getFullName(),
- table.getName(),
- true);
+
+ // Handle post-commit operations (e.g., cache refresh)
+ doAfterCommit();
}
}
+ /**
+ * Called after transaction commit.
+ * Subclasses can override this to customize post-commit behavior.
+ * Default: full table refresh.
+ */
+ protected void doAfterCommit() throws DdlException {
+ // Default: full table refresh
+ Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
+ catalogName,
+ table.getDatabase().getFullName(),
+ table.getName(),
+ true);
+ }
+
@Override
protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index 2a423b8a26b..fed495dbb35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -17,13 +17,19 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSTransaction;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionType;
@@ -31,6 +37,8 @@ import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
/**
@@ -39,6 +47,8 @@ import java.util.Optional;
public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
+ private List<THivePartitionUpdate> partitionUpdates;
+
/**
* constructor
*/
@@ -65,6 +75,53 @@ public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
loadedRows = transaction.getUpdateCnt();
transaction.finishInsertTable(((ExternalTable) table).getOrBuildNameMapping());
+
+ // Save partition updates for cache refresh after commit
+ partitionUpdates = transaction.getHivePartitionUpdates();
+ }
+
+ @Override
+ protected void doAfterCommit() throws DdlException {
+ HMSExternalTable hmsTable = (HMSExternalTable) table;
+
+ // For partitioned tables, do selective partition refresh
+ // For non-partitioned tables, do full table cache invalidation
+ List<String> affectedPartitionNames = null;
+ if (hmsTable.isPartitionedTable() && partitionUpdates != null && !partitionUpdates.isEmpty()) {
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+ cache.refreshAffectedPartitions(hmsTable, partitionUpdates);
+
+ // Collect partition names for edit log
+ affectedPartitionNames = new ArrayList<>();
+ for (THivePartitionUpdate update : partitionUpdates) {
+ String partitionName = update.getName();
+ if (partitionName != null && !partitionName.isEmpty()) {
+ affectedPartitionNames.add(partitionName);
+ }
+ }
+ } else {
+ // Non-partitioned table or no partition updates, do full table refresh
+ Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(hmsTable);
+ }
+
+ // Write edit log to notify other FEs
+ ExternalObjectLog log;
+ if (affectedPartitionNames != null && !affectedPartitionNames.isEmpty()) {
+ // Partition-level refresh for other FEs
+ log = ExternalObjectLog.createForRefreshPartitions(
+ hmsTable.getCatalog().getId(),
+ table.getDatabase().getFullName(),
+ table.getName(),
+ affectedPartitionNames);
+ } else {
+ // Full table refresh for other FEs
+ log = ExternalObjectLog.createForRefreshTable(
+ hmsTable.getCatalog().getId(),
+ table.getDatabase().getFullName(),
+ table.getName());
+ }
+ Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index b9ad7ced616..54dae1df135 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -21,12 +21,16 @@
package org.apache.doris.planner;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.HiveProperties;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.qe.ConnectContext;
@@ -44,10 +48,12 @@ import org.apache.doris.thrift.THiveSerDeProperties;
import org.apache.doris.thrift.THiveTableSink;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -191,18 +197,40 @@ public class HiveTableSink extends BaseExternalTableDataSink {
}
private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
+ if (ConnectContext.get().getExecutor() != null) {
+ ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsStartTime();
+ }
+
List<THivePartition> partitions = new ArrayList<>();
- List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
- ((HMSExternalCatalog) targetTable.getCatalog())
- .getClient().listPartitions(targetTable.getRemoteDbName(), targetTable.getRemoteName());
- for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
+
+ List<HivePartition> hivePartitions;
+ if (targetTable.isPartitionedTable()) {
+ // Get partitions from cache instead of HMS client (similar to HiveScanNode)
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog) targetTable.getCatalog());
+ HiveMetaStoreCache.HivePartitionValues partitionValues =
+ targetTable.getHivePartitionValues(MvccUtil.getSnapshotFromContext(targetTable));
+ List<List<String>> partitionValuesList =
+ new ArrayList<>(partitionValues.getPartitionValuesMap().values());
+ hivePartitions = cache.getAllPartitionsWithCache(targetTable, partitionValuesList);
+ } else {
+ // Non-partitioned table, create dummy partition
+ hivePartitions = Lists.newArrayList();
+ StorageDescriptor sd = targetTable.getRemoteTable().getSd();
+ HivePartition dummyPartition = new HivePartition(targetTable.getOrBuildNameMapping(), true,
+ sd.getInputFormat(), sd.getLocation(), Lists.newArrayList(),
+ sd.getParameters() != null ? sd.getParameters() : new HashMap<>());
+ hivePartitions.add(dummyPartition);
+ }
+
+ // Convert HivePartition to THivePartition (same logic as before)
+ for (HivePartition partition : hivePartitions) {
THivePartition hivePartition = new THivePartition();
- StorageDescriptor sd = partition.getSd();
- hivePartition.setFileFormat(getTFileFormatType(sd.getInputFormat()));
+ hivePartition.setFileFormat(getTFileFormatType(partition.getInputFormat()));
+ hivePartition.setValues(partition.getPartitionValues());
- hivePartition.setValues(partition.getValues());
THiveLocationParams locationParams = new THiveLocationParams();
- String location = sd.getLocation();
+ String location = partition.getPath();
// pass the same of write path and target path to partition
locationParams.setWritePath(location);
locationParams.setTargetPath(location);
@@ -210,7 +238,12 @@ public class HiveTableSink extends BaseExternalTableDataSink {
hivePartition.setLocation(locationParams);
partitions.add(hivePartition);
}
+
tSink.setPartitions(partitions);
+
+ if (ConnectContext.get().getExecutor() != null) {
+ ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsFinishTime();
+ }
}
private void setSerDeProperties(THiveTableSink tSink) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
index c8bc82e67a1..8f4adff727c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.ThriftHMSCachedClient;
import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.qe.ConnectContext;
import mockit.Mock;
import mockit.MockUp;
@@ -48,10 +49,13 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+
public class HiveTableSinkTest {
@Test
public void testBindDataSink() throws UserException {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setThreadLocalInfo();
new MockUp<ThriftHMSCachedClient>() {
@Mock
@@ -123,6 +127,11 @@ public class HiveTableSinkTest {
private void mockDifferLocationTable(String location) {
new MockUp<HMSExternalTable>() {
+ @Mock
+ public boolean isPartitionedTable() {
+ return false;
+ }
+
@Mock
public Set<String> getPartitionColumnNames() {
return new HashSet<String>() {{
diff --git a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
index cc3425106a5..b8b3a39a2f9 100644
--- a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
@@ -77,6 +77,7 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
}
for (String hivePrefix : ["hive2", "hive3"]) {
+ setHivePrefix(hivePrefix)
try {
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
String catalog_name = "${hivePrefix}_test_partitions"
@@ -91,6 +92,102 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
q01()
+ // Test cache miss scenario: Hive adds partition, then Doris writes to it
+ def test_cache_miss = {
+ def dbName = "test_cache_miss_db"
+ def tblName = "test_cache_miss_table"
+
+ try {
+ // Clean up
+ hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
+ hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
+
+ // Create database and partitioned table in Hive
+ hive_docker """CREATE DATABASE ${dbName}"""
+ hive_docker """
+ CREATE TABLE ${dbName}.${tblName} (
+ id INT,
+ name STRING
+ )
+ PARTITIONED BY (pt INT)
+ STORED AS ORC
+ """
+
+ // Hive writes 3 partitions
+ hive_docker """
+ INSERT INTO ${dbName}.${tblName} PARTITION(pt=1)
+ VALUES (1, 'hive_pt1')
+ """
+ hive_docker """
+ INSERT INTO ${dbName}.${tblName} PARTITION(pt=2)
+ VALUES (2, 'hive_pt2')
+ """
+ hive_docker """
+ INSERT INTO ${dbName}.${tblName} PARTITION(pt=3)
+ VALUES (3, 'hive_pt3')
+ """
+
+ sql """refresh catalog `${catalog_name}`"""
+ // Doris reads data to populate cache (only knows about 3 partitions)
+ def result1 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
+ assertEquals(3, result1[0][0])
+ logger.info("Doris cache populated with 3 partitions")
+
+ // Hive writes 4th partition (Doris cache doesn't know about it)
+ hive_docker """
+ INSERT INTO ${dbName}.${tblName} PARTITION(pt=4)
+ VALUES (4, 'hive_pt4')
+ """
+ logger.info("Hive added 4th partition (pt=4)")
+
+ // Doris writes to the 4th partition
+ // This should trigger cache miss detection and treat as APPEND instead of NEW
+ sql """
+ INSERT INTO `${catalog_name}`.`${dbName}`.`${tblName}`
+ VALUES (40, 'doris_pt4', 4)
+ """
+ logger.info("Doris wrote to 4th partition (should handle cache miss)")
+
+ // Verify: should have 5 rows total (3 from hive + 1 from hive pt4 + 1 from doris pt4)
+ def result2 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
+ assertEquals(5, result2[0][0])
+
+ // Verify partition 4 has 2 rows
+ def result3 = sql """
+ SELECT COUNT(*) as cnt
+ FROM `${catalog_name}`.`${dbName}`.`${tblName}`
+ WHERE pt = 4
+ """
+ assertEquals(2, result3[0][0])
+
+ // Verify data content
+ def result4 = sql """
+ SELECT id, name
+ FROM `${catalog_name}`.`${dbName}`.`${tblName}`
+ WHERE pt = 4
+ ORDER BY id
+ """
+ assertEquals(2, result4.size())
+ assertEquals(4, result4[0][0])
+ assertEquals("hive_pt4", result4[0][1])
+ assertEquals(40, result4[1][0])
+ assertEquals("doris_pt4", result4[1][1])
+
+ logger.info("Cache miss test passed!")
+
+ } finally {
+ // Clean up
+ try {
+ hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
+ hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
+ } catch (Exception e) {
+ logger.warn("Cleanup failed: ${e.message}")
+ }
+ }
+ }
+
+ test_cache_miss()
+
qt_string_partition_table_with_comma """
select * from partition_tables.string_partition_table_with_comma order by id;
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]