This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 1b97f99a07cf4d8746dc075e1431e0fcd3b64b3d Author: Xiangyu Wang <[email protected]> AuthorDate: Thu May 18 18:34:18 2023 +0800 [Fix](multi-catalog) Fix sync hms event failed. (#19555) A situation similar to #19344: because hms meta info is sometimes newer than hms events, if we try to invoke org.apache.doris.datasource.hive.PooledHiveMetaStoreClient#getTable and the table does not exist, an error is thrown and the event cannot be handled. --- .../org/apache/doris/datasource/CatalogMgr.java | 28 +++++++++++++++++----- .../doris/datasource/ExternalMetaCacheMgr.java | 3 +-- .../doris/datasource/hive/HiveMetaStoreCache.java | 21 +++++++++------- .../datasource/hive/event/AlterPartitionEvent.java | 2 +- .../datasource/hive/event/MetastoreEvent.java | 3 +++ .../hive/event/MetastoreEventsProcessor.java | 10 ++++++++ .../apache/doris/datasource/CatalogMgrTest.java | 4 ++-- 7 files changed, 52 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index df74d9a123..d89faf4c27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -848,12 +848,13 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return; } + Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), + (ExternalTable) table, partitionNames); ExternalObjectLog log = new ExternalObjectLog(); log.setCatalogId(catalog.getId()); log.setDbId(db.getId()); log.setTableId(table.getId()); log.setPartitionNames(partitionNames); - replayAddExternalPartitions(log); Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log); } @@ -875,8 +876,14 @@ public class CatalogMgr implements Writable, GsonPostProcessable { LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); return; } - 
Env.getCurrentEnv().getExtMetaCacheMgr() + try { + Env.getCurrentEnv().getExtMetaCacheMgr() .addPartitionsCache(catalog.getId(), table, log.getPartitionNames()); + } catch (HMSClientException e) { + LOG.warn("Network problem occurs or hms table has been deleted, fallback to invalidate table cache", e); + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), + db.getFullName(), table.getName()); + } } public void dropExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames, @@ -937,23 +944,32 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } public void refreshExternalPartitions(String catalogName, String dbName, String tableName, - List<String> partitionNames) + List<String> partitionNames, boolean ignoreIfNotExists) throws DdlException { CatalogIf catalog = nameToCatalog.get(catalogName); if (catalog == null) { - throw new DdlException("No catalog found with name: " + catalogName); + if (!ignoreIfNotExists) { + throw new DdlException("No catalog found with name: " + catalogName); + } + return; } if (!(catalog instanceof ExternalCatalog)) { throw new DdlException("Only support ExternalCatalog"); } DatabaseIf db = catalog.getDbNullable(dbName); if (db == null) { - throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + if (!ignoreIfNotExists) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + return; } TableIf table = db.getTableNullable(tableName); if (table == null) { - throw new DdlException("Table " + tableName + " does not exist in db " + dbName); + if (!ignoreIfNotExists) { + throw new DdlException("Table " + tableName + " does not exist in db " + dbName); + } + return; } ExternalObjectLog log = new ExternalObjectLog(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index afe1cce61c..6d05eb2648 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -146,8 +146,7 @@ public class ExternalMetaCacheMgr { String dbName = ClusterNamespace.getNameFromFullName(table.getDbName()); HiveMetaStoreCache metaCache = cacheMap.get(catalogId); if (metaCache != null) { - metaCache.dropPartitionsCache(dbName, table.getName(), partitionNames, - ((HMSExternalTable) table).getPartitionColumnTypes(), true); + metaCache.dropPartitionsCache(dbName, table.getName(), partitionNames, true); } LOG.debug("drop partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 95f0b957c4..42391636d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -481,8 +481,8 @@ public class HiveMetaStoreCache { } public void dropPartitionsCache(String dbName, String tblName, List<String> partitionNames, - List<Type> partitionColumnTypes, boolean invalidPartitionCache) { - PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, partitionColumnTypes); + boolean invalidPartitionCache) { + PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null); HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key); if (partitionValues == null) { return; @@ -505,17 +505,22 @@ public class HiveMetaStoreCache { idToPartitionItemBefore.remove(partitionId); partitionValuesMap.remove(partitionId); List<UniqueId> uniqueIds = idToUniqueIdsMapBefore.remove(partitionId); - if (key.types.size() > 1) { - for 
(UniqueId uniqueId : uniqueIds) { + for (UniqueId uniqueId : uniqueIds) { + if (uidToPartitionRangeBefore != null) { Range<PartitionKey> range = uidToPartitionRangeBefore.remove(uniqueId); - rangeToIdBefore.remove(range); + if (range != null) { + rangeToIdBefore.remove(range); + } } - } else { - for (UniqueId uniqueId : uniqueIds) { + + if (singleUidToColumnRangeMapBefore != null) { Range<ColumnBound> range = singleUidToColumnRangeMapBefore.remove(uniqueId); - singleColumnRangeMapBefore.remove(range); + if (range != null) { + singleColumnRangeMapBefore.remove(range); + } } } + if (invalidPartitionCache) { invalidatePartitionCache(dbName, tblName, partitionName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java index 1e2eb6d06c..bc0eeeee16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java @@ -87,7 +87,7 @@ public class AlterPartitionEvent extends MetastoreTableEvent { } else { Env.getCurrentEnv().getCatalogMgr() .refreshExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), - Lists.newArrayList(partitionNameAfter)); + Lists.newArrayList(partitionNameAfter), true); } } catch (DdlException e) { throw new MetastoreNotificationException( diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java index 132496a9f4..9693bb0c4c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java @@ -128,6 +128,9 @@ public abstract class MetastoreEvent { /** * Process the information available in the NotificationEvent. 
+ * Better not to call (direct/indirect) apis of {@link org.apache.doris.datasource.hive.PooledHiveMetaStoreClient} + * during handling hms events (Reference to https://github.com/apache/doris/pull/19120). + * Try to add some fallback strategies if it is highly necessary. */ protected abstract void process() throws MetastoreNotificationException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index ad89bb2fbe..624349f46d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -22,8 +22,10 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.HMSClientException; import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; @@ -92,6 +94,14 @@ public class MetastoreEventsProcessor extends MasterDaemon { for (MetastoreEvent event : events) { try { event.process(); + } catch (HMSClientException hmsClientException) { + if (hmsClientException.getCause() != null + && hmsClientException.getCause() instanceof NoSuchObjectException) { + LOG.warn(event.debugString("Failed to process event and skip"), hmsClientException); + } else { + hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1); + throw hmsClientException; + } } catch (Exception e) { hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1); throw e; diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java index 78ab98ef1e..c31320e719 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java @@ -458,7 +458,7 @@ public class CatalogMgrTest extends TestWithFeService { Lists.newArrayList("y=2020/m=1", "y=2020/m=2"), metaStoreCache); metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues); metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("y=2020/m=1", "y=2020/m=2"), - partitionValueCacheKey.getTypes(), false); + false); HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey); Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(), 0); } @@ -488,7 +488,7 @@ public class CatalogMgrTest extends TestWithFeService { Lists.newArrayList("m=1", "m=2"), metaStoreCache); metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues); metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("m=1", "m=2"), - partitionValueCacheKey.getTypes(), false); + false); HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey); Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(), 0); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
