This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 23bc28feb15 branch-4.0: [fix](hive) fix invalid edit log after inserting hive partition table (#58747)
23bc28feb15 is described below
commit 23bc28feb15e9dc34b664cf8cb4b8701b94977ec
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Fri Dec 5 17:48:03 2025 +0800
branch-4.0: [fix](hive) fix invalid edit log after inserting hive partition table (#58747)
bp #58748
---
.../org/apache/doris/catalog/RefreshManager.java | 23 +++++++++++++------
.../apache/doris/datasource/ExternalObjectLog.java | 14 ++++++++++--
.../doris/datasource/hive/HiveMetaStoreCache.java | 26 +++++++++++++---------
.../plans/commands/insert/HiveInsertExecutor.java | 21 ++++++-----------
4 files changed, 51 insertions(+), 33 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 67c87688994..9738ee39355 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -26,7 +26,9 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.persist.OperationType;
import com.google.common.base.Strings;
@@ -183,13 +185,20 @@ public class RefreshManager {
db.get().unregisterTable(log.getTableName());
db.get().resetMetaCacheNames();
} else {
- List<String> partitionNames = log.getPartitionNames();
- if (partitionNames != null && !partitionNames.isEmpty()) {
- // Partition-level cache invalidation
- Env.getCurrentEnv().getExtMetaCacheMgr()
- .invalidatePartitionsCache(table.get(),
partitionNames);
- LOG.info("replay refresh partitions for table {}, partitions
count: {}",
- table.get().getName(), partitionNames.size());
+ List<String> modifiedPartNames = log.getPartitionNames();
+ List<String> newPartNames = log.getNewPartitionNames();
+ if (catalog instanceof HMSExternalCatalog
+ && ((modifiedPartNames != null &&
!modifiedPartNames.isEmpty())
+ || (newPartNames != null && !newPartNames.isEmpty()))) {
+ // Partition-level cache invalidation, only for hive catalog
+ HiveMetaStoreCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog) catalog);
+ cache.refreshAffectedPartitionsCache((HMSExternalTable)
table.get(), modifiedPartNames, newPartNames);
+ LOG.info("replay refresh partitions for table {}, "
+ + "modified partitions count: {}, "
+ + "new partitions count: {}",
+ table.get().getName(), modifiedPartNames == null ? 0 :
modifiedPartNames.size(),
+ newPartNames == null ? 0 : newPartNames.size());
} else {
// Full table cache invalidation
refreshTableInternal(db.get(), table.get(),
log.getLastUpdateTime());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index 313f6fc403f..43a0675d783 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -58,6 +58,9 @@ public class ExternalObjectLog implements Writable {
@SerializedName(value = "partitionNames")
private List<String> partitionNames;
+ @SerializedName(value = "newPartitionNames")
+ private List<String> newPartitionNames;
+
@SerializedName(value = "lastUpdateTime")
private long lastUpdateTime;
@@ -81,12 +84,13 @@ public class ExternalObjectLog implements Writable {
}
public static ExternalObjectLog createForRefreshPartitions(long catalogId,
String dbName, String tblName,
- List<String> partitionNames) {
+ List<String> modifiedPartNames, List<String> newPartNames) {
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
externalObjectLog.setCatalogId(catalogId);
externalObjectLog.setDbName(dbName);
externalObjectLog.setTableName(tblName);
- externalObjectLog.setPartitionNames(partitionNames);
+ externalObjectLog.setPartitionNames(modifiedPartNames);
+ externalObjectLog.setNewPartitionNames(newPartNames);
return externalObjectLog;
}
@@ -134,6 +138,12 @@ public class ExternalObjectLog implements Writable {
} else {
sb.append("tableId: " + tableId + "]");
}
+ if (partitionNames != null && !partitionNames.isEmpty()) {
+ sb.append(", partitionNames: " + partitionNames);
+ }
+ if (newPartitionNames != null && !newPartitionNames.isEmpty()) {
+ sb.append(", newPartitionNames: " + newPartitionNames);
+ }
return sb.toString();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 82504111e34..c6ceb2f4f20 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -573,16 +573,16 @@ public class HiveMetaStoreCache {
*
* @param table The Hive table whose partitions were modified
* @param partitionUpdates List of partition updates from BE
+ * @param modifiedPartNames Output list to collect names of modified
partitions
+ * @param newPartNames Output list to collect names of new partitions
*/
public void refreshAffectedPartitions(HMSExternalTable table,
- List<org.apache.doris.thrift.THivePartitionUpdate>
partitionUpdates) {
+ List<org.apache.doris.thrift.THivePartitionUpdate>
partitionUpdates,
+ List<String> modifiedPartNames, List<String> newPartNames) {
if (partitionUpdates == null || partitionUpdates.isEmpty()) {
return;
}
- List<String> modifiedPartitionNames = new ArrayList<>();
- List<String> newPartitionNames = new ArrayList<>();
-
for (org.apache.doris.thrift.THivePartitionUpdate update :
partitionUpdates) {
String partitionName = update.getName();
// Skip if partition name is null/empty (non-partitioned table
case)
@@ -593,10 +593,10 @@ public class HiveMetaStoreCache {
switch (update.getUpdateMode()) {
case APPEND:
case OVERWRITE:
- modifiedPartitionNames.add(partitionName);
+ modifiedPartNames.add(partitionName);
break;
case NEW:
- newPartitionNames.add(partitionName);
+ newPartNames.add(partitionName);
break;
default:
LOG.warn("Unknown update mode {} for partition {}",
@@ -605,20 +605,26 @@ public class HiveMetaStoreCache {
}
}
+ refreshAffectedPartitionsCache(table, modifiedPartNames, newPartNames);
+ }
+
+ public void refreshAffectedPartitionsCache(HMSExternalTable table,
+ List<String> modifiedPartNames, List<String> newPartNames) {
+
// Invalidate cache for modified partitions (both partition cache and
file cache)
- for (String partitionName : modifiedPartitionNames) {
+ for (String partitionName : modifiedPartNames) {
invalidatePartitionCache(table, partitionName);
}
// Add new partitions to partition values cache
- if (!newPartitionNames.isEmpty()) {
- addPartitionsCache(table.getOrBuildNameMapping(),
newPartitionNames,
+ if (!newPartNames.isEmpty()) {
+ addPartitionsCache(table.getOrBuildNameMapping(), newPartNames,
table.getPartitionColumnTypes(Optional.empty()));
}
// Log summary
LOG.info("Refreshed cache for table {}: {} modified partitions, {} new
partitions",
- table.getName(), modifiedPartitionNames.size(),
newPartitionNames.size());
+ table.getName(), modifiedPartNames.size(),
newPartNames.size());
}
public void invalidateDbCache(String dbName) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index fed495dbb35..9f8be52afec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -34,10 +34,10 @@ import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionType;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -86,20 +86,12 @@ public class HiveInsertExecutor extends
BaseExternalTableInsertExecutor {
// For partitioned tables, do selective partition refresh
// For non-partitioned tables, do full table cache invalidation
- List<String> affectedPartitionNames = null;
+ List<String> modifiedPartNames = Lists.newArrayList();
+ List<String> newPartNames = Lists.newArrayList();
if (hmsTable.isPartitionedTable() && partitionUpdates != null &&
!partitionUpdates.isEmpty()) {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog)
hmsTable.getCatalog());
- cache.refreshAffectedPartitions(hmsTable, partitionUpdates);
-
- // Collect partition names for edit log
- affectedPartitionNames = new ArrayList<>();
- for (THivePartitionUpdate update : partitionUpdates) {
- String partitionName = update.getName();
- if (partitionName != null && !partitionName.isEmpty()) {
- affectedPartitionNames.add(partitionName);
- }
- }
+ cache.refreshAffectedPartitions(hmsTable, partitionUpdates,
modifiedPartNames, newPartNames);
} else {
// Non-partitioned table or no partition updates, do full table
refresh
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(hmsTable);
@@ -107,13 +99,14 @@ public class HiveInsertExecutor extends
BaseExternalTableInsertExecutor {
// Write edit log to notify other FEs
ExternalObjectLog log;
- if (affectedPartitionNames != null &&
!affectedPartitionNames.isEmpty()) {
+ if (!modifiedPartNames.isEmpty() || !newPartNames.isEmpty()) {
// Partition-level refresh for other FEs
log = ExternalObjectLog.createForRefreshPartitions(
hmsTable.getCatalog().getId(),
table.getDatabase().getFullName(),
table.getName(),
- affectedPartitionNames);
+ modifiedPartNames,
+ newPartNames);
} else {
// Full table refresh for other FEs
log = ExternalObjectLog.createForRefreshTable(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]