This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 901f2711a3 [feature](multi-catalog) support hive metastore more events
(#15702)
901f2711a3 is described below
commit 901f2711a3fdd1037653a1c8c25a346580b645ed
Author: zhangdong <[email protected]>
AuthorDate: Mon Jan 16 14:16:12 2023 +0800
[feature](multi-catalog) support hive metastore more events (#15702)
support hive metastore more events
---
.../org/apache/doris/catalog/RefreshManager.java | 23 +-
.../doris/catalog/external/ExternalDatabase.java | 4 +
.../catalog/external/HMSExternalDatabase.java | 11 +
.../main/java/org/apache/doris/common/Config.java | 2 +-
.../org/apache/doris/datasource/CatalogMgr.java | 347 ++++++++++++++++++++-
.../apache/doris/datasource/ExternalCatalog.java | 20 ++
.../doris/datasource/ExternalMetaCacheMgr.java | 44 +++
.../apache/doris/datasource/ExternalObjectLog.java | 10 +
.../doris/datasource/HMSExternalCatalog.java | 30 ++
.../doris/datasource/hive/HiveMetaStoreCache.java | 193 +++++++++++-
.../datasource/hive/event/AddPartitionEvent.java | 88 ++++++
.../{IgnoredEvent.java => AlterDatabaseEvent.java} | 19 +-
.../datasource/hive/event/AlterPartitionEvent.java | 95 ++++++
.../datasource/hive/event/AlterTableEvent.java | 111 +++++++
...{IgnoredEvent.java => CreateDatabaseEvent.java} | 28 +-
.../datasource/hive/event/CreateTableEvent.java | 76 +++++
.../{IgnoredEvent.java => DropDatabaseEvent.java} | 28 +-
.../datasource/hive/event/DropPartitionEvent.java | 88 ++++++
.../datasource/hive/event/DropTableEvent.java | 39 +--
.../doris/datasource/hive/event/IgnoredEvent.java | 6 +-
.../datasource/hive/event/MetastoreEvent.java | 36 ++-
.../hive/event/MetastoreEventFactory.java | 33 +-
.../org/apache/doris/journal/JournalEntity.java | 6 +
.../java/org/apache/doris/persist/EditLog.java | 54 ++++
.../org/apache/doris/persist/OperationType.java | 6 +
.../doris/planner/ListPartitionPrunerV2.java | 77 +++--
.../apache/doris/datasource/CatalogMgrTest.java | 132 +++++++-
27 files changed, 1464 insertions(+), 142 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index b8d6929994..ef95b3e5f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -56,7 +56,7 @@ public class RefreshManager {
refreshInternalCtlIcebergTable(stmt, env);
} else {
// Process external catalog table refresh
- refreshExternalCtlTable(dbName, tableName, catalog);
+ env.getCatalogMgr().refreshExternalTable(dbName, tableName,
catalogName);
}
LOG.info("Successfully refresh table: {} from db: {}", tableName,
dbName);
}
@@ -146,25 +146,4 @@ public class RefreshManager {
stmt.getTableName(), "ICEBERG", icebergProperties, "");
env.createTable(createTableStmt);
}
-
- private void refreshExternalCtlTable(String dbName, String tableName,
CatalogIf catalog) throws DdlException {
- if (!(catalog instanceof ExternalCatalog)) {
- throw new DdlException("Only support refresh ExternalCatalog
Tables");
- }
- DatabaseIf db = catalog.getDbNullable(dbName);
- if (db == null) {
- throw new DdlException("Database " + dbName + " does not exist in
catalog " + catalog.getName());
- }
-
- TableIf table = db.getTableNullable(tableName);
- if (table == null) {
- throw new DdlException("Table " + tableName + " does not exist in
db " + dbName);
- }
-
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(),
dbName, tableName);
- ExternalObjectLog log = new ExternalObjectLog();
- log.setCatalogId(catalog.getId());
- log.setDbId(db.getId());
- log.setTableId(table.getId());
- Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index 65c027713e..e4efcac408 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -263,4 +263,8 @@ public class ExternalDatabase<T extends ExternalTable>
implements DatabaseIf<T>,
public void dropTable(String tableName) {
throw new NotImplementedException();
}
+
+ public void createTable(String tableName, long tableId) {
+ throw new NotImplementedException();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index a1f6bcddab..e379dd3c0d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -174,10 +174,21 @@ public class HMSExternalDatabase extends
ExternalDatabase<HMSExternalTable> impl
@Override
public void dropTable(String tableName) {
LOG.debug("drop table [{}]", tableName);
+ makeSureInitialized();
Long tableId = tableNameToId.remove(tableName);
if (tableId == null) {
LOG.warn("drop table [{}] failed", tableName);
}
idToTbl.remove(tableId);
}
+
+ @Override
+ public void createTable(String tableName, long tableId) {
+ LOG.debug("create table [{}]", tableName);
+ makeSureInitialized();
+ tableNameToId.put(tableName, tableId);
+ HMSExternalTable table = new HMSExternalTable(tableId, tableName, name,
+ (HMSExternalCatalog) extCatalog);
+ idToTbl.put(tableId, table);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index e611614f36..3d3ae9226a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1958,6 +1958,6 @@ public class Config extends ConfigBase {
* HMS polling interval in milliseconds.
*/
@ConfField(masterOnly = true)
- public static int hms_events_polling_interval_ms = 20000;
+ public static int hms_events_polling_interval_ms = 10000;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index 17b71aefaa..8054fca3e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -549,10 +549,47 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
}
}
+ public void refreshExternalTable(String dbName, String tableName, String
catalogName) throws DdlException {
+ CatalogIf catalog = nameToCatalog.get(catalogName);
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " +
catalogName);
+ }
+ if (!(catalog instanceof ExternalCatalog)) {
+ throw new DdlException("Only support refresh ExternalCatalog
Tables");
+ }
+ DatabaseIf db = catalog.getDbNullable(dbName);
+ if (db == null) {
+ throw new DdlException("Database " + dbName + " does not exist in
catalog " + catalog.getName());
+ }
+
+ TableIf table = db.getTableNullable(tableName);
+ if (table == null) {
+ throw new DdlException("Table " + tableName + " does not exist in
db " + dbName);
+ }
+
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(),
dbName, tableName);
+ ExternalObjectLog log = new ExternalObjectLog();
+ log.setCatalogId(catalog.getId());
+ log.setDbId(db.getId());
+ log.setTableId(table.getId());
+ Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
+ }
+
public void replayRefreshExternalTable(ExternalObjectLog log) {
ExternalCatalog catalog = (ExternalCatalog)
idToCatalog.get(log.getCatalogId());
+ if (catalog == null) {
+ LOG.warn("No catalog found with id:[{}], it may have been
dropped.", log.getCatalogId());
+ return;
+ }
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+ if (db == null) {
+ LOG.warn("No db found with id:[{}], it may have been dropped.",
log.getDbId());
+ return;
+ }
ExternalTable table = db.getTableForReplay(log.getTableId());
+ if (table == null) {
+ LOG.warn("No table found with id:[{}], it may have been dropped.",
log.getTableId());
+ return;
+ }
Env.getCurrentEnv().getExtMetaCacheMgr()
.invalidateTableCache(catalog.getId(), db.getFullName(),
table.getName());
}
@@ -586,13 +623,321 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]",
log.getCatalogId(), log.getDbId(),
log.getTableId());
ExternalCatalog catalog = (ExternalCatalog)
idToCatalog.get(log.getCatalogId());
+ if (catalog == null) {
+ LOG.warn("No catalog found with id:[{}], it may have been
dropped.", log.getCatalogId());
+ return;
+ }
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+ if (db == null) {
+ LOG.warn("No db found with id:[{}], it may have been dropped.",
log.getDbId());
+ return;
+ }
ExternalTable table = db.getTableForReplay(log.getTableId());
- db.dropTable(table.getName());
+ if (table == null) {
+ LOG.warn("No table found with id:[{}], it may have been dropped.",
log.getTableId());
+ return;
+ }
+ db.writeLock();
+ try {
+ db.dropTable(table.getName());
+ } finally {
+ db.writeUnlock();
+ }
+
Env.getCurrentEnv().getExtMetaCacheMgr()
.invalidateTableCache(catalog.getId(), db.getFullName(),
table.getName());
}
+ public boolean externalTableExistInLocal(String dbName, String tableName,
String catalogName) throws DdlException {
+ CatalogIf catalog = nameToCatalog.get(catalogName);
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " +
catalogName);
+ }
+ if (!(catalog instanceof ExternalCatalog)) {
+ throw new DdlException("Only support ExternalCatalog Tables");
+ }
+ return ((ExternalCatalog) catalog).tableExistInLocal(dbName,
tableName);
+ }
+
+ public void createExternalTable(String dbName, String tableName, String
catalogName) throws DdlException {
+ CatalogIf catalog = nameToCatalog.get(catalogName);
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " +
catalogName);
+ }
+ if (!(catalog instanceof ExternalCatalog)) {
+ throw new DdlException("Only support create ExternalCatalog
Tables");
+ }
+ DatabaseIf db = catalog.getDbNullable(dbName);
+ if (db == null) {
+ throw new DdlException("Database " + dbName + " does not exist in
catalog " + catalog.getName());
+ }
+
+ TableIf table = db.getTableNullable(tableName);
+ if (table != null) {
+ throw new DdlException("Table " + tableName + " has exist in db "
+ dbName);
+ }
+ ExternalObjectLog log = new ExternalObjectLog();
+ log.setCatalogId(catalog.getId());
+ log.setDbId(db.getId());
+ log.setTableName(tableName);
+ log.setTableId(Env.getCurrentEnv().getNextId());
+ replayCreateExternalTable(log);
+ Env.getCurrentEnv().getEditLog().logCreateExternalTable(log);
+ }
+
+ public void replayCreateExternalTable(ExternalObjectLog log) {
+
LOG.debug("ReplayCreateExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}],tableName:[{}]",
log.getCatalogId(),
+ log.getDbId(), log.getTableId(), log.getTableName());
+ ExternalCatalog catalog = (ExternalCatalog)
idToCatalog.get(log.getCatalogId());
+ if (catalog == null) {
+ LOG.warn("No catalog found with id:[{}], it may have been
dropped.", log.getCatalogId());
+ return;
+ }
+ ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+ if (db == null) {
+ LOG.warn("No db found with id:[{}], it may have been dropped.",
log.getDbId());
+ return;
+ }
+ db.writeLock();
+ try {
+ db.createTable(log.getTableName(), log.getTableId());
+ } finally {
+ db.writeUnlock();
+ }
+ }
+
+ public void dropExternalDatabase(String dbName, String catalogName) throws
DdlException {
+ CatalogIf catalog = nameToCatalog.get(catalogName);
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " +
catalogName);
+ }
+ if (!(catalog instanceof ExternalCatalog)) {
+ throw new DdlException("Only support drop ExternalCatalog
databases");
+ }
+ DatabaseIf db = catalog.getDbNullable(dbName);
+ if (db == null) {
+ throw new DdlException("Database " + dbName + " does not exist in
catalog " + catalog.getName());
+ }
+
+ ExternalObjectLog log = new ExternalObjectLog();
+ log.setCatalogId(catalog.getId());
+ log.setDbId(db.getId());
+ log.setInvalidCache(true);
+ replayDropExternalDatabase(log);
+ Env.getCurrentEnv().getEditLog().logDropExternalDatabase(log);
+ }
+
+ public void replayDropExternalDatabase(ExternalObjectLog log) {
+ writeLock();
+ try {
+
LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]",
log.getCatalogId(),
+ log.getDbId(), log.getTableId());
+ ExternalCatalog catalog = (ExternalCatalog)
idToCatalog.get(log.getCatalogId());
+ if (catalog == null) {
+ LOG.warn("No catalog found with id:[{}], it may have been
dropped.", log.getCatalogId());
+ return;
+ }
+ ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+ if (db == null) {
+ LOG.warn("No db found with id:[{}], it may have been
dropped.", log.getDbId());
+ return;
+ }
+ catalog.dropDatabase(db.getFullName());
+
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(),
db.getFullName());
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public void createExternalDatabase(String dbName, String catalogName)
throws DdlException {
+ CatalogIf catalog = nameToCatalog.get(catalogName);
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " +
catalogName);
+ }
+ if (!(catalog instanceof ExternalCatalog)) {
+ throw new DdlException("Only support create ExternalCatalog
databases");
+ }
+ DatabaseIf db = catalog.getDbNullable(dbName);
+ if (db != null) {
+ throw new DdlException("Database " + dbName + " has exist in
catalog " + catalog.getName());
+ }
+
+ ExternalObjectLog log = new ExternalObjectLog();
+ log.setCatalogId(catalog.getId());
+ log.setDbId(Env.getCurrentEnv().getNextId());
+ log.setDbName(dbName);
+ replayCreateExternalDatabase(log);
+ Env.getCurrentEnv().getEditLog().logCreateExternalDatabase(log);
+ }
+
+ public void replayCreateExternalDatabase(ExternalObjectLog log) {
+ writeLock();
+ try {
+
LOG.debug("ReplayCreateExternalDatabase,catalogId:[{}],dbId:[{}],dbName:[{}]",
log.getCatalogId(),
+ log.getDbId(), log.getDbName());
+ ExternalCatalog catalog = (ExternalCatalog)
idToCatalog.get(log.getCatalogId());
+ if (catalog == null) {
+ LOG.warn("No catalog found with id:[{}], it may have been
dropped.", log.getCatalogId());
+ return;
+ }
+ catalog.createDatabase(log.getDbId(), log.getDbName());
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public void addExternalPartitions(String catalogName, String dbName,
String tableName, List<String> partitionNames)
+ throws DdlException {
+ CatalogIf catalog = nameToCatalog.get(catalogName);
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " +
catalogName);
+ }
+ if (!(catalog instanceof ExternalCatalog)) {
+ throw new DdlException("Only support ExternalCatalog");
+ }
+ DatabaseIf db = catalog.getDbNullable(dbName);
+ if (db == null) {
+ throw new DdlException("Database " + dbName + " does not exist in
catalog " + catalog.getName());
+ }
+
+ TableIf table = db.getTableNullable(tableName);
+ if (table == null) {
+ throw new DdlException("Table " + tableName + " does not exist in
db " + dbName);
+ }
+
+ ExternalObjectLog log = new ExternalObjectLog();
+ log.setCatalogId(catalog.getId());
+ log.setDbId(db.getId());
+ log.setTableId(table.getId());
+ log.setPartitionNames(partitionNames);
+ replayAddExternalPartitions(log);
+ Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log);
+ }
+
+ public void replayAddExternalPartitions(ExternalObjectLog log) {
+
LOG.debug("ReplayAddExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]",
log.getCatalogId(),
+ log.getDbId(), log.getTableId());
+ ExternalCatalog catalog = (ExternalCatalog)
idToCatalog.get(log.getCatalogId());
+ if (catalog == null) {
+ LOG.warn("No catalog found with id:[{}], it may have been
dropped.", log.getCatalogId());
+ return;
+ }
+ ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+ if (db == null) {
+ LOG.warn("No db found with id:[{}], it may have been dropped.",
log.getDbId());
+ return;
+ }
+ ExternalTable table = db.getTableForReplay(log.getTableId());
+ if (table == null) {
+ LOG.warn("No table found with id:[{}], it may have been dropped.",
log.getTableId());
+ return;
+ }
+ Env.getCurrentEnv().getExtMetaCacheMgr()
+ .addPartitionsCache(catalog.getId(), table,
log.getPartitionNames());
+ }
+
+ public void dropExternalPartitions(String catalogName, String dbName,
String tableName, List<String> partitionNames)
+ throws DdlException {
+ CatalogIf catalog = nameToCatalog.get(catalogName);
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " +
catalogName);
+ }
+ if (!(catalog instanceof ExternalCatalog)) {
+ throw new DdlException("Only support ExternalCatalog");
+ }
+ DatabaseIf db = catalog.getDbNullable(dbName);
+ if (db == null) {
+ throw new DdlException("Database " + dbName + " does not exist in
catalog " + catalog.getName());
+ }
+
+ TableIf table = db.getTableNullable(tableName);
+ if (table == null) {
+ throw new DdlException("Table " + tableName + " does not exist in
db " + dbName);
+ }
+
+ ExternalObjectLog log = new ExternalObjectLog();
+ log.setCatalogId(catalog.getId());
+ log.setDbId(db.getId());
+ log.setTableId(table.getId());
+ log.setPartitionNames(partitionNames);
+ replayDropExternalPartitions(log);
+ Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log);
+ }
+
+ public void replayDropExternalPartitions(ExternalObjectLog log) {
+
LOG.debug("ReplayDropExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]",
log.getCatalogId(),
+ log.getDbId(), log.getTableId());
+ ExternalCatalog catalog = (ExternalCatalog)
idToCatalog.get(log.getCatalogId());
+ if (catalog == null) {
+ LOG.warn("No catalog found with id:[{}], it may have been
dropped.", log.getCatalogId());
+ return;
+ }
+ ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+ if (db == null) {
+ LOG.warn("No db found with id:[{}], it may have been dropped.",
log.getDbId());
+ return;
+ }
+ ExternalTable table = db.getTableForReplay(log.getTableId());
+ if (table == null) {
+ LOG.warn("No table found with id:[{}], it may have been dropped.",
log.getTableId());
+ return;
+ }
+ Env.getCurrentEnv().getExtMetaCacheMgr()
+ .dropPartitionsCache(catalog.getId(), table,
log.getPartitionNames());
+ }
+
+ public void refreshExternalPartitions(String catalogName, String dbName,
String tableName,
+ List<String> partitionNames)
+ throws DdlException {
+ CatalogIf catalog = nameToCatalog.get(catalogName);
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " +
catalogName);
+ }
+ if (!(catalog instanceof ExternalCatalog)) {
+ throw new DdlException("Only support ExternalCatalog");
+ }
+ DatabaseIf db = catalog.getDbNullable(dbName);
+ if (db == null) {
+ throw new DdlException("Database " + dbName + " does not exist in
catalog " + catalog.getName());
+ }
+
+ TableIf table = db.getTableNullable(tableName);
+ if (table == null) {
+ throw new DdlException("Table " + tableName + " does not exist in
db " + dbName);
+ }
+
+ ExternalObjectLog log = new ExternalObjectLog();
+ log.setCatalogId(catalog.getId());
+ log.setDbId(db.getId());
+ log.setTableId(table.getId());
+ log.setPartitionNames(partitionNames);
+ replayRefreshExternalPartitions(log);
+ Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log);
+ }
+
+ public void replayRefreshExternalPartitions(ExternalObjectLog log) {
+
LOG.debug("replayRefreshExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]",
log.getCatalogId(),
+ log.getDbId(), log.getTableId());
+ ExternalCatalog catalog = (ExternalCatalog)
idToCatalog.get(log.getCatalogId());
+ if (catalog == null) {
+ LOG.warn("No catalog found with id:[{}], it may have been
dropped.", log.getCatalogId());
+ return;
+ }
+ ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+ if (db == null) {
+ LOG.warn("No db found with id:[{}], it may have been dropped.",
log.getDbId());
+ return;
+ }
+ ExternalTable table = db.getTableForReplay(log.getTableId());
+ if (table == null) {
+ LOG.warn("No table found with id:[{}], it may have been dropped.",
log.getTableId());
+ return;
+ }
+ Env.getCurrentEnv().getExtMetaCacheMgr()
+ .invalidatePartitionsCache(catalog.getId(), db.getFullName(),
table.getName(),
+ log.getPartitionNames());
+ }
+
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 22726c3b96..3a4a711e1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
@@ -98,6 +99,17 @@ public abstract class ExternalCatalog implements
CatalogIf<ExternalDatabase>, Wr
*/
public abstract boolean tableExist(SessionContext ctx, String dbName,
String tblName);
+ /**
+ * check if the specified table exist in doris.
+ *
+ * @param dbName
+ * @param tblName
+ * @return true if table exists, false otherwise
+ */
+ public boolean tableExistInLocal(String dbName, String tblName) {
+ throw new NotImplementedException();
+ }
+
/**
* Catalog can't be init when creating because the external catalog may
depend on third system.
* So you have to make sure the client of third system is initialized
before any method was called.
@@ -310,4 +322,12 @@ public abstract class ExternalCatalog implements
CatalogIf<ExternalDatabase>, Wr
idToDb.put(db.getId(), db);
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()),
db.getId());
}
+
+ public void dropDatabase(String dbName) {
+ throw new NotImplementedException();
+ }
+
+ public void createDatabase(long dbId, String dbName) {
+ throw new NotImplementedException();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 934c3f8bbe..13875d084e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -17,6 +17,8 @@
package org.apache.doris.datasource;
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -25,6 +27,7 @@ import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -118,4 +121,45 @@ public class ExternalMetaCacheMgr {
}
LOG.debug("invalid catalog cache for {}", catalogId);
}
+
+ public void addPartitionsCache(long catalogId, ExternalTable table,
List<String> partitionNames) {
+ if (!(table instanceof HMSExternalTable)) {
+ LOG.warn("only support HMSTable");
+ return;
+ }
+ String dbName =
ClusterNamespace.getNameFromFullName(table.getDbName());
+ HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+ if (metaCache != null) {
+ metaCache.addPartitionsCache(dbName, table.getName(),
partitionNames,
+ ((HMSExternalTable) table).getPartitionColumnTypes());
+ }
+ LOG.debug("add partition cache for {}.{} in catalog {}", dbName,
table.getName(), catalogId);
+ }
+
+ public void dropPartitionsCache(long catalogId, ExternalTable table,
List<String> partitionNames) {
+ if (!(table instanceof HMSExternalTable)) {
+ LOG.warn("only support HMSTable");
+ return;
+ }
+ String dbName =
ClusterNamespace.getNameFromFullName(table.getDbName());
+ HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+ if (metaCache != null) {
+ metaCache.dropPartitionsCache(dbName, table.getName(),
partitionNames,
+ ((HMSExternalTable) table).getPartitionColumnTypes(),
true);
+ }
+ LOG.debug("drop partition cache for {}.{} in catalog {}", dbName,
table.getName(), catalogId);
+ }
+
+ public void invalidatePartitionsCache(long catalogId, String dbName,
String tableName,
+ List<String> partitionNames) {
+ HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+ if (metaCache != null) {
+ dbName = ClusterNamespace.getNameFromFullName(dbName);
+ for (String partitionName : partitionNames) {
+ metaCache.invalidatePartitionCache(dbName, tableName,
partitionName);
+ }
+
+ }
+ LOG.debug("invalidate partition cache for {}.{} in catalog {}",
dbName, tableName, catalogId);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index 030f981382..04d5b06036 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -29,6 +29,7 @@ import lombok.NoArgsConstructor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.List;
@NoArgsConstructor
@Getter
@@ -40,12 +41,21 @@ public class ExternalObjectLog implements Writable {
@SerializedName(value = "dbId")
private long dbId;
+ @SerializedName(value = "dbName")
+ private String dbName;
+
@SerializedName(value = "tableId")
private long tableId;
+ @SerializedName(value = "tableName")
+ private String tableName;
+
@SerializedName(value = "invalidCache")
private boolean invalidCache;
+ @SerializedName(value = "partitionNames")
+ private List<String> partitionNames;
+
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 5de9aec2eb..70ba9f9aa9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -154,6 +154,16 @@ public class HMSExternalCatalog extends ExternalCatalog {
return client.tableExists(getRealTableName(dbName), tblName);
}
+ @Override
+ public boolean tableExistInLocal(String dbName, String tblName) {
+ makeSureInitialized();
+ HMSExternalDatabase hmsExternalDatabase = (HMSExternalDatabase)
idToDb.get(dbNameToId.get(dbName));
+ if (hmsExternalDatabase == null) {
+ return false;
+ }
+ return
hmsExternalDatabase.getTable(getRealTableName(tblName)).isPresent();
+ }
+
public PooledHiveMetaStoreClient getClient() {
makeSureInitialized();
return client;
@@ -215,4 +225,24 @@ public class HMSExternalCatalog extends ExternalCatalog {
}
return currentNotificationEventId.getEventId();
}
+
+ @Override
+ public void dropDatabase(String dbName) {
+ LOG.debug("drop database [{}]", dbName);
+ makeSureInitialized();
+ Long dbId = dbNameToId.remove(dbName);
+ if (dbId == null) {
+ LOG.warn("drop database [{}] failed", dbName);
+ }
+ idToDb.remove(dbId);
+ }
+
+ @Override
+ public void createDatabase(long dbId, String dbName) {
+ makeSureInitialized();
+ LOG.debug("create database [{}]", dbName);
+ dbNameToId.put(dbName, dbId);
+ HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, dbName);
+ idToDb.put(dbId, db);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 6f4d3845e7..c51ef521a4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -45,6 +45,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -57,6 +58,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -163,27 +165,36 @@ public class HiveMetaStoreCache {
LOG.debug("load #{} partitions for {} in catalog {}",
partitionNames.size(), key, catalog.getName());
}
Map<Long, PartitionItem> idToPartitionItem =
Maps.newHashMapWithExpectedSize(partitionNames.size());
+ Map<String, Long> partitionNameToIdMap =
Maps.newHashMapWithExpectedSize(partitionNames.size());
+ Map<Long, List<UniqueId>> idToUniqueIdsMap =
Maps.newHashMapWithExpectedSize(partitionNames.size());
long idx = 0;
for (String partitionName : partitionNames) {
- idToPartitionItem.put(idx++, toListPartitionItem(partitionName,
key.types));
+ long partitionId = idx++;
+ ListPartitionItem listPartitionItem =
toListPartitionItem(partitionName, key.types);
+ idToPartitionItem.put(partitionId, listPartitionItem);
+ partitionNameToIdMap.put(partitionName, partitionId);
}
Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = null;
Map<Range<PartitionKey>, UniqueId> rangeToId = null;
RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;
+ Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = null;
if (key.types.size() > 1) {
// uidToPartitionRange and rangeToId are only used for
multi-column partition
- uidToPartitionRange =
ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem);
+ uidToPartitionRange =
ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem,
idToUniqueIdsMap);
rangeToId =
ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
} else {
Preconditions.checkState(key.types.size() == 1, key.types);
// singleColumnRangeMap is only used for single-column partition
- singleColumnRangeMap =
ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem);
+ singleColumnRangeMap =
ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem,
idToUniqueIdsMap);
+ singleUidToColumnRangeMap =
ListPartitionPrunerV2.genSingleUidToColumnRange(singleColumnRangeMap);
}
- return new HivePartitionValues(idToPartitionItem, uidToPartitionRange,
rangeToId, singleColumnRangeMap);
+ Map<Long, List<String>> partitionValuesMap =
ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
+ return new HivePartitionValues(idToPartitionItem, uidToPartitionRange,
rangeToId, singleColumnRangeMap, idx,
+ partitionNameToIdMap, idToUniqueIdsMap,
singleUidToColumnRangeMap, partitionValuesMap);
}
- private ListPartitionItem toListPartitionItem(String partitionName,
List<Type> types) {
+ public ListPartitionItem toListPartitionItem(String partitionName,
List<Type> types) {
// Partition name will be in format: nation=cn/city=beijing
// parse it to get values "cn" and "beijing"
String[] parts = partitionName.split("/");
@@ -274,6 +285,10 @@ public class HiveMetaStoreCache {
public HivePartitionValues getPartitionValues(String dbName, String
tblName, List<Type> types) {
PartitionValueCacheKey key = new PartitionValueCacheKey(dbName,
tblName, types);
+ return getPartitionValues(key);
+ }
+
+ public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) {
try {
return partitionValuesCache.get(key);
} catch (ExecutionException e) {
@@ -350,6 +365,21 @@ public class HiveMetaStoreCache {
}
}
+ public void invalidatePartitionCache(String dbName, String tblName, String
partitionName) {
+ PartitionValueCacheKey key = new PartitionValueCacheKey(dbName,
tblName, null);
+ HivePartitionValues partitionValues =
partitionValuesCache.getIfPresent(key);
+ if (partitionValues != null) {
+ Long partitionId =
partitionValues.partitionNameToIdMap.get(partitionName);
+ List<String> values =
partitionValues.partitionValuesMap.get(partitionId);
+ PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName,
values);
+ HivePartition partition = partitionCache.getIfPresent(partKey);
+ if (partition != null) {
+ fileCache.invalidate(new FileCacheKey(partition.getPath(),
null));
+ partitionCache.invalidate(partKey);
+ }
+ }
+ }
+
public void invalidateDbCache(String dbName) {
long start = System.currentTimeMillis();
Set<PartitionValueCacheKey> keys =
partitionValuesCache.asMap().keySet();
@@ -369,6 +399,113 @@ public class HiveMetaStoreCache {
LOG.debug("invalid all meta cache in catalog {}", catalog.getName());
}
+ // partition name format: nation=cn/city=beijing
+ public void addPartitionsCache(String dbName, String tblName, List<String>
partitionNames,
+ List<Type> partitionColumnTypes) {
+ PartitionValueCacheKey key = new PartitionValueCacheKey(dbName,
tblName, partitionColumnTypes);
+ HivePartitionValues partitionValues =
partitionValuesCache.getIfPresent(key);
+ if (partitionValues == null) {
+ return;
+ }
+ HivePartitionValues copy = partitionValues.copy();
+ Map<Long, PartitionItem> idToPartitionItemBefore =
copy.getIdToPartitionItem();
+ Map<String, Long> partitionNameToIdMapBefore =
copy.getPartitionNameToIdMap();
+ Map<Long, List<UniqueId>> idToUniqueIdsMap =
copy.getIdToUniqueIdsMap();
+ Map<Long, PartitionItem> idToPartitionItem = new HashMap<>();
+ long idx = copy.getNextPartitionId();
+ for (String partitionName : partitionNames) {
+ if (partitionNameToIdMapBefore.containsKey(partitionName)) {
+ LOG.info("addPartitionsCache partitionName:[{}] has exist in
table:[{}]", partitionName, tblName);
+ continue;
+ }
+ long partitionId = idx++;
+ ListPartitionItem listPartitionItem =
toListPartitionItem(partitionName, key.types);
+ idToPartitionItemBefore.put(partitionId, listPartitionItem);
+ idToPartitionItem.put(partitionId, listPartitionItem);
+ partitionNameToIdMapBefore.put(partitionName, partitionId);
+ }
+ Map<Long, List<String>> partitionValuesMapBefore =
copy.getPartitionValuesMap();
+ Map<Long, List<String>> partitionValuesMap =
ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
+ partitionValuesMapBefore.putAll(partitionValuesMap);
+ copy.setNextPartitionId(idx);
+ if (key.types.size() > 1) {
+ Map<UniqueId, Range<PartitionKey>> uidToPartitionRangeBefore =
copy.getUidToPartitionRange();
+ // uidToPartitionRange and rangeToId are only used for
multi-column partition
+ Map<UniqueId, Range<PartitionKey>> uidToPartitionRange =
ListPartitionPrunerV2
+ .genUidToPartitionRange(idToPartitionItem,
idToUniqueIdsMap);
+ uidToPartitionRangeBefore.putAll(uidToPartitionRange);
+ Map<Range<PartitionKey>, UniqueId> rangeToIdBefore =
copy.getRangeToId();
+ Map<Range<PartitionKey>, UniqueId> rangeToId =
ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
+ rangeToIdBefore.putAll(rangeToId);
+ } else {
+ Preconditions.checkState(key.types.size() == 1, key.types);
+ // singleColumnRangeMap is only used for single-column partition
+ RangeMap<ColumnBound, UniqueId> singleColumnRangeMapBefore =
copy.getSingleColumnRangeMap();
+ RangeMap<ColumnBound, UniqueId> singleColumnRangeMap =
ListPartitionPrunerV2
+ .genSingleColumnRangeMap(idToPartitionItem,
idToUniqueIdsMap);
+ singleColumnRangeMapBefore.putAll(singleColumnRangeMap);
+ Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMapBefore
= copy
+ .getSingleUidToColumnRangeMap();
+ Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap =
ListPartitionPrunerV2
+ .genSingleUidToColumnRange(singleColumnRangeMap);
+ singleUidToColumnRangeMapBefore.putAll(singleUidToColumnRangeMap);
+ }
+ HivePartitionValues partitionValuesCur =
partitionValuesCache.getIfPresent(key);
+ if (partitionValuesCur == partitionValues) {
+ partitionValuesCache.put(key, copy);
+ }
+ }
+
+ public void dropPartitionsCache(String dbName, String tblName,
List<String> partitionNames,
+ List<Type> partitionColumnTypes, boolean invalidPartitionCache) {
+ PartitionValueCacheKey key = new PartitionValueCacheKey(dbName,
tblName, partitionColumnTypes);
+ HivePartitionValues partitionValues =
partitionValuesCache.getIfPresent(key);
+ if (partitionValues == null) {
+ return;
+ }
+ HivePartitionValues copy = partitionValues.copy();
+ Map<String, Long> partitionNameToIdMapBefore =
copy.getPartitionNameToIdMap();
+ Map<Long, PartitionItem> idToPartitionItemBefore =
copy.getIdToPartitionItem();
+ Map<Long, List<UniqueId>> idToUniqueIdsMapBefore =
copy.getIdToUniqueIdsMap();
+ Map<UniqueId, Range<PartitionKey>> uidToPartitionRangeBefore =
copy.getUidToPartitionRange();
+ Map<Range<PartitionKey>, UniqueId> rangeToIdBefore =
copy.getRangeToId();
+ RangeMap<ColumnBound, UniqueId> singleColumnRangeMapBefore =
copy.getSingleColumnRangeMap();
+ Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMapBefore =
copy.getSingleUidToColumnRangeMap();
+ Map<Long, List<String>> partitionValuesMap =
copy.getPartitionValuesMap();
+ for (String partitionName : partitionNames) {
+ if (!partitionNameToIdMapBefore.containsKey(partitionName)) {
+ LOG.info("dropPartitionsCache partitionName:[{}] not exist in
table:[{}]", partitionName, tblName);
+ continue;
+ }
+ Long partitionId =
partitionNameToIdMapBefore.remove(partitionName);
+ idToPartitionItemBefore.remove(partitionId);
+ partitionValuesMap.remove(partitionId);
+ List<UniqueId> uniqueIds =
idToUniqueIdsMapBefore.remove(partitionId);
+ if (key.types.size() > 1) {
+ for (UniqueId uniqueId : uniqueIds) {
+ Range<PartitionKey> range =
uidToPartitionRangeBefore.remove(uniqueId);
+ rangeToIdBefore.remove(range);
+ }
+ } else {
+ for (UniqueId uniqueId : uniqueIds) {
+ Range<ColumnBound> range =
singleUidToColumnRangeMapBefore.remove(uniqueId);
+ singleColumnRangeMapBefore.remove(range);
+ }
+ }
+ if (invalidPartitionCache) {
+ invalidatePartitionCache(dbName, tblName, partitionName);
+ }
+ }
+ HivePartitionValues partitionValuesCur =
partitionValuesCache.getIfPresent(key);
+ if (partitionValuesCur == partitionValues) {
+ partitionValuesCache.put(key, copy);
+ }
+ }
+
+ public void putPartitionValuesCacheForTest(PartitionValueCacheKey key,
HivePartitionValues values) {
+ partitionValuesCache.put(key, values);
+ }
+
/**
* The Key of hive partition value cache
*/
@@ -480,24 +617,58 @@ public class HiveMetaStoreCache {
@Data
public static class HivePartitionValues {
+ private long nextPartitionId;
+ private Map<String, Long> partitionNameToIdMap;
+ private Map<Long, List<UniqueId>> idToUniqueIdsMap;
private Map<Long, PartitionItem> idToPartitionItem;
- private Map<Long, List<String>> partitionValuesMap = Maps.newHashMap();
+ private Map<Long, List<String>> partitionValuesMap;
+ //multi pair
private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange;
private Map<Range<PartitionKey>, UniqueId> rangeToId;
+ //single pair
private RangeMap<ColumnBound, UniqueId> singleColumnRangeMap;
+ private Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap;
+
+ public HivePartitionValues() {
+ }
public HivePartitionValues(Map<Long, PartitionItem> idToPartitionItem,
Map<UniqueId, Range<PartitionKey>> uidToPartitionRange,
Map<Range<PartitionKey>, UniqueId> rangeToId,
- RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) {
+ RangeMap<ColumnBound, UniqueId> singleColumnRangeMap,
+ long nextPartitionId,
+ Map<String, Long> partitionNameToIdMap,
+ Map<Long, List<UniqueId>> idToUniqueIdsMap,
+ Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap,
+ Map<Long, List<String>> partitionValuesMap) {
this.idToPartitionItem = idToPartitionItem;
- for (Map.Entry<Long, PartitionItem> entry :
this.idToPartitionItem.entrySet()) {
- partitionValuesMap.put(entry.getKey(),
- ((ListPartitionItem)
entry.getValue()).getItems().get(0).getPartitionValuesAsStringList());
- }
this.uidToPartitionRange = uidToPartitionRange;
this.rangeToId = rangeToId;
this.singleColumnRangeMap = singleColumnRangeMap;
+ this.nextPartitionId = nextPartitionId;
+ this.partitionNameToIdMap = partitionNameToIdMap;
+ this.idToUniqueIdsMap = idToUniqueIdsMap;
+ this.singleUidToColumnRangeMap = singleUidToColumnRangeMap;
+ this.partitionValuesMap = partitionValuesMap;
+ }
+
+ public HivePartitionValues copy() {
+ HivePartitionValues copy = new HivePartitionValues();
+ copy.setNextPartitionId(nextPartitionId);
+ copy.setPartitionNameToIdMap(partitionNameToIdMap == null ? null :
Maps.newHashMap(partitionNameToIdMap));
+ copy.setIdToUniqueIdsMap(idToUniqueIdsMap == null ? null :
Maps.newHashMap(idToUniqueIdsMap));
+ copy.setIdToPartitionItem(idToPartitionItem == null ? null :
Maps.newHashMap(idToPartitionItem));
+ copy.setPartitionValuesMap(partitionValuesMap == null ? null :
Maps.newHashMap(partitionValuesMap));
+ copy.setUidToPartitionRange(uidToPartitionRange == null ? null :
Maps.newHashMap(uidToPartitionRange));
+ copy.setRangeToId(rangeToId == null ? null :
Maps.newHashMap(rangeToId));
+ copy.setSingleUidToColumnRangeMap(
+ singleUidToColumnRangeMap == null ? null :
Maps.newHashMap(singleUidToColumnRangeMap));
+ if (singleColumnRangeMap != null) {
+ RangeMap<ColumnBound, UniqueId> copySingleColumnRangeMap =
TreeRangeMap.create();
+ copySingleColumnRangeMap.putAll(singleColumnRangeMap);
+ copy.setSingleColumnRangeMap(copySingleColumnRangeMap);
+ }
+ return copy;
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
new file mode 100644
index 0000000000..004efa3d15
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * MetastoreEvent for ADD_PARTITION event type
+ */
+public class AddPartitionEvent extends MetastoreTableEvent {
+ private final Table hmsTbl;
+ private final List<String> partitionNames;
+
+ private AddPartitionEvent(NotificationEvent event,
+ String catalogName) {
+ super(event, catalogName);
+
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ADD_PARTITION));
+ Preconditions
+ .checkNotNull(event.getMessage(), debugString("Event message
is null"));
+ try {
+ AddPartitionMessage addPartitionMessage =
+ MetastoreEventsProcessor.getMessageDeserializer()
+ .getAddPartitionMessage(event.getMessage());
+ hmsTbl =
Preconditions.checkNotNull(addPartitionMessage.getTableObj());
+ Iterable<Partition> addedPartitions =
addPartitionMessage.getPartitionObjs();
+ partitionNames = new ArrayList<>();
+ List<String> partitionColNames = hmsTbl.getPartitionKeys().stream()
+ .map(FieldSchema::getName).collect(Collectors.toList());
+ addedPartitions.forEach(partition -> partitionNames.add(
+ FileUtils.makePartName(partitionColNames,
partition.getValues())));
+ } catch (Exception ex) {
+ throw new MetastoreNotificationException(ex);
+ }
+ }
+
+ protected static List<MetastoreEvent> getEvents(NotificationEvent event,
+ String catalogName) {
+ return Lists.newArrayList(new AddPartitionEvent(event, catalogName));
+ }
+
+ @Override
+ protected void process() throws MetastoreNotificationException {
+ try {
+
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNames:[{}]",
catalogName, dbName, tblName,
+ partitionNames.toString());
+ // bail out early if there are not partitions to process
+ if (partitionNames.isEmpty()) {
+ infoLog("Partition list is empty. Ignoring this event.");
+ return;
+ }
+ Env.getCurrentEnv().getCatalogMgr()
+ .addExternalPartitions(catalogName, dbName,
hmsTbl.getTableName(), partitionNames);
+ } catch (DdlException e) {
+ throw new MetastoreNotificationException(
+ debugString("Failed to process event"));
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
similarity index 61%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
index 4d2dc1a178..59445a5dc4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
@@ -18,26 +18,31 @@
package org.apache.doris.datasource.hive.event;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import java.util.List;
/**
- * An event type which is ignored. Useful for unsupported metastore event types
+ * MetastoreEvent for Alter_DATABASE event type
*/
-public class IgnoredEvent extends MetastoreEvent {
- protected IgnoredEvent(NotificationEvent event, String catalogName) {
+public class AlterDatabaseEvent extends MetastoreEvent {
+
+ private AlterDatabaseEvent(NotificationEvent event,
+ String catalogName) {
super(event, catalogName);
+
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_DATABASE));
}
- private static List<MetastoreEvent> getEvents(NotificationEvent event,
+ protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
- return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+ return Lists.newArrayList(new AlterDatabaseEvent(event, catalogName));
}
@Override
- public void process() {
- debugLog("Ignoring unknown event type " +
metastoreNotificationEvent.getEventType());
+ protected void process() throws MetastoreNotificationException {
+ // only can change properties,we do nothing
+ infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
new file mode 100644
index 0000000000..b2ba44f842
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * MetastoreEvent for ALTER_PARTITION event type
+ */
+public class AlterPartitionEvent extends MetastoreTableEvent {
+ private final Table hmsTbl;
+ private final org.apache.hadoop.hive.metastore.api.Partition
partitionAfter;
+ private final org.apache.hadoop.hive.metastore.api.Partition
partitionBefore;
+ private final String partitionNameBefore;
+ private final String partitionNameAfter;
+ // true if this alter event was due to a rename operation
+ private final boolean isRename;
+
+ private AlterPartitionEvent(NotificationEvent event,
+ String catalogName) {
+ super(event, catalogName);
+
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_PARTITION));
+ Preconditions
+ .checkNotNull(event.getMessage(), debugString("Event message
is null"));
+ try {
+ AlterPartitionMessage alterPartitionMessage =
+ MetastoreEventsProcessor.getMessageDeserializer()
+ .getAlterPartitionMessage(event.getMessage());
+ hmsTbl =
Preconditions.checkNotNull(alterPartitionMessage.getTableObj());
+ partitionBefore =
Preconditions.checkNotNull(alterPartitionMessage.getPtnObjBefore());
+ partitionAfter =
Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter());
+ List<String> partitionColNames = hmsTbl.getPartitionKeys().stream()
+ .map(FieldSchema::getName).collect(Collectors.toList());
+ partitionNameBefore = FileUtils.makePartName(partitionColNames,
partitionBefore.getValues());
+ partitionNameAfter = FileUtils.makePartName(partitionColNames,
partitionAfter.getValues());
+ isRename =
!partitionNameBefore.equalsIgnoreCase(partitionNameAfter);
+ } catch (Exception ex) {
+ throw new MetastoreNotificationException(ex);
+ }
+ }
+
+ protected static List<MetastoreEvent> getEvents(NotificationEvent event,
+ String catalogName) {
+ return Lists.newArrayList(new AlterPartitionEvent(event, catalogName));
+ }
+
+ @Override
+ protected void process() throws MetastoreNotificationException {
+ try {
+
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNameBefore:[{}],partitionNameAfter:[{}]",
+ catalogName, dbName, tblName, partitionNameBefore,
partitionNameAfter);
+ if (isRename) {
+ Env.getCurrentEnv().getCatalogMgr()
+ .dropExternalPartitions(catalogName, dbName, tblName,
Lists.newArrayList(partitionNameBefore));
+ Env.getCurrentEnv().getCatalogMgr()
+ .addExternalPartitions(catalogName, dbName, tblName,
Lists.newArrayList(partitionNameAfter));
+ } else {
+ Env.getCurrentEnv().getCatalogMgr()
+ .refreshExternalPartitions(catalogName, dbName,
hmsTbl.getTableName(),
+ Lists.newArrayList(partitionNameAfter));
+ }
+ } catch (DdlException e) {
+ throw new MetastoreNotificationException(
+ debugString("Failed to process event"));
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
new file mode 100644
index 0000000000..b14a1c6577
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
+
+import java.util.List;
+
+/**
+ * MetastoreEvent for ALTER_TABLE event type
+ */
+public class AlterTableEvent extends MetastoreTableEvent {
+ // the table object before alter operation
+ private final Table tableBefore;
+ // the table object after alter operation
+ private final Table tableAfter;
+
+ // true if this alter event was due to a rename operation
+ private final boolean isRename;
+
+ private AlterTableEvent(NotificationEvent event, String catalogName) {
+ super(event, catalogName);
+
Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(getEventType()));
+ Preconditions
+ .checkNotNull(event.getMessage(), debugString("Event message
is null"));
+ try {
+ JSONAlterTableMessage alterTableMessage =
+ (JSONAlterTableMessage)
MetastoreEventsProcessor.getMessageDeserializer()
+ .getAlterTableMessage(event.getMessage());
+ tableAfter =
Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
+ tableBefore =
Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
+ } catch (Exception e) {
+ throw new MetastoreNotificationException(
+ debugString("Unable to parse the alter table message"), e);
+ }
+ // this is a rename event if either dbName or tblName of before and
after object changed
+ isRename =
!tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName())
+ ||
!tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName());
+
+ }
+
+ public static List<MetastoreEvent> getEvents(NotificationEvent event,
+ String catalogName) {
+ return Lists.newArrayList(new AlterTableEvent(event, catalogName));
+ }
+
+
+ private void processRename() throws DdlException {
+ if (!isRename) {
+ return;
+ }
+ boolean hasExist = Env.getCurrentEnv().getCatalogMgr()
+ .externalTableExistInLocal(tableAfter.getDbName(),
tableAfter.getTableName(), catalogName);
+ if (hasExist) {
+ infoLog("AlterExternalTable canceled,because tableAfter has exist,
"
+ + "catalogName:[{}],dbName:[{}],tableName:[{}]",
+ catalogName, dbName, tableAfter.getTableName());
+ return;
+ }
+ Env.getCurrentEnv().getCatalogMgr()
+ .dropExternalTable(tableBefore.getDbName(),
tableBefore.getTableName(), catalogName);
+ Env.getCurrentEnv().getCatalogMgr()
+ .createExternalTable(tableAfter.getDbName(),
tableAfter.getTableName(), catalogName);
+
+ }
+
+ /**
+ * If the ALTER_TABLE event is due a table rename, this method removes the
old table
+ * and creates a new table with the new name. Else, we just refresh table
+ */
+ @Override
+ protected void process() throws MetastoreNotificationException {
+ try {
+
infoLog("catalogName:[{}],dbName:[{}],tableBefore:[{}],tableAfter:[{}]",
catalogName, dbName,
+ tableBefore.getTableName(), tableAfter.getTableName());
+ if (isRename) {
+ processRename();
+ return;
+ }
+ //The scope of refresh can be narrowed in the future
+ Env.getCurrentEnv().getCatalogMgr()
+ .refreshExternalTable(tableBefore.getDbName(),
tableBefore.getTableName(), catalogName);
+ } catch (Exception e) {
+ throw new MetastoreNotificationException(
+ debugString("Failed to process event"));
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
similarity index 51%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
index 4d2dc1a178..48476bce57 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
@@ -18,26 +18,40 @@
package org.apache.doris.datasource.hive.event;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import java.util.List;
/**
- * An event type which is ignored. Useful for unsupported metastore event types
+ * MetastoreEvent for CREATE_DATABASE event type
*/
-public class IgnoredEvent extends MetastoreEvent {
- protected IgnoredEvent(NotificationEvent event, String catalogName) {
+public class CreateDatabaseEvent extends MetastoreEvent {
+
+ private CreateDatabaseEvent(NotificationEvent event,
+ String catalogName) {
super(event, catalogName);
+
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.CREATE_DATABASE));
}
- private static List<MetastoreEvent> getEvents(NotificationEvent event,
+ protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
- return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+ return Lists.newArrayList(new CreateDatabaseEvent(event, catalogName));
}
@Override
- public void process() {
- debugLog("Ignoring unknown event type " +
metastoreNotificationEvent.getEventType());
+ protected void process() throws MetastoreNotificationException {
+ try {
+ infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
+ Env.getCurrentEnv().getCatalogMgr()
+ .createExternalDatabase(dbName, catalogName);
+ } catch (DdlException e) {
+ throw new MetastoreNotificationException(
+ debugString("Failed to process event"));
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
new file mode 100644
index 0000000000..e4e8e8bb4b
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+
+import java.util.List;
+
+/**
+ * MetastoreEvent for CREATE_TABLE event type
+ */
+public class CreateTableEvent extends MetastoreTableEvent {
+ private final Table hmsTbl;
+
+ private CreateTableEvent(NotificationEvent event, String catalogName)
throws MetastoreNotificationException {
+ super(event, catalogName);
+
Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(getEventType()));
+ Preconditions
+ .checkNotNull(event.getMessage(), debugString("Event message
is null"));
+ try {
+ CreateTableMessage createTableMessage =
+
MetastoreEventsProcessor.getMessageDeserializer().getCreateTableMessage(event.getMessage());
+ hmsTbl =
Preconditions.checkNotNull(createTableMessage.getTableObj());
+ } catch (Exception e) {
+ throw new MetastoreNotificationException(
+ debugString("Unable to deserialize the event message"), e);
+ }
+ }
+
+ public static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
+ return Lists.newArrayList(new CreateTableEvent(event, catalogName));
+ }
+
+ @Override
+ protected void process() throws MetastoreNotificationException {
+ try {
+ infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]",
catalogName, dbName, tblName);
+ boolean hasExist = Env.getCurrentEnv().getCatalogMgr()
+ .externalTableExistInLocal(dbName, hmsTbl.getTableName(),
catalogName);
+ if (hasExist) {
+ infoLog(
+ "CreateExternalTable canceled,because table has exist,"
+ +
"catalogName:[{}],dbName:[{}],tableName:[{}]",
+ catalogName, dbName, tblName);
+ return;
+ }
+ Env.getCurrentEnv().getCatalogMgr().createExternalTable(dbName,
hmsTbl.getTableName(), catalogName);
+ } catch (DdlException e) {
+ throw new MetastoreNotificationException(
+ debugString("Failed to process event"));
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
similarity index 52%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
index 4d2dc1a178..b51145a258 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
@@ -18,26 +18,40 @@
package org.apache.doris.datasource.hive.event;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import java.util.List;
/**
- * An event type which is ignored. Useful for unsupported metastore event types
+ * MetastoreEvent for DROP_DATABASE event type
*/
-public class IgnoredEvent extends MetastoreEvent {
- protected IgnoredEvent(NotificationEvent event, String catalogName) {
+public class DropDatabaseEvent extends MetastoreEvent {
+
+ private DropDatabaseEvent(NotificationEvent event,
+ String catalogName) {
super(event, catalogName);
+
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.DROP_DATABASE));
}
- private static List<MetastoreEvent> getEvents(NotificationEvent event,
+ protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
- return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+ return Lists.newArrayList(new DropDatabaseEvent(event, catalogName));
}
@Override
- public void process() {
- debugLog("Ignoring unknown event type " +
metastoreNotificationEvent.getEventType());
+ protected void process() throws MetastoreNotificationException {
+ try {
+ infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
+ Env.getCurrentEnv().getCatalogMgr()
+ .dropExternalDatabase(dbName, catalogName);
+ } catch (DdlException e) {
+ throw new MetastoreNotificationException(
+ debugString("Failed to process event"));
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
new file mode 100644
index 0000000000..cd5ed5bfbd
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * MetastoreEvent for ADD_PARTITION event type
+ */
+public class DropPartitionEvent extends MetastoreTableEvent {
+ private final Table hmsTbl;
+ private final List<String> partitionNames;
+
+ private DropPartitionEvent(NotificationEvent event,
+ String catalogName) {
+ super(event, catalogName);
+
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.DROP_PARTITION));
+ Preconditions
+ .checkNotNull(event.getMessage(), debugString("Event message
is null"));
+ try {
+ DropPartitionMessage dropPartitionMessage =
+ MetastoreEventsProcessor.getMessageDeserializer()
+ .getDropPartitionMessage(event.getMessage());
+ hmsTbl =
Preconditions.checkNotNull(dropPartitionMessage.getTableObj());
+ List<Map<String, String>> droppedPartitions =
dropPartitionMessage.getPartitions();
+ partitionNames = new ArrayList<>();
+ List<String> partitionColNames = hmsTbl.getPartitionKeys().stream()
+ .map(FieldSchema::getName).collect(Collectors.toList());
+ droppedPartitions.forEach(partition -> partitionNames.add(
+ getPartitionName(partition, partitionColNames)));
+ } catch (Exception ex) {
+ throw new MetastoreNotificationException(ex);
+ }
+ }
+
+ protected static List<MetastoreEvent> getEvents(NotificationEvent event,
+ String catalogName) {
+ return Lists.newArrayList(
+ new DropPartitionEvent(event, catalogName));
+ }
+
+ @Override
+ protected void process() throws MetastoreNotificationException {
+ try {
+
infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNames:[{}]",
catalogName, dbName, tblName,
+ partitionNames.toString());
+ // bail out early if there are not partitions to process
+ if (partitionNames.isEmpty()) {
+ infoLog("Partition list is empty. Ignoring this event.");
+ return;
+ }
+ Env.getCurrentEnv().getCatalogMgr()
+ .dropExternalPartitions(catalogName, dbName,
hmsTbl.getTableName(), partitionNames);
+ } catch (DdlException e) {
+ throw new MetastoreNotificationException(
+ debugString("Failed to process event"));
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
index 8647e47b78..2b84c1bb42 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
@@ -25,8 +25,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import java.util.List;
@@ -34,25 +32,21 @@ import java.util.List;
* MetastoreEvent for DROP_TABLE event type
*/
public class DropTableEvent extends MetastoreTableEvent {
- private static final Logger LOG =
LogManager.getLogger(DropTableEvent.class);
- private final String dbName;
private final String tableName;
private DropTableEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(getEventType()));
- JSONDropTableMessage dropTableMessage =
- (JSONDropTableMessage)
MetastoreEventsProcessor.getMessageDeserializer()
- .getDropTableMessage(event.getMessage());
+ Preconditions
+ .checkNotNull(event.getMessage(), debugString("Event message
is null"));
try {
- dbName = dropTableMessage.getDB();
+ JSONDropTableMessage dropTableMessage =
+ (JSONDropTableMessage)
MetastoreEventsProcessor.getMessageDeserializer()
+ .getDropTableMessage(event.getMessage());
tableName = dropTableMessage.getTable();
} catch (Exception e) {
- throw new MetastoreNotificationException(debugString(
- "Could not parse event message. "
- + "Check if %s is set to true in metastore
configuration",
-
MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
+ throw new MetastoreNotificationException(e);
}
}
@@ -61,29 +55,14 @@ public class DropTableEvent extends MetastoreTableEvent {
return Lists.newArrayList(new DropTableEvent(event, catalogName));
}
- @Override
- protected boolean existInCache() {
- return true;
- }
-
- @Override
- protected boolean canBeSkipped() {
- return false;
- }
-
- protected boolean isSupported() {
- return true;
- }
-
@Override
protected void process() throws MetastoreNotificationException {
try {
- LOG.info("DropTable event
process,catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName,
- tableName);
+ infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]",
catalogName, dbName, tableName);
Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName,
tableName, catalogName);
} catch (DdlException e) {
- LOG.warn("DropExternalTable
failed,dbName:[{}],tableName:[{}],catalogName:[{}].", dbName, tableName,
- catalogName, e);
+ throw new MetastoreNotificationException(
+ debugString("Failed to process event"));
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
index 4d2dc1a178..d504c2917f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
@@ -27,17 +27,17 @@ import java.util.List;
* An event type which is ignored. Useful for unsupported metastore event types
*/
public class IgnoredEvent extends MetastoreEvent {
- protected IgnoredEvent(NotificationEvent event, String catalogName) {
+ private IgnoredEvent(NotificationEvent event, String catalogName) {
super(event, catalogName);
}
- private static List<MetastoreEvent> getEvents(NotificationEvent event,
+ protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new IgnoredEvent(event, catalogName));
}
@Override
public void process() {
- debugLog("Ignoring unknown event type " +
metastoreNotificationEvent.getEventType());
+ infoLog("Ignoring unknown event type " +
metastoreNotificationEvent.getEventType());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
index 5cc4594457..132496a9f4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
@@ -22,6 +22,9 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
+import java.util.Map;
+
/**
* Abstract base class for all MetastoreEvents. A MetastoreEvent is an object
used to
* process a NotificationEvent received from metastore.
@@ -105,11 +108,6 @@ public abstract class MetastoreEvent {
return null;
}
-
- protected boolean existInCache() throws MetastoreNotificationException {
- return false;
- }
-
/**
* Returns the number of events represented by this event. For most events
this is 1.
* In case of batch events this could be more than 1.
@@ -128,14 +126,6 @@ public abstract class MetastoreEvent {
return false;
}
- /**
- * Whether the current version of FE supports processing of some events,
some events are reserved,
- * and may be processed later version.
- */
- protected boolean isSupported() {
- return false;
- }
-
/**
* Process the information available in the NotificationEvent.
*/
@@ -196,6 +186,26 @@ public abstract class MetastoreEvent {
LOG.debug(formatString, formatArgs);
}
+ protected String getPartitionName(Map<String, String> part, List<String>
partitionColNames) {
+ if (part.size() == 0) {
+ return "";
+ }
+ if (partitionColNames.size() != part.size()) {
+ return "";
+ }
+ StringBuilder name = new StringBuilder();
+ int i = 0;
+ for (String colName : partitionColNames) {
+ if (i++ > 0) {
+ name.append("/");
+ }
+ name.append(colName);
+ name.append("=");
+ name.append(part.get(colName));
+ }
+ return name.toString();
+ }
+
@Override
public String toString() {
return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
index 2719158c8e..ce96ce62e1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
@@ -26,9 +26,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
/**
* Factory class to create various MetastoreEvents.
@@ -42,31 +40,36 @@ public class MetastoreEventFactory implements EventFactory {
Preconditions.checkNotNull(event.getEventType());
MetastoreEventType metastoreEventType =
MetastoreEventType.from(event.getEventType());
switch (metastoreEventType) {
+ case CREATE_TABLE:
+ return CreateTableEvent.getEvents(event, catalogName);
case DROP_TABLE:
return DropTableEvent.getEvents(event, catalogName);
+ case ALTER_TABLE:
+ return AlterTableEvent.getEvents(event, catalogName);
+ case CREATE_DATABASE:
+ return CreateDatabaseEvent.getEvents(event, catalogName);
+ case DROP_DATABASE:
+ return DropDatabaseEvent.getEvents(event, catalogName);
+ case ALTER_DATABASE:
+ return AlterDatabaseEvent.getEvents(event, catalogName);
+ case ADD_PARTITION:
+ return AddPartitionEvent.getEvents(event, catalogName);
+ case DROP_PARTITION:
+ return DropPartitionEvent.getEvents(event, catalogName);
+ case ALTER_PARTITION:
+ return AlterPartitionEvent.getEvents(event, catalogName);
default:
// ignore all the unknown events by creating a IgnoredEvent
- return Lists.newArrayList(new IgnoredEvent(event,
catalogName));
+ return IgnoredEvent.getEvents(event, catalogName);
}
}
List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events,
HMSExternalCatalog hmsExternalCatalog) {
List<MetastoreEvent> metastoreEvents = Lists.newArrayList();
-
for (NotificationEvent event : events) {
metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event,
hmsExternalCatalog.getName()));
}
-
- List<MetastoreEvent> tobeProcessEvents = metastoreEvents.stream()
- .filter(MetastoreEvent::isSupported)
- .collect(Collectors.toList());
-
- if (tobeProcessEvents.isEmpty()) {
- LOG.info("The metastore events to process is empty on catalog {}",
hmsExternalCatalog.getName());
- return Collections.emptyList();
- }
-
- return createBatchEvents(tobeProcessEvents);
+ return createBatchEvents(metastoreEvents);
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index fb8fbb5b99..d8aff54d11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -707,6 +707,12 @@ public class JournalEntity implements Writable {
}
case OperationType.OP_REFRESH_EXTERNAL_DB:
case OperationType.OP_DROP_EXTERNAL_TABLE:
+ case OperationType.OP_CREATE_EXTERNAL_TABLE:
+ case OperationType.OP_DROP_EXTERNAL_DB:
+ case OperationType.OP_CREATE_EXTERNAL_DB:
+ case OperationType.OP_ADD_EXTERNAL_PARTITIONS:
+ case OperationType.OP_DROP_EXTERNAL_PARTITIONS:
+ case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS:
case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
data = ExternalObjectLog.read(in);
isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 9254583294..66671c184f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -954,6 +954,36 @@ public class EditLog {
env.getCatalogMgr().replayDropExternalTable(log);
break;
}
+ case OperationType.OP_CREATE_EXTERNAL_TABLE: {
+ final ExternalObjectLog log = (ExternalObjectLog)
journal.getData();
+ env.getCatalogMgr().replayCreateExternalTable(log);
+ break;
+ }
+ case OperationType.OP_DROP_EXTERNAL_DB: {
+ final ExternalObjectLog log = (ExternalObjectLog)
journal.getData();
+ env.getCatalogMgr().replayDropExternalDatabase(log);
+ break;
+ }
+ case OperationType.OP_CREATE_EXTERNAL_DB: {
+ final ExternalObjectLog log = (ExternalObjectLog)
journal.getData();
+ env.getCatalogMgr().replayCreateExternalDatabase(log);
+ break;
+ }
+ case OperationType.OP_ADD_EXTERNAL_PARTITIONS: {
+ final ExternalObjectLog log = (ExternalObjectLog)
journal.getData();
+ env.getCatalogMgr().replayAddExternalPartitions(log);
+ break;
+ }
+ case OperationType.OP_DROP_EXTERNAL_PARTITIONS: {
+ final ExternalObjectLog log = (ExternalObjectLog)
journal.getData();
+ env.getCatalogMgr().replayDropExternalPartitions(log);
+ break;
+ }
+ case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: {
+ final ExternalObjectLog log = (ExternalObjectLog)
journal.getData();
+ env.getCatalogMgr().replayRefreshExternalPartitions(log);
+ break;
+ }
case OperationType.OP_INIT_EXTERNAL_TABLE: {
// Do nothing.
break;
@@ -1633,6 +1663,30 @@ public class EditLog {
logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log);
}
+ public void logCreateExternalTable(ExternalObjectLog log) {
+ logEdit(OperationType.OP_CREATE_EXTERNAL_TABLE, log);
+ }
+
+ public void logDropExternalDatabase(ExternalObjectLog log) {
+ logEdit(OperationType.OP_DROP_EXTERNAL_DB, log);
+ }
+
+ public void logCreateExternalDatabase(ExternalObjectLog log) {
+ logEdit(OperationType.OP_CREATE_EXTERNAL_DB, log);
+ }
+
+ public void logAddExternalPartitions(ExternalObjectLog log) {
+ logEdit(OperationType.OP_ADD_EXTERNAL_PARTITIONS, log);
+ }
+
+ public void logDropExternalPartitions(ExternalObjectLog log) {
+ logEdit(OperationType.OP_DROP_EXTERNAL_PARTITIONS, log);
+ }
+
+ public void logInvalidateExternalPartitions(ExternalObjectLog log) {
+ logEdit(OperationType.OP_REFRESH_EXTERNAL_PARTITIONS, log);
+ }
+
public Journal getJournal() {
return this.journal;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index c5889a8001..ee145f06f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -256,6 +256,12 @@ public class OperationType {
public static final short OP_ALTER_MTMV_TASK = 342;
public static final short OP_DROP_EXTERNAL_TABLE = 350;
+ public static final short OP_DROP_EXTERNAL_DB = 351;
+ public static final short OP_CREATE_EXTERNAL_TABLE = 352;
+ public static final short OP_CREATE_EXTERNAL_DB = 353;
+ public static final short OP_ADD_EXTERNAL_PARTITIONS = 354;
+ public static final short OP_DROP_EXTERNAL_PARTITIONS = 355;
+ public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356;
public static final short OP_ALTER_USER = 400;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
index bbefea50e3..fcae3c4ecd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
@@ -18,6 +18,7 @@
package org.apache.doris.planner;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.common.AnalysisException;
@@ -33,8 +34,10 @@ import com.google.common.collect.TreeRangeMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -59,7 +62,7 @@ public class ListPartitionPrunerV2 extends
PartitionPrunerV2Base {
super(idToPartitionItem, partitionColumns, columnNameToRange);
this.uidToPartitionRange = Maps.newHashMap();
if (partitionColumns.size() > 1) {
- this.uidToPartitionRange =
genUidToPartitionRange(idToPartitionItem);
+ this.uidToPartitionRange =
genUidToPartitionRange(idToPartitionItem, new HashMap<>());
this.rangeToId = genRangeToId(uidToPartitionRange);
}
}
@@ -77,16 +80,21 @@ public class ListPartitionPrunerV2 extends
PartitionPrunerV2Base {
}
public static Map<UniqueId, Range<PartitionKey>> genUidToPartitionRange(
- Map<Long, PartitionItem> idToPartitionItem) {
+ Map<Long, PartitionItem> idToPartitionItem, Map<Long,
List<UniqueId>> idToUniqueIdsMap) {
Map<UniqueId, Range<PartitionKey>> uidToPartitionRange =
Maps.newHashMap();
idToPartitionItem.forEach((id, item) -> {
List<PartitionKey> keys = item.getItems();
List<Range<PartitionKey>> ranges = keys.stream()
.map(key -> Range.closed(key, key))
.collect(Collectors.toList());
+ List<UniqueId> uniqueIds = idToUniqueIdsMap.get(id) == null ? new
ArrayList<>(ranges.size())
+ : idToUniqueIdsMap.get(id);
for (int i = 0; i < ranges.size(); i++) {
- uidToPartitionRange.put(new ListPartitionUniqueId(id, i),
ranges.get(i));
+ ListPartitionUniqueId listPartitionUniqueId = new
ListPartitionUniqueId(id, i);
+ uidToPartitionRange.put(listPartitionUniqueId, ranges.get(i));
+ uniqueIds.add(listPartitionUniqueId);
}
+ idToUniqueIdsMap.put(id, uniqueIds);
});
return uidToPartitionRange;
}
@@ -94,21 +102,35 @@ public class ListPartitionPrunerV2 extends
PartitionPrunerV2Base {
@Override
void genSingleColumnRangeMap() {
if (singleColumnRangeMap == null) {
- singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem);
+ singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem,
new HashMap<>());
}
}
- public static RangeMap<ColumnBound, UniqueId>
genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem) {
+ public static Map<Long, List<String>> getPartitionValuesMap(Map<Long,
PartitionItem> idToPartitionItem) {
+ Map<Long, List<String>> partitionValuesMap = new HashMap<>();
+ for (Map.Entry<Long, PartitionItem> entry :
idToPartitionItem.entrySet()) {
+ partitionValuesMap.put(entry.getKey(),
+ ((ListPartitionItem)
entry.getValue()).getItems().get(0).getPartitionValuesAsStringList());
+ }
+ return partitionValuesMap;
+ }
+
+ public static RangeMap<ColumnBound, UniqueId>
genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem,
+ Map<Long, List<UniqueId>> idToUniqueIdsMap) {
RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create();
idToPartitionItem.forEach((id, item) -> {
List<PartitionKey> keys = item.getItems();
List<Range<PartitionKey>> ranges = keys.stream()
.map(key -> Range.closed(key, key))
.collect(Collectors.toList());
+ List<UniqueId> uniqueIds = idToUniqueIdsMap.get(id) == null ? new
ArrayList<>(ranges.size())
+ : idToUniqueIdsMap.get(id);
for (int i = 0; i < ranges.size(); i++) {
- candidate.put(mapPartitionKeyRange(ranges.get(i), 0),
- new ListPartitionUniqueId(id, i));
+ ListPartitionUniqueId listPartitionUniqueId = new
ListPartitionUniqueId(id, i);
+ candidate.put(mapPartitionKeyRange(ranges.get(i), 0),
listPartitionUniqueId);
+ uniqueIds.add(listPartitionUniqueId);
}
+ idToUniqueIdsMap.put(id, uniqueIds);
});
return candidate;
}
@@ -151,6 +173,13 @@ public class ListPartitionPrunerV2 extends
PartitionPrunerV2Base {
return rangeToId;
}
+ public static Map<UniqueId, Range<ColumnBound>> genSingleUidToColumnRange(
+ RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) {
+ Map<UniqueId, Range<ColumnBound>> uidToColumnRange = Maps.newHashMap();
+ singleColumnRangeMap.asMapOfRanges().forEach((columnBound, uid) ->
uidToColumnRange.put(uid, columnBound));
+ return uidToColumnRange;
+ }
+
private Collection<Long> doPruneMultiple(Map<Column, FinalFilters>
columnToFilters,
Map<Range<PartitionKey>, UniqueId> partitionRangeToUid,
int columnIdx) {
@@ -178,20 +207,20 @@ public class ListPartitionPrunerV2 extends
PartitionPrunerV2Base {
grouped.forEach(candidateRangeMap::put);
return finalFilters.filters.stream()
- .map(filter -> {
- RangeMap<ColumnBound, List<UniqueId>> filtered =
- candidateRangeMap.subRangeMap(filter);
- // Find PartitionKey ranges according to filtered
UniqueIds.
- Map<Range<PartitionKey>, UniqueId>
filteredPartitionRange =
- filtered.asMapOfRanges().values()
- .stream()
- .flatMap(List::stream)
-
.collect(Collectors.toMap(uidToPartitionRange::get, Function.identity()));
- return doPruneMultiple(columnToFilters,
filteredPartitionRange,
- columnIdx + 1);
- })
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
+ .map(filter -> {
+ RangeMap<ColumnBound, List<UniqueId>> filtered =
+ candidateRangeMap.subRangeMap(filter);
+ // Find PartitionKey ranges according to filtered
UniqueIds.
+ Map<Range<PartitionKey>, UniqueId>
filteredPartitionRange =
+ filtered.asMapOfRanges().values()
+ .stream()
+ .flatMap(List::stream)
+
.collect(Collectors.toMap(uidToPartitionRange::get, Function.identity()));
+ return doPruneMultiple(columnToFilters,
filteredPartitionRange,
+ columnIdx + 1);
+ })
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
case NO_FILTERS:
default:
return doPruneMultiple(columnToFilters, partitionRangeToUid,
columnIdx + 1);
@@ -215,9 +244,9 @@ public class ListPartitionPrunerV2 extends
PartitionPrunerV2Base {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("partitionId", partitionId)
- .add("partitionKeyIndex", partitionKeyIndex)
- .toString();
+ .add("partitionId", partitionId)
+ .add("partitionKeyIndex", partitionKeyIndex)
+ .toString();
}
@Override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index a75f332f70..f74bf032e5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -32,8 +32,12 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsResource;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ResourceMgr;
+import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.EsExternalTable;
import org.apache.doris.catalog.external.HMSExternalDatabase;
@@ -41,14 +45,23 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues;
+import
org.apache.doris.datasource.hive.HiveMetaStoreCache.PartitionValueCacheKey;
import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.planner.ColumnBound;
+import org.apache.doris.planner.ListPartitionPrunerV2;
+import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.utframe.TestWithFeService;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -58,6 +71,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -69,13 +83,14 @@ public class CatalogMgrTest extends TestWithFeService {
private static UserIdentity user2;
private CatalogMgr mgr;
private ResourceMgr resourceMgr;
+ private ExternalMetaCacheMgr externalMetaCacheMgr;
@Override
protected void runBeforeAll() throws Exception {
FeConstants.runningUnitTest = true;
mgr = Env.getCurrentEnv().getCatalogMgr();
resourceMgr = Env.getCurrentEnv().getResourceMgr();
-
+ externalMetaCacheMgr = Env.getCurrentEnv().getExtMetaCacheMgr();
ConnectContext rootCtx = createDefaultCtx();
env = Env.getCurrentEnv();
auth = env.getAuth();
@@ -417,4 +432,119 @@ public class CatalogMgrTest extends TestWithFeService {
}
}
+ @Test
+ public void testAddMultiColumnPartitionsCache() {
+ HMSExternalCatalog hiveCatalog = (HMSExternalCatalog)
mgr.getCatalog("hive");
+ HiveMetaStoreCache metaStoreCache =
externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
+ PartitionValueCacheKey partitionValueCacheKey = new
PartitionValueCacheKey("hiveDb", "hiveTable",
+ Lists.newArrayList(Type.INT, Type.SMALLINT));
+ HivePartitionValues hivePartitionValues =
loadPartitionValues(partitionValueCacheKey,
+ Lists.newArrayList("y=2020/m=1", "y=2020/m=2"),
metaStoreCache);
+ metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey,
hivePartitionValues);
+ metaStoreCache.addPartitionsCache("hiveDb", "hiveTable",
Lists.newArrayList("y=2020/m=3", "y=2020/m=4"),
+ partitionValueCacheKey.getTypes());
+ HivePartitionValues partitionValues =
metaStoreCache.getPartitionValues(partitionValueCacheKey);
+ Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(),
4);
+ }
+
+ @Test
+ public void testDropMultiColumnPartitionsCache() {
+ HMSExternalCatalog hiveCatalog = (HMSExternalCatalog)
mgr.getCatalog("hive");
+ HiveMetaStoreCache metaStoreCache =
externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
+ PartitionValueCacheKey partitionValueCacheKey = new
PartitionValueCacheKey("hiveDb", "hiveTable",
+ Lists.newArrayList(Type.INT, Type.SMALLINT));
+ HivePartitionValues hivePartitionValues =
loadPartitionValues(partitionValueCacheKey,
+ Lists.newArrayList("y=2020/m=1", "y=2020/m=2"),
metaStoreCache);
+ metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey,
hivePartitionValues);
+ metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable",
Lists.newArrayList("y=2020/m=1", "y=2020/m=2"),
+ partitionValueCacheKey.getTypes(), false);
+ HivePartitionValues partitionValues =
metaStoreCache.getPartitionValues(partitionValueCacheKey);
+ Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(),
0);
+ }
+
+ @Test
+ public void testAddSingleColumnPartitionsCache() {
+ HMSExternalCatalog hiveCatalog = (HMSExternalCatalog)
mgr.getCatalog("hive");
+ HiveMetaStoreCache metaStoreCache =
externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
+ PartitionValueCacheKey partitionValueCacheKey = new
PartitionValueCacheKey("hiveDb", "hiveTable",
+ Lists.newArrayList(Type.SMALLINT));
+ HivePartitionValues hivePartitionValues =
loadPartitionValues(partitionValueCacheKey,
+ Lists.newArrayList("m=1", "m=2"), metaStoreCache);
+ metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey,
hivePartitionValues);
+ metaStoreCache.addPartitionsCache("hiveDb", "hiveTable",
Lists.newArrayList("m=3", "m=4"),
+ partitionValueCacheKey.getTypes());
+ HivePartitionValues partitionValues =
metaStoreCache.getPartitionValues(partitionValueCacheKey);
+ Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(),
4);
+ }
+
+ @Test
+ public void testDropSingleColumnPartitionsCache() {
+ HMSExternalCatalog hiveCatalog = (HMSExternalCatalog)
mgr.getCatalog("hive");
+ HiveMetaStoreCache metaStoreCache =
externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
+ PartitionValueCacheKey partitionValueCacheKey = new
PartitionValueCacheKey("hiveDb", "hiveTable",
+ Lists.newArrayList(Type.SMALLINT));
+ HivePartitionValues hivePartitionValues =
loadPartitionValues(partitionValueCacheKey,
+ Lists.newArrayList("m=1", "m=2"), metaStoreCache);
+ metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey,
hivePartitionValues);
+ metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable",
Lists.newArrayList("m=1", "m=2"),
+ partitionValueCacheKey.getTypes(), false);
+ HivePartitionValues partitionValues =
metaStoreCache.getPartitionValues(partitionValueCacheKey);
+ Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(),
0);
+ }
+
+ @Test
+ public void testAddPartitionsCacheToLargeTable() {
+ HMSExternalCatalog hiveCatalog = (HMSExternalCatalog)
mgr.getCatalog("hive");
+ HiveMetaStoreCache metaStoreCache =
externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
+ PartitionValueCacheKey partitionValueCacheKey = new
PartitionValueCacheKey("hiveDb", "hiveTable",
+ Lists.newArrayList(Type.INT));
+ List<String> pNames = new ArrayList<>(100000);
+ for (int i = 1; i <= 100000; i++) {
+ pNames.add("m=" + i);
+ }
+ HivePartitionValues hivePartitionValues =
loadPartitionValues(partitionValueCacheKey,
+ pNames, metaStoreCache);
+ metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey,
hivePartitionValues);
+ long start = System.currentTimeMillis();
+ metaStoreCache.addPartitionsCache("hiveDb", "hiveTable",
Lists.newArrayList("m=100001"),
+ partitionValueCacheKey.getTypes());
+ //387 in 4c16g
+ System.out.println("testAddPartitionsCacheToLargeTable use time
mills:" + (System.currentTimeMillis() - start));
+ HivePartitionValues partitionValues =
metaStoreCache.getPartitionValues(partitionValueCacheKey);
+ Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(),
100001);
+ }
+
+ private HivePartitionValues loadPartitionValues(PartitionValueCacheKey
key, List<String> partitionNames,
+ HiveMetaStoreCache metaStoreCache) {
+ // partition name format: nation=cn/city=beijing
+ Map<Long, PartitionItem> idToPartitionItem =
Maps.newHashMapWithExpectedSize(partitionNames.size());
+ Map<String, Long> partitionNameToIdMap =
Maps.newHashMapWithExpectedSize(partitionNames.size());
+ Map<Long, List<UniqueId>> idToUniqueIdsMap =
Maps.newHashMapWithExpectedSize(partitionNames.size());
+ long idx = 0;
+ for (String partitionName : partitionNames) {
+ long partitionId = idx++;
+ ListPartitionItem listPartitionItem =
metaStoreCache.toListPartitionItem(partitionName, key.getTypes());
+ idToPartitionItem.put(partitionId, listPartitionItem);
+ partitionNameToIdMap.put(partitionName, partitionId);
+ }
+
+ Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = null;
+ Map<Range<PartitionKey>, UniqueId> rangeToId = null;
+ RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;
+ Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = null;
+ if (key.getTypes().size() > 1) {
+ // uidToPartitionRange and rangeToId are only used for
multi-column partition
+ uidToPartitionRange =
ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem,
idToUniqueIdsMap);
+ rangeToId =
ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
+ } else {
+ Preconditions.checkState(key.getTypes().size() == 1,
key.getTypes());
+ // singleColumnRangeMap is only used for single-column partition
+ singleColumnRangeMap =
ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem,
idToUniqueIdsMap);
+ singleUidToColumnRangeMap =
ListPartitionPrunerV2.genSingleUidToColumnRange(singleColumnRangeMap);
+ }
+ Map<Long, List<String>> partitionValuesMap =
ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
+ return new HivePartitionValues(idToPartitionItem, uidToPartitionRange,
rangeToId, singleColumnRangeMap, idx,
+ partitionNameToIdMap, idToUniqueIdsMap,
singleUidToColumnRangeMap, partitionValuesMap);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]