morningman commented on code in PR #58166:
URL: https://github.com/apache/doris/pull/58166#discussion_r2544990667
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java:
##########
@@ -268,10 +268,52 @@ public void finishInsertTable(NameMapping nameMapping) {
insertExistsPartitions.add(Pair.of(pu,
hivePartitionStatistics));
break;
case NEW:
+ // Check if partition really exists in HMS (may be
cache miss in Doris)
+ List<String> partitionValues =
HiveUtil.toPartitionValues(pu.getName());
+ boolean existsInHMS = false;
+ try {
+ Partition hmsPartition =
hiveOps.getClient().getPartition(
+ nameMapping.getRemoteDbName(),
+ nameMapping.getRemoteTblName(),
+ partitionValues);
+ existsInHMS = (hmsPartition != null);
+ } catch (Exception e) {
+ // Partition not found in HMS, treat as truly new
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partition {} not found in HMS, will
create it", pu.getName());
+ }
+ }
+
+ if (existsInHMS) {
+ // Partition exists in HMS but not in Doris cache
+ // Treat as APPEND instead of NEW to avoid
creation error
+ LOG.info("Partition {} already exists in HMS
(Doris cache miss), treating as APPEND",
+ pu.getName());
+ insertExistsPartitions.add(Pair.of(pu,
hivePartitionStatistics));
+ } else {
Review Comment:
This `else` block is the same as `case OVERWRITE`;
extract a method for it.
Take care of `addPartition()` and `dropPartition()`.
##########
fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java:
##########
@@ -191,26 +198,57 @@ private void setCompressType(THiveTableSink tSink,
TFileFormatType formatType) {
}
private void setPartitionValues(THiveTableSink tSink) throws
AnalysisException {
+ if (ConnectContext.get().getExecutor() != null) {
+
ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsStartTime();
+ }
+
List<THivePartition> partitions = new ArrayList<>();
- List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
- ((HMSExternalCatalog) targetTable.getCatalog())
-
.getClient().listPartitions(targetTable.getRemoteDbName(),
targetTable.getRemoteName());
- for (org.apache.hadoop.hive.metastore.api.Partition partition :
hivePartitions) {
+
+ // Get partitions from cache instead of HMS client (similar to
HiveScanNode)
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog)
targetTable.getCatalog());
+
+ List<HivePartition> hivePartitions;
+ if (targetTable.isPartitionedTable()) {
+ // Get all partition values from cache
+ List<Type> partitionColumnTypes =
targetTable.getPartitionColumnTypes(
+ MvccUtil.getSnapshotFromContext(targetTable));
+ HiveMetaStoreCache.HivePartitionValues partitionValues =
Review Comment:
Rebase and use `targetTable.getHivePartitionValues()` directly
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java:
##########
@@ -561,6 +562,93 @@ public void invalidatePartitionCache(ExternalTable
dorisTable, String partitionN
}
}
+ /**
+ * Selectively refreshes cache for affected partitions based on update
information from BE.
+ * This method optimizes cache refresh by only invalidating specific
partitions
+ * rather than the entire table cache.
+ *
+ * @param table The Hive table whose partitions were modified
+ * @param partitionUpdates List of partition updates from BE
+ */
+ public void refreshAffectedPartitions(HMSExternalTable table,
+ List<org.apache.doris.thrift.THivePartitionUpdate>
partitionUpdates) {
+ if (partitionUpdates == null || partitionUpdates.isEmpty()) {
+ return;
+ }
+
+ NameMapping nameMapping = table.getOrBuildNameMapping();
+ long tableId = Util.genIdByName(nameMapping.getLocalDbName(),
nameMapping.getLocalTblName());
+
+ // Classify partition updates by type
+ Set<List<String>> modifiedPartitions = new HashSet<>();
+ boolean hasNewPartitions = false;
+
+ for (org.apache.doris.thrift.THivePartitionUpdate update :
partitionUpdates) {
+ List<String> partitionValues =
HiveUtil.toPartitionValues(update.getName());
Review Comment:
Move this line to `case OVERWRITE:`
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java:
##########
@@ -65,6 +73,19 @@ protected void doBeforeCommit() throws UserException {
HMSTransaction transaction = (HMSTransaction)
transactionManager.getTransaction(txnId);
loadedRows = transaction.getUpdateCnt();
transaction.finishInsertTable(((ExternalTable)
table).getOrBuildNameMapping());
+
+ // Save partition updates for cache refresh after commit
+ partitionUpdates = transaction.getHivePartitionUpdates();
+ }
+
+ @Override
+ protected void doAfterCommit() throws DdlException {
+ // For Hive tables, do selective partition refresh instead of full
table refresh
+ if (partitionUpdates != null && !partitionUpdates.isEmpty()) {
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog)
((HMSExternalTable) table).getCatalog());
+ cache.refreshAffectedPartitions((HMSExternalTable) table,
partitionUpdates);
Review Comment:
Previously, `Env.getCurrentEnv().getRefreshManager().handleRefreshTable()`
wrote an edit log, so that non-master FEs would get the latest partition info.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java:
##########
@@ -561,6 +562,93 @@ public void invalidatePartitionCache(ExternalTable
dorisTable, String partitionN
}
}
+ /**
+ * Selectively refreshes cache for affected partitions based on update
information from BE.
+ * This method optimizes cache refresh by only invalidating specific
partitions
+ * rather than the entire table cache.
+ *
+ * @param table The Hive table whose partitions were modified
+ * @param partitionUpdates List of partition updates from BE
+ */
+ public void refreshAffectedPartitions(HMSExternalTable table,
+ List<org.apache.doris.thrift.THivePartitionUpdate>
partitionUpdates) {
+ if (partitionUpdates == null || partitionUpdates.isEmpty()) {
+ return;
+ }
+
+ NameMapping nameMapping = table.getOrBuildNameMapping();
+ long tableId = Util.genIdByName(nameMapping.getLocalDbName(),
nameMapping.getLocalTblName());
+
+ // Classify partition updates by type
+ Set<List<String>> modifiedPartitions = new HashSet<>();
+ boolean hasNewPartitions = false;
+
+ for (org.apache.doris.thrift.THivePartitionUpdate update :
partitionUpdates) {
+ List<String> partitionValues =
HiveUtil.toPartitionValues(update.getName());
+
+ switch (update.getUpdateMode()) {
+ case APPEND:
+ case OVERWRITE:
+ modifiedPartitions.add(partitionValues);
+ break;
+ case NEW:
+ hasNewPartitions = true;
+ break;
+ default:
+ LOG.warn("Unknown update mode {} for partition {}",
+ update.getUpdateMode(), update.getName());
+ break;
+ }
+ }
+
+ // Refresh file cache for modified partitions
+ if (!modifiedPartitions.isEmpty()) {
+ invalidateFileCache(nameMapping, tableId, modifiedPartitions);
+ }
+
+ // Refresh partition values cache if new partitions were created
+ if (hasNewPartitions) {
+ invalidatePartitionValuesCache(nameMapping);
Review Comment:
Can we just modify the cached value instead of invalidating it all?
Otherwise, an insert with a new partition always invalidates the whole cache.
##########
fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java:
##########
@@ -191,26 +198,57 @@ private void setCompressType(THiveTableSink tSink,
TFileFormatType formatType) {
}
private void setPartitionValues(THiveTableSink tSink) throws
AnalysisException {
+ if (ConnectContext.get().getExecutor() != null) {
+
ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsStartTime();
+ }
+
List<THivePartition> partitions = new ArrayList<>();
- List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
- ((HMSExternalCatalog) targetTable.getCatalog())
-
.getClient().listPartitions(targetTable.getRemoteDbName(),
targetTable.getRemoteName());
- for (org.apache.hadoop.hive.metastore.api.Partition partition :
hivePartitions) {
+
+ // Get partitions from cache instead of HMS client (similar to
HiveScanNode)
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
Review Comment:
Move this line into the `if (targetTable.isPartitionedTable())` block.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java:
##########
@@ -561,6 +562,93 @@ public void invalidatePartitionCache(ExternalTable
dorisTable, String partitionN
}
}
+ /**
+ * Selectively refreshes cache for affected partitions based on update
information from BE.
+ * This method optimizes cache refresh by only invalidating specific
partitions
+ * rather than the entire table cache.
+ *
+ * @param table The Hive table whose partitions were modified
+ * @param partitionUpdates List of partition updates from BE
+ */
+ public void refreshAffectedPartitions(HMSExternalTable table,
+ List<org.apache.doris.thrift.THivePartitionUpdate>
partitionUpdates) {
+ if (partitionUpdates == null || partitionUpdates.isEmpty()) {
+ return;
+ }
+
+ NameMapping nameMapping = table.getOrBuildNameMapping();
Review Comment:
Move these 2 lines right before `if (!modifiedPartitions.isEmpty()) {`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]