This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new d054ab9dc7 [feature-wip](multi-catalog) support automatic sync hive
metastore events (#15401)
d054ab9dc7 is described below
commit d054ab9dc7fb4d19aeceb1618430c15b4070319e
Author: zhangdong <[email protected]>
AuthorDate: Tue Jan 3 13:59:14 2023 +0800
[feature-wip](multi-catalog) support automatic sync hive metastore events
(#15401)
Poll the metastore at a given frequency for create/alter/drop events on
databases, tables and partitions.
By observing such events, we can take the appropriate action on the cached
metadata (refresh/invalidate/add/remove)
so that it represents the latest information available in the metastore.
We keep track of the last synced event id in each polling
iteration so the next batch can be requested appropriately.
---
.../java/org/apache/doris/catalog/DatabaseIf.java | 2 +
.../main/java/org/apache/doris/catalog/Env.java | 7 +
.../doris/catalog/external/ExternalDatabase.java | 5 +
.../catalog/external/HMSExternalDatabase.java | 10 +
.../main/java/org/apache/doris/common/Config.java | 18 ++
.../org/apache/doris/datasource/CatalogMgr.java | 53 +++++-
.../apache/doris/datasource/ExternalCatalog.java | 3 +
.../doris/datasource/HMSExternalCatalog.java | 54 ++++++
.../datasource/PooledHiveMetaStoreClient.java | 27 +++
.../datasource/hive/event/DropTableEvent.java | 89 +++++++++
.../doris/datasource/hive/event/EventFactory.java | 32 ++++
.../doris/datasource/hive/event/IgnoredEvent.java | 43 +++++
.../datasource/hive/event/MetastoreEvent.java | 203 +++++++++++++++++++++
.../hive/event/MetastoreEventFactory.java | 81 ++++++++
.../datasource/hive/event/MetastoreEventType.java | 68 +++++++
.../hive/event/MetastoreEventsProcessor.java | 151 +++++++++++++++
.../hive/event/MetastoreNotificationException.java | 37 ++++
.../event/MetastoreNotificationFetchException.java | 37 ++++
.../datasource/hive/event/MetastoreTableEvent.java | 50 +++++
.../org/apache/doris/journal/JournalEntity.java | 1 +
.../java/org/apache/doris/persist/EditLog.java | 9 +
.../org/apache/doris/persist/OperationType.java | 2 +
22 files changed, 976 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
index 80a7f61cc9..ee07b41872 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
@@ -204,4 +204,6 @@ public interface DatabaseIf<T extends TableIf> {
}
return (OlapTable) table;
}
+
+ void dropTable(String tableName);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index da7df2caef..5ca6dab330 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -126,6 +126,7 @@ import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
import org.apache.doris.deploy.DeployManager;
import org.apache.doris.deploy.impl.AmbariDeployManager;
import org.apache.doris.deploy.impl.K8sDeployManager;
@@ -318,6 +319,7 @@ public class Env {
private DeleteHandler deleteHandler;
private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
+ private MetastoreEventsProcessor metastoreEventsProcessor;
private MasterDaemon labelCleaner; // To clean old LabelInfo,
ExportJobInfos
private MasterDaemon txnCleaner; // To clean aborted or timeout txns
@@ -554,6 +556,7 @@ public class Env {
this.deleteHandler = new DeleteHandler();
this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
this.partitionInMemoryInfoCollector = new
PartitionInMemoryInfoCollector();
+ this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.replayedJournalId = new AtomicLong(0L);
this.isElectable = false;
@@ -1427,6 +1430,10 @@ public class Env {
this.statisticsJobScheduler.start();
this.statisticsTaskScheduler.start();
new InternalSchemaInitializer().start();
+ if (Config.enable_hms_events_incremental_sync) {
+ metastoreEventsProcessor.start();
+ }
+
}
// start threads that should running on all FE
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index 6ae8594c07..65c027713e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -258,4 +258,9 @@ public class ExternalDatabase<T extends ExternalTable>
implements DatabaseIf<T>,
@Override
public void gsonPostProcess() throws IOException {}
+
+ @Override
+ public void dropTable(String tableName) {
+ throw new NotImplementedException();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index decef86caa..a1f6bcddab 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -170,4 +170,14 @@ public class HMSExternalDatabase extends
ExternalDatabase<HMSExternalTable> impl
idToTbl.put(tbl.getId(), tbl);
tableNameToId.put(tbl.getName(), tbl.getId());
}
+
+    /**
+     * Removes the table named {@code tableName} from this database's in-memory
+     * name-to-id and id-to-table maps.
+     *
+     * <p>If no table is registered under that name, a warning is logged and the
+     * id-to-table map is left untouched.
+     *
+     * @param tableName name of the table to drop
+     */
+    @Override
+    public void dropTable(String tableName) {
+        LOG.debug("drop table [{}]", tableName);
+        Long tableId = tableNameToId.remove(tableName);
+        if (tableId == null) {
+            LOG.warn("drop table [{}] failed", tableName);
+            // Nothing to remove by id; a null key would throw on null-hostile
+            // maps such as ConcurrentHashMap.
+            return;
+        }
+        idToTbl.remove(tableId);
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 52840060cd..e611614f36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1941,5 +1941,23 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true)
public static boolean enable_func_pushdown = true;
+
+ /**
+ * If set to true, doris will automatically synchronize hms metadata to
the cache in fe.
+ */
+ @ConfField(masterOnly = true)
+ public static boolean enable_hms_events_incremental_sync = false;
+
+ /**
+ * Maximum number of events to poll in each RPC.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int hms_events_batch_size_per_rpc = 500;
+
+ /**
+ * HMS polling interval in milliseconds.
+ */
+ @ConfField(masterOnly = true)
+ public static int hms_events_polling_interval_ms = 20000;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index d969ed1ef6..e6cf9a4ee1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Resource.ReferenceType;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.cluster.ClusterNamespace;
@@ -428,13 +429,17 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
* Refresh the catalog meta and write the meta log.
*/
public void refreshCatalog(RefreshCatalogStmt stmt) throws UserException {
+ CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " +
stmt.getCatalogName());
+ }
+ CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(),
stmt);
+ refreshCatalog(log);
+ }
+
+ public void refreshCatalog(CatalogLog log) {
writeLock();
try {
- CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
- if (catalog == null) {
- throw new DdlException("No catalog found with name: " +
stmt.getCatalogName());
- }
- CatalogLog log =
CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
replayRefreshCatalog(log);
Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_REFRESH_CATALOG,
log);
} finally {
@@ -470,7 +475,7 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
/**
* Reply for refresh catalog event.
*/
- public void replayRefreshCatalog(CatalogLog log) throws DdlException {
+ public void replayRefreshCatalog(CatalogLog log) {
writeLock();
try {
unprotectedRefreshCatalog(log.getCatalogId(),
log.isInvalidCache());
@@ -543,6 +548,42 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
.invalidateTableCache(catalog.getId(), db.getFullName(),
table.getName());
}
+ public void dropExternalTable(String dbName, String tableName, String
catalogName) throws DdlException {
+ CatalogIf catalog = nameToCatalog.get(catalogName);
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " +
catalogName);
+ }
+ if (!(catalog instanceof ExternalCatalog)) {
+ throw new DdlException("Only support drop ExternalCatalog Tables");
+ }
+ DatabaseIf db = catalog.getDbNullable(dbName);
+ if (db == null) {
+ throw new DdlException("Database " + dbName + " does not exist in
catalog " + catalog.getName());
+ }
+
+ TableIf table = db.getTableNullable(tableName);
+ if (table == null) {
+ throw new DdlException("Table " + tableName + " does not exist in
db " + dbName);
+ }
+ ExternalObjectLog log = new ExternalObjectLog();
+ log.setCatalogId(catalog.getId());
+ log.setDbId(db.getId());
+ log.setTableId(table.getId());
+ replayDropExternalTable(log);
+ Env.getCurrentEnv().getEditLog().logDropExternalTable(log);
+ }
+
+    /**
+     * Applies a drop-external-table log entry: removes the table from its database and
+     * invalidates the table's entry in the external meta cache.
+     *
+     * <p>If the catalog, database or table referenced by the log can no longer be
+     * resolved, a warning is logged and the entry is skipped instead of failing with a
+     * {@code NullPointerException}.
+     *
+     * @param log log entry carrying the catalog id, database id and table id
+     */
+    public void replayDropExternalTable(ExternalObjectLog log) {
+        LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(),
+                log.getTableId());
+        ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+        if (catalog == null) {
+            LOG.warn("No catalog found with id:[{}], skip dropping external table.", log.getCatalogId());
+            return;
+        }
+        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        if (db == null) {
+            LOG.warn("No db found with id:[{}] in catalog:[{}], skip dropping external table.", log.getDbId(),
+                    catalog.getName());
+            return;
+        }
+        ExternalTable table = db.getTableForReplay(log.getTableId());
+        if (table == null) {
+            LOG.warn("No table found with id:[{}] in db:[{}], skip dropping external table.", log.getTableId(),
+                    db.getFullName());
+            return;
+        }
+        db.dropTable(table.getName());
+        Env.getCurrentEnv().getExtMetaCacheMgr()
+                .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
+    }
+
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index ddbb05d956..22726c3b96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -301,6 +301,9 @@ public abstract class ExternalCatalog implements
CatalogIf<ExternalDatabase>, Wr
db.setTableExtCatalog(this);
}
objectCreated = false;
+ if (this instanceof HMSExternalCatalog) {
+ ((HMSExternalCatalog) this).setLastSyncedEventId(-1L);
+ }
}
public void addDatabaseForTest(ExternalDatabase db) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 90233c6705..5556535b6c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -25,13 +25,19 @@ import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.common.Config;
+import
org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
@@ -41,8 +47,12 @@ import java.util.Map;
* External catalog for hive metastore compatible data sources.
*/
public class HMSExternalCatalog extends ExternalCatalog {
+ private static final Logger LOG =
LogManager.getLogger(HMSExternalCatalog.class);
+
private static final int MAX_CLIENT_POOL_SIZE = 8;
protected PooledHiveMetaStoreClient client;
+ // Record the latest synced event id when processing hive events
+ private long lastSyncedEventId;
/**
* Default constructor for HMSExternalCatalog.
@@ -160,4 +170,48 @@ public class HMSExternalCatalog extends ExternalCatalog {
}
return tmpSchema;
}
+
+ public void setLastSyncedEventId(long lastSyncedEventId) {
+ this.lastSyncedEventId = lastSyncedEventId;
+ }
+
+    /**
+     * Fetches the next batch of metastore notification events for this catalog.
+     *
+     * <p>On the first pull (no event id synced yet) the current metastore event id is
+     * recorded, the catalog is refreshed, and {@code null} is returned. On later pulls
+     * {@code null} is returned when the metastore's current event id equals the last
+     * synced one; otherwise up to {@code Config.hms_events_batch_size_per_rpc} events
+     * following the last synced id are requested.
+     *
+     * @param hmsExternalCatalog catalog used for the refresh and for log messages
+     * @return the next events, or {@code null} if there is nothing to process
+     * @throws MetastoreNotificationFetchException if the metastore call fails
+     */
+    public NotificationEventResponse getNextEventResponse(HMSExternalCatalog hmsExternalCatalog)
+            throws MetastoreNotificationFetchException {
+        makeSureInitialized();
+        boolean alreadySynced = lastSyncedEventId >= 0;
+        if (alreadySynced) {
+            long latestEventId = getCurrentEventId();
+            LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {},lastSyncedEventId is {}",
+                    hmsExternalCatalog.getName(), latestEventId, lastSyncedEventId);
+            if (latestEventId != lastSyncedEventId) {
+                return client.getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null);
+            }
+            LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName());
+            return null;
+        }
+
+        // First pull: remember where the metastore currently is, then rebuild the cache.
+        lastSyncedEventId = getCurrentEventId();
+        refreshCatalog(hmsExternalCatalog);
+        LOG.info(
+                "First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId,"
+                        + "lastSyncedEventId is [{}]",
+                hmsExternalCatalog.getName(), lastSyncedEventId);
+        return null;
+    }
+
+ private void refreshCatalog(HMSExternalCatalog hmsExternalCatalog) {
+ CatalogLog log = new CatalogLog();
+ log.setCatalogId(hmsExternalCatalog.getId());
+ log.setInvalidCache(true);
+ Env.getCurrentEnv().getCatalogMgr().refreshCatalog(log);
+ }
+
+    /**
+     * Asks the metastore client for its current notification event id.
+     *
+     * @return the current event id, or {@code -1} when the client returns {@code null}
+     */
+    private long getCurrentEventId() {
+        makeSureInitialized();
+        CurrentNotificationEventId latest = client.getCurrentNotificationEventId();
+        if (latest != null) {
+            return latest.getEventId();
+        }
+        LOG.warn("Get currentNotificationEventId is null");
+        return -1;
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
index 05e3d9d15c..008253c450 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
@@ -19,6 +19,7 @@ package org.apache.doris.datasource;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.common.Config;
+import
org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.google.common.base.Preconditions;
@@ -29,8 +30,10 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
@@ -145,6 +148,30 @@ public class PooledHiveMetaStoreClient {
}
}
+ public CurrentNotificationEventId getCurrentNotificationEventId() {
+ try (CachedClient client = getClient()) {
+ return client.client.getCurrentNotificationEventId();
+ } catch (Exception e) {
+ LOG.warn("Failed to fetch current notification event id", e);
+ throw new MetastoreNotificationFetchException(
+ "Failed to get current notification event id. msg: " +
e.getMessage());
+ }
+ }
+
+    /**
+     * Requests notification events that follow {@code lastEventId} using a pooled client.
+     *
+     * @param lastEventId id of the last event already seen by the caller
+     * @param maxEvents maximum number of events to request
+     * @param filter filter handed through to the metastore client; may be {@code null}
+     * @return the response from the underlying metastore client
+     * @throws MetastoreNotificationFetchException if obtaining a client or the call fails
+     */
+    public NotificationEventResponse getNextNotification(long lastEventId,
+            int maxEvents,
+            IMetaStoreClient.NotificationFilter filter)
+            throws MetastoreNotificationFetchException {
+        try (CachedClient pooled = getClient()) {
+            IMetaStoreClient hms = pooled.client;
+            return hms.getNextNotification(lastEventId, maxEvents, filter);
+        } catch (Exception ex) {
+            LOG.warn("Failed to get next notification based on last event id {}", lastEventId, ex);
+            String reason = "Failed to get next notification based on last event id: " + lastEventId
+                    + ". msg: " + ex.getMessage();
+            throw new MetastoreNotificationFetchException(reason);
+        }
+    }
+
private class CachedClient implements AutoCloseable {
private final IMetaStoreClient client;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
new file mode 100644
index 0000000000..8647e47b78
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * MetastoreEvent for the DROP_TABLE event type.
+ *
+ * <p>The database and table names are read from the JSON drop-table message carried
+ * by the notification. Processing the event asks the catalog manager to drop the
+ * table from the named catalog.
+ */
+public class DropTableEvent extends MetastoreTableEvent {
+    private static final Logger LOG = LogManager.getLogger(DropTableEvent.class);
+    // Names taken from the deserialized drop-table message.
+    private final String dbName;
+    private final String tableName;
+
+    /**
+     * @param event notification whose type must be DROP_TABLE
+     * @param catalogName name of the catalog the event belongs to
+     * @throws MetastoreNotificationException if the event message cannot be parsed
+     */
+    private DropTableEvent(NotificationEvent event,
+            String catalogName) {
+        super(event, catalogName);
+        Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(getEventType()));
+        try {
+            // Deserialize inside the try so that a malformed message is reported with
+            // the event id/type context below rather than escaping as a raw exception.
+            JSONDropTableMessage dropTableMessage =
+                    (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
+                            .getDropTableMessage(event.getMessage());
+            dbName = dropTableMessage.getDB();
+            tableName = dropTableMessage.getTable();
+        } catch (Exception e) {
+            throw new MetastoreNotificationException(debugString(
+                    "Could not parse event message. "
+                            + "Check if %s is set to true in metastore configuration",
+                    MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
+        }
+    }
+
+    /**
+     * Wraps the notification in a single-element list of {@link DropTableEvent}.
+     *
+     * @param event DROP_TABLE notification received from the metastore
+     * @param catalogName name of the catalog the event belongs to
+     * @return a mutable list containing exactly one event
+     */
+    public static List<MetastoreEvent> getEvents(NotificationEvent event,
+            String catalogName) {
+        return Lists.newArrayList(new DropTableEvent(event, catalogName));
+    }
+
+    @Override
+    protected boolean existInCache() {
+        return true;
+    }
+
+    @Override
+    protected boolean canBeSkipped() {
+        return false;
+    }
+
+    @Override
+    protected boolean isSupported() {
+        return true;
+    }
+
+    /**
+     * Drops the table from the catalog. A {@link DdlException} from the catalog manager
+     * is logged as a warning and not rethrown.
+     */
+    @Override
+    protected void process() throws MetastoreNotificationException {
+        try {
+            LOG.info("DropTable event process,catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName,
+                    tableName);
+            Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName, tableName, catalogName);
+        } catch (DdlException e) {
+            LOG.warn("DropExternalTable failed,dbName:[{}],tableName:[{}],catalogName:[{}].", dbName, tableName,
+                    catalogName, e);
+        }
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java
new file mode 100644
index 0000000000..333687e2ab
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.util.List;
+
+/**
+ * Factory interface to generate a {@link MetastoreEvent} from a {@link
NotificationEvent} object.
+ */
+public interface EventFactory {
+
+ List<MetastoreEvent>
transferNotificationEventToMetastoreEvents(NotificationEvent hmsEvent,
+ String catalogName) throws MetastoreNotificationException;
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
new file mode 100644
index 0000000000..4d2dc1a178
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.util.List;
+
+/**
+ * An event type which is ignored. Useful for unsupported metastore event types.
+ *
+ * <p>Processing such an event only emits a debug log line.
+ */
+public class IgnoredEvent extends MetastoreEvent {
+    protected IgnoredEvent(NotificationEvent event, String catalogName) {
+        super(event, catalogName);
+    }
+
+    /**
+     * Wraps the notification in a single-element list of {@link IgnoredEvent}.
+     * Public so it can be called the same way as {@code DropTableEvent.getEvents}.
+     *
+     * @param event notification received from the metastore
+     * @param catalogName name of the catalog the event belongs to
+     * @return a mutable list containing exactly one event
+     */
+    public static List<MetastoreEvent> getEvents(NotificationEvent event,
+            String catalogName) {
+        return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+    }
+
+    @Override
+    public void process() {
+        // Pass the type as a log argument so no string is built when debug is off.
+        debugLog("Ignoring unknown event type {}", metastoreNotificationEvent.getEventType());
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
new file mode 100644
index 0000000000..5cc4594457
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Abstract base class for all MetastoreEvents. A MetastoreEvent is an object used to
+ * process a NotificationEvent received from metastore.
+ *
+ * <p>The constructor copies the database name, table name, event id and event type out
+ * of the notification. Sub-classes implement {@link #process()} and may override the
+ * capability hooks, all of which return {@code false} (or {@code null}) by default.
+ */
+public abstract class MetastoreEvent {
+    private static final Logger LOG = LogManager.getLogger(MetastoreEvent.class);
+    // String.format compatible string to prepend event id and type
+    private static final String STR_FORMAT_EVENT_ID_TYPE = "EventId: %d EventType: %s ";
+
+    // logger format compatible string to prepend to a log formatted message
+    private static final String LOG_FORMAT_EVENT_ID_TYPE = "EventId: {} EventType: {} ";
+
+    // the notification received from metastore which is processed by this
+    protected final NotificationEvent event;
+
+    // dbName from the event
+    protected final String dbName;
+
+    // tblName from the event
+    protected final String tblName;
+
+    // eventId of the event. Used instead of calling getter on event everytime
+    private final long eventId;
+
+    // eventType from the NotificationEvent
+    private final MetastoreEventType eventType;
+
+    // Actual notificationEvent object received from Metastore.
+    // Refers to the same object as `event`.
+    protected final NotificationEvent metastoreNotificationEvent;
+
+    protected final String catalogName;
+
+    /**
+     * @param event notification received from the metastore
+     * @param catalogName name of the catalog the event belongs to
+     */
+    protected MetastoreEvent(NotificationEvent event, String catalogName) {
+        this.event = event;
+        this.dbName = event.getDbName();
+        this.tblName = event.getTableName();
+        this.eventId = event.getEventId();
+        this.eventType = MetastoreEventType.from(event.getEventType());
+        this.metastoreNotificationEvent = event;
+        this.catalogName = catalogName;
+    }
+
+    public long getEventId() {
+        return eventId;
+    }
+
+    public MetastoreEventType getEventType() {
+        return eventType;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public String getTblName() {
+        return tblName;
+    }
+
+    /**
+     * Checks if the given event can be batched into this event. The default
+     * implementation returns false; sub-classes that support batching override it.
+     *
+     * @param event The event under consideration to be batched into this event.
+     * @return false if event cannot be batched into this event; otherwise true.
+     */
+    protected boolean canBeBatched(MetastoreEvent event) {
+        return false;
+    }
+
+    /**
+     * Adds the given event into the batch of events represented by this event. Default
+     * implementation is to return null. Sub-classes must override this method to
+     * implement batching.
+     *
+     * @param event The event which needs to be added to the batch.
+     * @return The batch event which represents all the events batched into this event
+     *         until now including the given event.
+     */
+    protected MetastoreEvent addToBatchEvents(MetastoreEvent event) {
+        return null;
+    }
+
+    /**
+     * Hook for sub-classes; the default implementation returns false.
+     */
+    protected boolean existInCache() throws MetastoreNotificationException {
+        return false;
+    }
+
+    /**
+     * Returns the number of events represented by this event. For most events this is 1.
+     * In case of batch events this could be more than 1.
+     */
+    protected int getNumberOfEvents() {
+        return 1;
+    }
+
+    /**
+     * Certain events like ALTER_TABLE or ALTER_PARTITION implement logic to ignore
+     * some events because they do not affect query results.
+     *
+     * @return true if this event can be skipped.
+     */
+    protected boolean canBeSkipped() {
+        return false;
+    }
+
+    /**
+     * Whether the current version of FE supports processing of this event. Some events
+     * are reserved and may be processed in a later version.
+     */
+    protected boolean isSupported() {
+        return false;
+    }
+
+    /**
+     * Process the information available in the NotificationEvent.
+     */
+    protected abstract void process() throws MetastoreNotificationException;
+
+    /**
+     * Helper method to get debug string with helpful event information prepended to the
+     * message. This can be used to generate helpful exception messages.
+     *
+     * @param msgFormatString String value to be used in String.format() for the given message
+     * @param args args to the <code>String.format()</code> for the given msgFormatString
+     * @return the formatted message, prefixed with the event id and event type
+     */
+    protected String debugString(String msgFormatString, Object... args) {
+        String formatString = STR_FORMAT_EVENT_ID_TYPE + msgFormatString;
+        Object[] formatArgs = getLogFormatArgs(args);
+        return String.format(formatString, formatArgs);
+    }
+
+    /**
+     * Helper method to generate the format args after prepending the event id and type.
+     */
+    private Object[] getLogFormatArgs(Object[] args) {
+        Object[] formatArgs = new Object[args.length + 2];
+        formatArgs[0] = getEventId();
+        formatArgs[1] = getEventType();
+        // Caller-supplied args follow the two prefix slots in their original order.
+        System.arraycopy(args, 0, formatArgs, 2, args.length);
+        return formatArgs;
+    }
+
+    /**
+     * Logs at info level the given log formatted string and its args. The log formatted
+     * string should have {} pair at the appropriate location in the string for each arg
+     * value provided. This method prepends the event id and event type before logging the
+     * message. No-op if the log level is not at INFO.
+     */
+    protected void infoLog(String logFormattedStr, Object... args) {
+        if (!LOG.isInfoEnabled()) {
+            return;
+        }
+        String formatString = LOG_FORMAT_EVENT_ID_TYPE + logFormattedStr;
+        Object[] formatArgs = getLogFormatArgs(args);
+        LOG.info(formatString, formatArgs);
+    }
+
+    /**
+     * Similar to infoLog except that it logs at debug level.
+     */
+    protected void debugLog(String logFormattedStr, Object... args) {
+        if (!LOG.isDebugEnabled()) {
+            return;
+        }
+        String formatString = LOG_FORMAT_EVENT_ID_TYPE + logFormattedStr;
+        Object[] formatArgs = getLogFormatArgs(args);
+        LOG.debug(formatString, formatArgs);
+    }
+
+    @Override
+    public String toString() {
+        return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
new file mode 100644
index 0000000000..2719158c8e
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.datasource.HMSExternalCatalog;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Factory that turns Hive metastore {@link NotificationEvent}s into {@link MetastoreEvent}s.
+ */
+public class MetastoreEventFactory implements EventFactory {
+    private static final Logger LOG = LogManager.getLogger(MetastoreEventFactory.class);
+
+    /**
+     * Converts a single notification event. DROP_TABLE events are delegated to
+     * {@link DropTableEvent}; every other event type yields one {@link IgnoredEvent}.
+     */
+    @Override
+    public List<MetastoreEvent> transferNotificationEventToMetastoreEvents(NotificationEvent event,
+            String catalogName) {
+        Preconditions.checkNotNull(event.getEventType());
+        if (MetastoreEventType.from(event.getEventType()) == MetastoreEventType.DROP_TABLE) {
+            return DropTableEvent.getEvents(event, catalogName);
+        }
+        // ignore all the unknown events by creating a IgnoredEvent
+        return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+    }
+
+    /**
+     * Converts all notification events of one catalog and keeps only those whose
+     * {@link MetastoreEvent#isSupported()} returns true. Returns an empty list when
+     * nothing is left to process.
+     */
+    List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
+        List<MetastoreEvent> converted = Lists.newArrayList();
+        events.forEach(event -> converted.addAll(
+                transferNotificationEventToMetastoreEvents(event, hmsExternalCatalog.getName())));
+
+        List<MetastoreEvent> supported = converted.stream()
+                .filter(MetastoreEvent::isSupported)
+                .collect(Collectors.toList());
+
+        if (!supported.isEmpty()) {
+            return createBatchEvents(supported);
+        }
+        LOG.info("The metastore events to process is empty on catalog {}", hmsExternalCatalog.getName());
+        return Collections.emptyList();
+    }
+
+    /**
+     * Intended to create batch event tasks according to HivePartitionName to facilitate
+     * subsequent parallel processing: for ADD_PARTITION and DROP_PARTITION, any earlier events
+     * on that partition would be overridden, since processing events that precede a drop
+     * partition is meaningless. Batching is not implemented yet; the input is returned as is.
+     */
+    List<MetastoreEvent> createBatchEvents(List<MetastoreEvent> events) {
+        // now do nothing
+        return events;
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java
new file mode 100644
index 0000000000..31dce29366
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+/**
+ * Names of the metastore notification event types known to this package.
+ * Currently only some of them are actually handled.
+ */
+public enum MetastoreEventType {
+    CREATE_TABLE("CREATE_TABLE"),
+    DROP_TABLE("DROP_TABLE"),
+    ALTER_TABLE("ALTER_TABLE"),
+    CREATE_DATABASE("CREATE_DATABASE"),
+    DROP_DATABASE("DROP_DATABASE"),
+    ALTER_DATABASE("ALTER_DATABASE"),
+    ADD_PARTITION("ADD_PARTITION"),
+    ALTER_PARTITION("ALTER_PARTITION"),
+    ALTER_PARTITIONS("ALTER_PARTITIONS"),
+    DROP_PARTITION("DROP_PARTITION"),
+    INSERT("INSERT"),
+    INSERT_PARTITIONS("INSERT_PARTITIONS"),
+    ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"),
+    COMMIT_TXN("COMMIT_TXN"),
+    ABORT_TXN("ABORT_TXN"),
+    OTHER("OTHER");
+
+    // the event type string this constant is matched against in from()
+    private final String eventType;
+
+    MetastoreEventType(String msEventType) {
+        this.eventType = msEventType;
+    }
+
+    /**
+     * Returns the event type string of this constant.
+     */
+    @Override
+    public String toString() {
+        return eventType;
+    }
+
+    /**
+     * Looks up the MetastoreEventType for the string value of Metastore's
+     * NotificationEvent.eventType. The comparison ignores case. If no constant matches
+     * (including a null input), OTHER is returned.
+     *
+     * @param eventType EventType value from the
+     *         {@link org.apache.hadoop.hive.metastore.api.NotificationEvent}
+     */
+    public static MetastoreEventType from(String eventType) {
+        MetastoreEventType matched = OTHER;
+        for (MetastoreEventType candidate : values()) {
+            if (candidate.eventType.equalsIgnoreCase(eventType)) {
+                matched = candidate;
+                break;
+            }
+        }
+        return matched;
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
new file mode 100644
index 0000000000..1ff3bd98b2
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.HMSExternalCatalog;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A metastore event is an instance of the class {@link NotificationEvent}. Metastore can be
+ * configured to work with listeners which are called on various DDL operations like
+ * create/alter/drop operations on database, table, partition etc. Each event has a unique
+ * incremental id, and the generated events are fetched from Metastore to get
+ * incremental updates to the metadata stored in Hive metastore using the public API
+ * <code>get_next_notification</code>. These events could be generated by external
+ * Metastore clients like Apache Hive or Apache Spark configured to talk with the same metastore.
+ * <p>
+ * This class is used to poll metastore for such events at a given frequency. By observing
+ * such events, we can take appropriate action on the {@link org.apache.doris.datasource.hive.HiveMetaStoreCache}
+ * (refresh/invalidate/add/remove) so that it represents the latest information
+ * available in metastore. We keep track of the last synced event id in each polling
+ * iteration so the next batch can be requested appropriately. The current batch size is
+ * constant and set to {@link org.apache.doris.common.Config#hms_events_batch_size_per_rpc}.
+ */
+public class MetastoreEventsProcessor extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(MetastoreEventsProcessor.class);
+    public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY =
+            "hive.metastore.notifications.add.thrift.objects";
+
+    // for deserializing from JSON strings from metastore event
+    private static final MessageDeserializer MESSAGE_DESERIALIZER = new JSONMessageDeserializer();
+
+
+    // event factory which is used to get or create MetastoreEvents
+    private final MetastoreEventFactory metastoreEventFactory;
+
+    // true while a polling round is in progress; always cleared when the round ends
+    private boolean isRunning;
+
+    public MetastoreEventsProcessor() {
+        super(MetastoreEventsProcessor.class.getName(), Config.hms_events_polling_interval_ms);
+        this.metastoreEventFactory = new MetastoreEventFactory();
+        this.isRunning = false;
+    }
+
+    /**
+     * Fetch the next batch of NotificationEvents from metastore. The default batch size is
+     * <code>{@link Config#hms_events_batch_size_per_rpc}</code>
+     *
+     * @return the fetched events; an empty list when the catalog returns no response
+     *         or the response carries no event list
+     */
+    private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) {
+        LOG.debug("Start to pull events on catalog [{}]", hmsExternalCatalog.getName());
+        NotificationEventResponse response = hmsExternalCatalog.getNextEventResponse(hmsExternalCatalog);
+
+        // a response without an event list is treated like "no new events"
+        if (response == null || response.getEvents() == null) {
+            return Collections.emptyList();
+        }
+        return response.getEvents();
+    }
+
+    /**
+     * Processes the events in order. When an event fails, the catalog's last synced event id
+     * is set to the id just before the failed event and the exception is rethrown
+     * (presumably so that the next polling round starts again from the failed event).
+     */
+    private void doExecute(List<MetastoreEvent> events, HMSExternalCatalog hmsExternalCatalog) {
+        for (MetastoreEvent event : events) {
+            try {
+                event.process();
+            } catch (Exception e) {
+                hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Process the given list of notification events and, on success, record the id of the
+     * last event as the catalog's last synced event id. An empty list is a no-op.
+     */
+    private void processEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
+        if (events.isEmpty()) {
+            // nothing to process and no "last event" whose id could be recorded
+            return;
+        }
+        //transfer
+        List<MetastoreEvent> metastoreEvents = metastoreEventFactory.getMetastoreEvents(events, hmsExternalCatalog);
+        doExecute(metastoreEvents, hmsExternalCatalog);
+        hmsExternalCatalog.setLastSyncedEventId(events.get(events.size() - 1).getEventId());
+    }
+
+    /**
+     * Runs one polling round unless the previous one is still marked as running.
+     */
+    @Override
+    protected void runAfterCatalogReady() {
+        if (isRunning) {
+            LOG.warn("Last task not finished,ignore current task.");
+            return;
+        }
+        isRunning = true;
+        try {
+            realRun();
+        } catch (Exception ex) {
+            LOG.warn("Task failed", ex);
+        } finally {
+            // reset even when a non-Exception Throwable escapes, otherwise every
+            // later round would be skipped by the isRunning check above
+            isRunning = false;
+        }
+    }
+
+    /**
+     * Polls every {@link HMSExternalCatalog} known to the catalog manager and processes its
+     * pending events. A failure on one catalog is logged and does not stop the others.
+     */
+    private void realRun() {
+        List<Long> catalogIds = Env.getCurrentEnv().getCatalogMgr().getCatalogIds();
+        for (Long catalogId : catalogIds) {
+            CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+            if (catalog instanceof HMSExternalCatalog) {
+                HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog;
+                try {
+                    List<NotificationEvent> events = getNextHMSEvents(hmsExternalCatalog);
+                    if (!events.isEmpty()) {
+                        LOG.info("Events size are {} on catalog [{}]", events.size(),
+                                hmsExternalCatalog.getName());
+                        processEvents(events, hmsExternalCatalog);
+                    }
+                } catch (MetastoreNotificationFetchException e) {
+                    LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e);
+                } catch (Exception ex) {
+                    LOG.warn("Failed to process hive metastore [{}] events .",
+                            hmsExternalCatalog.getName(), ex);
+                }
+            }
+        }
+    }
+
+    public static MessageDeserializer getMessageDeserializer() {
+        return MESSAGE_DESERIALIZER;
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java
new file mode 100644
index 0000000000..2bd5c4c40c
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+/**
+ * Unchecked exception thrown for errors during metastore event processing.
+ */
+public class MetastoreNotificationException extends RuntimeException {
+
+    /**
+     * Creates an exception with a message only.
+     */
+    public MetastoreNotificationException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates an exception that wraps the given cause without an own message.
+     */
+    public MetastoreNotificationException(Exception e) {
+        super(e);
+    }
+
+    /**
+     * Creates an exception with a message and the underlying cause.
+     */
+    public MetastoreNotificationException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java
new file mode 100644
index 0000000000..487165eeca
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+/**
+ * Exception thrown when notification events cannot be fetched from the metastore.
+ */
+public class MetastoreNotificationFetchException extends MetastoreNotificationException {
+
+    /**
+     * Creates an exception with a message only.
+     */
+    public MetastoreNotificationFetchException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates an exception that wraps the given cause without an own message.
+     */
+    public MetastoreNotificationFetchException(Exception e) {
+        super(e);
+    }
+
+    /**
+     * Creates an exception with a message and the underlying cause.
+     */
+    public MetastoreNotificationFetchException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
new file mode 100644
index 0000000000..70f56bdbb0
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.util.List;
+
+/**
+ * Common base class of all table-level metastore events.
+ */
+public abstract class MetastoreTableEvent extends MetastoreEvent {
+
+    /**
+     * Parameters that are set by Hive for tables/partitions and that can be ignored
+     * when deciding whether an alter table/partition event is a trivial one.
+     */
+    private static final List<String> PARAMETERS_TO_IGNORE = ImmutableList.of(
+            "transient_lastDdlTime",
+            "numFilesErasureCoded",
+            "numFiles",
+            "comment");
+
+    /**
+     * Delegates to the {@link MetastoreEvent} constructor and then requires that both the
+     * database name and the table name are non-null.
+     */
+    protected MetastoreTableEvent(NotificationEvent event, String catalogName) {
+        super(event, catalogName);
+        Preconditions.checkNotNull(dbName, "Database name cannot be null");
+        Preconditions.checkNotNull(tblName, "Table name cannot be null");
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 96ae663015..fb8fbb5b99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -706,6 +706,7 @@ public class JournalEntity implements Writable {
break;
}
case OperationType.OP_REFRESH_EXTERNAL_DB:
+ case OperationType.OP_DROP_EXTERNAL_TABLE:
case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
data = ExternalObjectLog.read(in);
isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 217c0c117a..9254583294 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -949,6 +949,11 @@ public class EditLog {
env.getCatalogMgr().replayRefreshExternalTable(log);
break;
}
+ case OperationType.OP_DROP_EXTERNAL_TABLE: {
+ final ExternalObjectLog log = (ExternalObjectLog)
journal.getData();
+ env.getCatalogMgr().replayDropExternalTable(log);
+ break;
+ }
case OperationType.OP_INIT_EXTERNAL_TABLE: {
// Do nothing.
break;
@@ -1624,6 +1629,10 @@ public class EditLog {
logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log);
}
+ /**
+ * Writes an edit log entry of type OP_DROP_EXTERNAL_TABLE that carries the given log.
+ */
+ public void logDropExternalTable(ExternalObjectLog log) {
+ logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log);
+ }
+
public Journal getJournal() {
return this.journal;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 6204fc1836..c5889a8001 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -255,6 +255,8 @@ public class OperationType {
public static final short OP_DROP_MTMV_TASK = 341;
public static final short OP_ALTER_MTMV_TASK = 342;
+ public static final short OP_DROP_EXTERNAL_TABLE = 350;
+
public static final short OP_ALTER_USER = 400;
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]