morningman commented on code in PR #30198:
URL: https://github.com/apache/doris/pull/30198#discussion_r1466164146
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java:
##########
@@ -227,6 +239,154 @@ public void notifyPropertiesUpdated(Map<String, String>
updatedProps) {
}
}
+ private boolean tryLock(boolean mustLock) {
+ while (true) {
+ try {
+ if (!lock.tryLock(Config.catalog_try_lock_timeout_ms,
TimeUnit.MILLISECONDS)) {
+ // to see which thread held this lock for long time.
+ Thread owner = lock.getOwner();
+ if (owner != null) {
+ // There are many catalog timeout during regression
test
+ // And this timeout should not happen very often, so
it could be info log
+ LOG.info("catalog lock is held by: {}",
Util.dumpThread(owner, 10));
+ }
+
+ if (mustLock) {
+ continue;
+ } else {
+ return false;
+ }
+ }
+ return true;
+ } catch (InterruptedException e) {
+ LOG.warn("got exception while getting catalog lock", e);
+ if (mustLock) {
+ continue;
+ } else {
+ return lock.isHeldByCurrentThread();
+ }
+ }
+ }
+ }
+
+ private void unlock() {
+ if (lock.isHeldByCurrentThread()) {
+ this.lock.unlock();
+ }
+ }
+
+ @Override
+ public void createDb(CreateDbStmt stmt) throws DdlException {
+ String fullDbName = stmt.getFullDbName();
+ Map<String, String> properties = stmt.getProperties();
+ long id = Env.getCurrentEnv().getNextId();
+
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try
again");
+ }
+ try {
+ HiveCatalogDatabase catalogDatabase = new HiveCatalogDatabase();
+ catalogDatabase.setDbName(fullDbName);
+ catalogDatabase.setProperties(properties);
+ if (properties.containsKey("location_uri")) {
+ catalogDatabase.setLocationUri(properties.get("location_uri"));
+ }
+ catalogDatabase.setComment(properties.getOrDefault("comment", ""));
+ client.createDatabase(catalogDatabase);
+ addDatabase(id, fullDbName);
+ } finally {
+ unlock();
+ }
+ LOG.info("createDb dbName = " + fullDbName + ", id = " + id);
+ }
+
+ public void dropDb(DropDbStmt stmt) throws DdlException {
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try
again");
+ }
+ String dbName = stmt.getDbName();
+ try {
+ client.dropDatabase(dbName);
+ refreshDb(dbName);
+ } finally {
+ unlock();
+ }
+ }
+
+ private void refreshDb(String dbName) {
+ Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(getId(),
dbName);
+ removeDatabase(dbName);
+ }
+
+ @Override
+ public void createTable(CreateTableStmt stmt) throws UserException {
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try
again");
+ }
+ String dbName = stmt.getDbName();
+ String tblName = stmt.getTableName();
+ ExternalDatabase<?> db = getDbNullable(dbName);
+ if (db == null) {
+ throw new UserException("Failed to get database: '" + dbName + "'
in catalog: " + this.getName());
+ }
+ try {
+ HiveCatalogTable catalogTable = new HiveCatalogTable();
+ catalogTable.setDbName(dbName);
+ catalogTable.setTableName(tblName);
+ Map<String, String> props = stmt.getExtProperties();
+ catalogTable.setProperties(props);
+ String inputFormat = props.getOrDefault("input_format",
+ "org.apache.hadoop.mapred.TextInputFormat");
+ String outputFormat = props.getOrDefault("output_format",
+
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat");
+ catalogTable.setInputFormat(inputFormat);
+ catalogTable.setOutputFormat(outputFormat);
+ catalogTable.setPartitionKeys(parsePartitionKeys(props));
+ client.createTable(catalogTable, stmt.isSetIfNotExists());
+ long tableId =
Env.getCurrentEnv().getExternalMetaIdMgr().getTblId(getId(), dbName, tblName);
+ if (tableId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) {
+ return;
+ }
+ refreshTable(db, dbName, tblName, tableId);
Review Comment:
How is this change synced to the other FEs?
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java:
##########
@@ -227,6 +239,154 @@ public void notifyPropertiesUpdated(Map<String, String>
updatedProps) {
}
}
+ private boolean tryLock(boolean mustLock) {
+ while (true) {
+ try {
+ if (!lock.tryLock(Config.catalog_try_lock_timeout_ms,
TimeUnit.MILLISECONDS)) {
+ // to see which thread held this lock for long time.
+ Thread owner = lock.getOwner();
+ if (owner != null) {
+ // There are many catalog timeout during regression
test
+ // And this timeout should not happen very often, so
it could be info log
+ LOG.info("catalog lock is held by: {}",
Util.dumpThread(owner, 10));
+ }
+
+ if (mustLock) {
+ continue;
+ } else {
+ return false;
+ }
+ }
+ return true;
+ } catch (InterruptedException e) {
+ LOG.warn("got exception while getting catalog lock", e);
+ if (mustLock) {
+ continue;
+ } else {
+ return lock.isHeldByCurrentThread();
+ }
+ }
+ }
+ }
+
+ private void unlock() {
+ if (lock.isHeldByCurrentThread()) {
+ this.lock.unlock();
+ }
+ }
+
+ @Override
+ public void createDb(CreateDbStmt stmt) throws DdlException {
+ String fullDbName = stmt.getFullDbName();
+ Map<String, String> properties = stmt.getProperties();
+ long id = Env.getCurrentEnv().getNextId();
+
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try
again");
+ }
+ try {
+ HiveCatalogDatabase catalogDatabase = new HiveCatalogDatabase();
+ catalogDatabase.setDbName(fullDbName);
+ catalogDatabase.setProperties(properties);
+ if (properties.containsKey("location_uri")) {
+ catalogDatabase.setLocationUri(properties.get("location_uri"));
+ }
+ catalogDatabase.setComment(properties.getOrDefault("comment", ""));
+ client.createDatabase(catalogDatabase);
+ addDatabase(id, fullDbName);
+ } finally {
+ unlock();
+ }
+ LOG.info("createDb dbName = " + fullDbName + ", id = " + id);
+ }
+
+ public void dropDb(DropDbStmt stmt) throws DdlException {
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try
again");
+ }
+ String dbName = stmt.getDbName();
+ try {
+ client.dropDatabase(dbName);
+ refreshDb(dbName);
+ } finally {
+ unlock();
+ }
+ }
+
+ private void refreshDb(String dbName) {
+ Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(getId(),
dbName);
+ removeDatabase(dbName);
+ }
+
+ @Override
+ public void createTable(CreateTableStmt stmt) throws UserException {
+ if (!tryLock(false)) {
+ throw new DdlException("Failed to acquire catalog lock. Try
again");
+ }
+ String dbName = stmt.getDbName();
+ String tblName = stmt.getTableName();
+ ExternalDatabase<?> db = getDbNullable(dbName);
+ if (db == null) {
+ throw new UserException("Failed to get database: '" + dbName + "'
in catalog: " + this.getName());
+ }
+ try {
+ HiveCatalogTable catalogTable = new HiveCatalogTable();
+ catalogTable.setDbName(dbName);
+ catalogTable.setTableName(tblName);
+ Map<String, String> props = stmt.getExtProperties();
+ catalogTable.setProperties(props);
+ String inputFormat = props.getOrDefault("input_format",
+ "org.apache.hadoop.mapred.TextInputFormat");
+ String outputFormat = props.getOrDefault("output_format",
+
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat");
+ catalogTable.setInputFormat(inputFormat);
+ catalogTable.setOutputFormat(outputFormat);
+ catalogTable.setPartitionKeys(parsePartitionKeys(props));
+ client.createTable(catalogTable, stmt.isSetIfNotExists());
+ long tableId =
Env.getCurrentEnv().getExternalMetaIdMgr().getTblId(getId(), dbName, tblName);
+ if (tableId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) {
+ return;
+ }
+ refreshTable(db, dbName, tblName, tableId);
+ } finally {
+ unlock();
+ }
+ }
+
+ private void refreshTable(ExternalDatabase<?> db, String dbName, String
tblName, long tableId) {
+ Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(getId(),
dbName, tblName);
Review Comment:
This is a newly created table, so why do we need to call invalidateTableCache?
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java:
##########
@@ -211,7 +223,7 @@ public void dropDatabase(String dbName) {
}
@Override
- public void createDatabase(long dbId, String dbName) {
+ public void addDatabase(long dbId, String dbName) {
Review Comment:
How is this change synced to the other FEs?
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java:
##########
@@ -559,11 +564,33 @@ public void addDatabaseForTest(ExternalDatabase<? extends
ExternalTable> db) {
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()),
db.getId());
}
- public void dropDatabase(String dbName) {
+ public void createDb(CreateDbStmt stmt) throws DdlException {
throw new NotImplementedException("dropDatabase not implemented");
}
- public void createDatabase(long dbId, String dbName) {
+ public void dropDb(DropDbStmt stmt) throws DdlException {
Review Comment:
Add `@Override` to this method.
Also add it in `InternalCatalog`.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java:
##########
@@ -117,6 +123,107 @@ public List<String> getAllTables(String dbName) {
}
}
+ @Override
+ public void createDatabase(CatalogDatabase db) {
+ try (ThriftHMSClient client = getClient()) {
+ try {
+ if (db instanceof HiveCatalogDatabase) {
Review Comment:
Implement a `toHMSDatabase()` method for `HiveCatalogDatabase`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]