This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8abd00dcd5 [feature-wip](multi-catalog) Add catalog name to
information schema. (#10349)
8abd00dcd5 is described below
commit 8abd00dcd5ce367b3871143313aa2d4af0bd2fb3
Author: Jibing-Li <[email protected]>
AuthorDate: Sat Jun 25 11:53:04 2022 +0800
[feature-wip](multi-catalog) Add catalog name to information schema.
(#10349)
Information schema database need to show catalog name after multi-catalog
is supported.
This part is step 1, add catalog name for schemata table.
---
.../schema_scanner/schema_schemata_scanner.cpp | 9 +-
.../exec/schema_scanner/schema_tables_scanner.cpp | 13 +-
.../java/org/apache/doris/catalog/TableIf.java | 16 +++
.../doris/catalog/external/ExternalTable.java | 44 +++++++
.../doris/catalog/external/HMSExternalTable.java | 53 +++++++++
.../org/apache/doris/datasource/DataSourceMgr.java | 9 ++
.../apache/doris/service/FrontendServiceImpl.java | 131 ++++++++++++---------
gensrc/thrift/FrontendService.thrift | 4 +-
8 files changed, 219 insertions(+), 60 deletions(-)
diff --git a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp
b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp
index d4a69b37b4..cd794bdeaa 100644
--- a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp
@@ -72,7 +72,14 @@ Status SchemaSchemataScanner::fill_one_row(Tuple* tuple,
MemPool* pool) {
memset((void*)tuple, 0, _tuple_desc->num_null_bytes());
// catalog
- { tuple->set_null(_tuple_desc->slots()[0]->null_indicator_offset()); }
+ {
+ void* slot = tuple->get_slot(_tuple_desc->slots()[0]->tuple_offset());
+ StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
+ std::string catalog_name = _db_result.catalogs[_db_index];
+ str_slot->ptr = (char*)pool->allocate(catalog_name.size());
+ str_slot->len = catalog_name.size();
+ memcpy(str_slot->ptr, catalog_name.c_str(), str_slot->len);
+ }
// schema
{
void* slot = tuple->get_slot(_tuple_desc->slots()[1]->tuple_offset());
diff --git a/be/src/exec/schema_scanner/schema_tables_scanner.cpp
b/be/src/exec/schema_scanner/schema_tables_scanner.cpp
index 580e6cbc41..142200f9ed 100644
--- a/be/src/exec/schema_scanner/schema_tables_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_tables_scanner.cpp
@@ -89,7 +89,14 @@ Status SchemaTablesScanner::fill_one_row(Tuple* tuple,
MemPool* pool) {
memset((void*)tuple, 0, _tuple_desc->num_null_bytes());
const TTableStatus& tbl_status = _table_result.tables[_table_index];
// catalog
- { tuple->set_null(_tuple_desc->slots()[0]->null_indicator_offset()); }
+ {
+ void* slot = tuple->get_slot(_tuple_desc->slots()[0]->tuple_offset());
+ StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
+ std::string catalog_name = _db_result.catalogs[_db_index - 1];
+ str_slot->ptr = (char*)pool->allocate(catalog_name.size());
+ str_slot->len = catalog_name.size();
+ memcpy(str_slot->ptr, catalog_name.c_str(), str_slot->len);
+ }
// schema
{
void* slot = tuple->get_slot(_tuple_desc->slots()[1]->tuple_offset());
@@ -245,7 +252,9 @@ Status SchemaTablesScanner::fill_one_row(Tuple* tuple,
MemPool* pool) {
Status SchemaTablesScanner::get_new_table() {
TGetTablesParams table_params;
- table_params.__set_db(_db_result.dbs[_db_index++]);
+ table_params.__set_db(_db_result.dbs[_db_index]);
+ table_params.__set_catalog(_db_result.catalogs[_db_index]);
+ _db_index++;
if (nullptr != _param->wild) {
table_params.__set_pattern(*(_param->wild));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index d69bbe3213..01efa961ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -72,6 +72,22 @@ public interface TableIf {
Column getColumn(String name);
+ String getMysqlType();
+
+ String getEngine();
+
+ String getComment();
+
+ long getCreateTime();
+
+ long getUpdateTime();
+
+ long getRowCount();
+
+ long getDataLength();
+
+ long getAvgRowLength();
+
/**
* Doris table type.
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
index 7a96dc970e..ab5a3fa6ef 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -200,4 +200,48 @@ public class ExternalTable implements TableIf {
public Column getColumn(String name) {
throw new NotImplementedException();
}
+
+ @Override
+ public String getMysqlType() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getEngine() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getComment() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public long getCreateTime() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public long getUpdateTime() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public long getRowCount() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public long getDataLength() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public long getAvgRowLength() {
+ throw new NotImplementedException();
+ }
+
+ public long getLastCheckTime() {
+ throw new NotImplementedException();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index c4167b89df..9c6e9b12fe 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -132,6 +132,59 @@ public class HMSExternalTable extends ExternalTable {
return null;
}
+ @Override
+ public String getMysqlType() {
+ return type.name();
+ }
+
+ @Override
+ public String getEngine() {
+ switch (type) {
+ case HIVE:
+ return "Hive";
+ case ICEBERG:
+ return "Iceberg";
+ case HUDI:
+ return "Hudi";
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ public String getComment() {
+ return "";
+ }
+
+ @Override
+ public long getCreateTime() {
+ return 0;
+ }
+
+ @Override
+ public long getUpdateTime() {
+ return 0;
+ }
+
+ @Override
+ public long getRowCount() {
+ return 0;
+ }
+
+ @Override
+ public long getDataLength() {
+ return 0;
+ }
+
+ @Override
+ public long getAvgRowLength() {
+ return 0;
+ }
+
+ public long getLastCheckTime() {
+ return 0;
+ }
+
/**
* get database name of hms table.
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
index d77cc9a18d..8f04e035b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
@@ -47,6 +47,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
/**
* DataSourceMgr will load all data sources at FE startup,
@@ -78,6 +79,10 @@ public class DataSourceMgr implements Writable {
return internalDataSource;
}
+ public DataSourceIf getCatalog(String name) {
+ return nameToCatalogs.get(name);
+ }
+
private void writeLock() {
lock.writeLock().lock();
}
@@ -241,6 +246,10 @@ public class DataSourceMgr implements Writable {
}
}
+ public List<DataSourceIf> listCatalogs() {
+ return nameToCatalogs.values().stream().collect(Collectors.toList());
+ }
+
/**
* Reply for alter catalog props event.
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 67aa2baff0..b8419215a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -22,9 +22,12 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
@@ -38,6 +41,7 @@ import org.apache.doris.common.ThriftServerContext;
import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
+import org.apache.doris.datasource.DataSourceIf;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.MiniEtlTaskInfo;
@@ -150,6 +154,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
TGetDbsResult result = new TGetDbsResult();
List<String> dbs = Lists.newArrayList();
+ List<String> catalogs = Lists.newArrayList();
PatternMatcher matcher = null;
if (params.isSetPattern()) {
try {
@@ -161,28 +166,34 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
Catalog catalog = Catalog.getCurrentCatalog();
- List<String> dbNames = catalog.getDbNames();
- LOG.debug("get db names: {}", dbNames);
-
- UserIdentity currentUser = null;
- if (params.isSetCurrentUserIdent()) {
- currentUser = UserIdentity.fromThrift(params.current_user_ident);
- } else {
- currentUser =
UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
- }
- for (String fullName : dbNames) {
- if (!catalog.getAuth().checkDbPriv(currentUser, fullName,
PrivPredicate.SHOW)) {
- continue;
+ List<DataSourceIf> dataSourceIfs =
catalog.getDataSourceMgr().listCatalogs();
+ for (DataSourceIf ds : dataSourceIfs) {
+ List<String> dbNames = ds.getDbNames();
+ LOG.debug("get db names: {}, in data source: {}", dbNames,
ds.getName());
+
+ UserIdentity currentUser = null;
+ if (params.isSetCurrentUserIdent()) {
+ currentUser =
UserIdentity.fromThrift(params.current_user_ident);
+ } else {
+ currentUser =
UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
+ for (String fullName : dbNames) {
+ if (!catalog.getAuth().checkDbPriv(currentUser, fullName,
PrivPredicate.SHOW)) {
+ continue;
+ }
- final String db = ClusterNamespace.getNameFromFullName(fullName);
- if (matcher != null && !matcher.match(db)) {
- continue;
- }
+ final String db =
ClusterNamespace.getNameFromFullName(fullName);
+ if (matcher != null && !matcher.match(db)) {
+ continue;
+ }
- dbs.add(fullName);
+ catalogs.add(ds.getName());
+ dbs.add(fullName);
+ }
}
+
result.setDbs(dbs);
+ result.setCatalogs(catalogs);
return result;
}
@@ -243,7 +254,6 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
throw new TException("Pattern is in bad format " +
params.getPattern());
}
}
-
// database privs should be checked in analysis phrase
UserIdentity currentUser;
@@ -252,51 +262,60 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
} else {
currentUser =
UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
- Database db = Catalog.getCurrentCatalog().getDbNullable(params.db);
- if (db != null) {
- List<Table> tables = null;
- if (!params.isSetType() || params.getType() == null ||
params.getType().isEmpty()) {
- tables = db.getTables();
- } else {
- switch (params.getType()) {
- case "VIEW":
- tables = db.getViews();
- break;
- default:
- tables = db.getTables();
- }
- }
- for (Table table : tables) {
- if
(!Catalog.getCurrentCatalog().getAuth().checkTblPriv(currentUser, params.db,
- table.getName(), PrivPredicate.SHOW)) {
- continue;
+ DataSourceIf ds =
Catalog.getCurrentCatalog().getDataSourceMgr().getCatalog(params.catalog);
+ if (ds != null) {
+ DatabaseIf db = ds.getDbNullable(params.db);
+ if (db != null) {
+ List<TableIf> tables = null;
+ if (!params.isSetType() || params.getType() == null ||
params.getType().isEmpty()) {
+ tables = db.getTables();
+ } else {
+ switch (params.getType()) {
+ case "VIEW":
+ tables = db.getViews();
+ break;
+ default:
+ tables = db.getTables();
+ }
}
- table.readLock();
- try {
+ for (TableIf table : tables) {
if
(!Catalog.getCurrentCatalog().getAuth().checkTblPriv(currentUser, params.db,
table.getName(), PrivPredicate.SHOW)) {
continue;
}
+ table.readLock();
+ try {
+ if
(!Catalog.getCurrentCatalog().getAuth().checkTblPriv(currentUser, params.db,
+ table.getName(), PrivPredicate.SHOW)) {
+ continue;
+ }
- if (matcher != null && !matcher.match(table.getName())) {
- continue;
+ if (matcher != null &&
!matcher.match(table.getName())) {
+ continue;
+ }
+ long lastCheckTime = 0;
+ if (table instanceof Table) {
+ lastCheckTime = ((Table) table).getLastCheckTime();
+ } else {
+ lastCheckTime = ((ExternalTable)
table).getLastCheckTime();
+ }
+ TTableStatus status = new TTableStatus();
+ status.setName(table.getName());
+ status.setType(table.getMysqlType());
+ status.setEngine(table.getEngine());
+ status.setComment(table.getComment());
+ status.setCreateTime(table.getCreateTime());
+ status.setLastCheckTime(lastCheckTime);
+ status.setUpdateTime(table.getUpdateTime() / 1000);
+ status.setCheckTime(lastCheckTime);
+ status.setCollation("utf-8");
+ status.setRows(table.getRowCount());
+ status.setDataLength(table.getDataLength());
+ status.setAvgRowLength(table.getAvgRowLength());
+ tablesResult.add(status);
+ } finally {
+ table.readUnlock();
}
- TTableStatus status = new TTableStatus();
- status.setName(table.getName());
- status.setType(table.getMysqlType());
- status.setEngine(table.getEngine());
- status.setComment(table.getComment());
- status.setCreateTime(table.getCreateTime());
- status.setLastCheckTime(table.getLastCheckTime());
- status.setUpdateTime(table.getUpdateTime() / 1000);
- status.setCheckTime(table.getLastCheckTime());
- status.setCollation("utf-8");
- status.setRows(table.getRowCount());
- status.setDataLength(table.getDataLength());
- status.setAvgRowLength(table.getAvgRowLength());
- tablesResult.add(status);
- } finally {
- table.readUnlock();
}
}
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index f195a11722..ae3018d8a4 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -287,9 +287,10 @@ struct TGetDbsParams {
4: optional Types.TUserIdentity current_user_ident // to replace the user
and user ip
}
-// getDbNames returns a list of database names
+// getDbNames returns a list of database names and catalog names
struct TGetDbsResult {
1: list<string> dbs
+ 2: list<string> catalogs
}
// Arguments to getTableNames, which returns a list of tables that match an
@@ -304,6 +305,7 @@ struct TGetTablesParams {
4: optional string user_ip // deprecated
5: optional Types.TUserIdentity current_user_ident // to replace the user
and user ip
6: optional string type
+ 7: optional string catalog
}
struct TTableStatus {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]