This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new aa24c8f973 [Feature](multi-catalog) Support refresh catalog metadata (#11656)
aa24c8f973 is described below
commit aa24c8f973b544e71ef5f2f4b3fe305e3185ca61
Author: Stalary <[email protected]>
AuthorDate: Thu Aug 11 15:14:05 2022 +0800
[Feature](multi-catalog) Support refresh catalog metadata (#11656)
---
fe/fe-core/src/main/cup/sql_parser.cup | 4 ++
.../apache/doris/analysis/RefreshCatalogStmt.java | 67 ++++++++++++++++++++++
.../apache/doris/datasource/CatalogFactory.java | 3 +
.../org/apache/doris/datasource/DataSourceMgr.java | 49 ++++++++++++++--
.../doris/datasource/EsExternalDataSource.java | 2 -
.../doris/datasource/ExternalDataSource.java | 1 +
.../doris/datasource/HMSExternalDataSource.java | 1 -
.../org/apache/doris/journal/JournalEntity.java | 8 +--
.../java/org/apache/doris/persist/EditLog.java | 21 +++++--
.../org/apache/doris/persist/OperationType.java | 20 ++++---
.../main/java/org/apache/doris/qe/DdlExecutor.java | 13 ++++-
11 files changed, 163 insertions(+), 26 deletions(-)
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 9ca18f39c0..96ad0f0805 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -808,6 +808,10 @@ refresh_stmt ::=
{:
RESULT = new RefreshMaterializedViewStmt(mv,
MVRefreshInfo.RefreshMethod.COMPLETE);
:}
+ | KW_REFRESH KW_CATALOG ident:catalogName
+ {:
+ RESULT = new RefreshCatalogStmt(catalogName);
+ :}
;
clean_stmt ::=
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java
new file mode 100644
index 0000000000..d2f27fec19
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.InternalDataSource;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+/**
+ * RefreshCatalogStmt
+ * Manually refresh the catalog metadata.
+ */
+public class RefreshCatalogStmt extends DdlStmt {
+
+ private final String catalogName;
+
+ public RefreshCatalogStmt(String catalogName) {
+ this.catalogName = catalogName;
+ }
+
+ public String getCatalogName() {
+ return catalogName;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws UserException {
+ super.analyze(analyzer);
+ Util.checkCatalogAllRules(catalogName);
+ if (catalogName.equals(InternalDataSource.INTERNAL_DS_NAME)) {
+ throw new AnalysisException("Internal catalog name can't be refresh.");
+ }
+
+ if (!Env.getCurrentEnv().getAuth().checkCtlPriv(
+ ConnectContext.get(), catalogName, PrivPredicate.ALTER)) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_CATALOG_ACCESS_DENIED,
+ analyzer.getQualifiedUser(), catalogName);
+ }
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("REFRESH CATALOG ").append("`").append(catalogName).append("`");
+ return stringBuilder.toString();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index f3669d325b..8b110f80b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.AlterCatalogNameStmt;
import org.apache.doris.analysis.AlterCatalogPropertyStmt;
import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.DropCatalogStmt;
+import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.common.DdlException;
@@ -48,6 +49,8 @@ public class CatalogFactory {
} else if (stmt instanceof AlterCatalogNameStmt) {
log.setCatalogId(catalogId);
log.setNewCatalogName(((AlterCatalogNameStmt)
stmt).getNewCatalogName());
+ } else if (stmt instanceof RefreshCatalogStmt) {
+ log.setCatalogId(catalogId);
} else {
throw new RuntimeException("Unknown stmt for datasource manager "
+ stmt.getClass().getSimpleName());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
index c9e58be38f..22a9e00efd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.AlterCatalogNameStmt;
import org.apache.doris.analysis.AlterCatalogPropertyStmt;
import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.DropCatalogStmt;
+import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.ShowCatalogStmt;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@@ -95,6 +96,16 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
return catalog;
}
+ private void unprotectedRefreshCatalog(long catalogId) {
+ DataSourceIf catalog = idToCatalog.get(catalogId);
+ if (catalog != null) {
+ String catalogName = catalog.getName();
+ if (!catalogName.equals(InternalDataSource.INTERNAL_DS_NAME)) {
+ ((ExternalDataSource) catalog).setInitialized(false);
+ }
+ }
+ }
+
public InternalDataSource getInternalDataSource() {
return internalDataSource;
}
@@ -192,7 +203,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
long id = Env.getCurrentEnv().getNextId();
CatalogLog log = CatalogFactory.constructorCatalogLog(id, stmt);
replayCreateCatalog(log);
- Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_CREATE_DS, log);
+ Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_CREATE_CATALOG, log);
} finally {
writeUnlock();
}
@@ -214,7 +225,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
}
CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
replayDropCatalog(log);
- Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_DROP_DS, log);
+ Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_DROP_CATALOG, log);
} finally {
writeUnlock();
}
@@ -232,7 +243,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
}
CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
replayAlterCatalogName(log);
- Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_DS_NAME, log);
+ Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_CATALOG_NAME, log);
} finally {
writeUnlock();
}
@@ -253,7 +264,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
}
CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
replayAlterCatalogProps(log);
- Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_DS_PROPS, log);
+ Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_CATALOG_PROPS, log);
} finally {
writeUnlock();
}
@@ -308,6 +319,24 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
return new ShowResultSet(showStmt.getMetaData(), rows);
}
+ /**
+ * Refresh the catalog meta and write the meta log.
+ */
+ public void refreshCatalog(RefreshCatalogStmt stmt) throws UserException {
+ writeLock();
+ try {
+ DataSourceIf catalog = nameToCatalog.get(stmt.getCatalogName());
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
+ }
+ CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
+ replayRefreshCatalog(log);
+ Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_REFRESH_CATALOG, log);
+ } finally {
+ writeUnlock();
+ }
+ }
+
/**
* Reply for create catalog event.
*/
@@ -333,6 +362,18 @@ public class DataSourceMgr implements Writable, GsonPostProcessable {
}
}
+ /**
+ * Reply for refresh catalog event.
+ */
+ public void replayRefreshCatalog(CatalogLog log) throws DdlException {
+ writeLock();
+ try {
+ unprotectedRefreshCatalog(log.getCatalogId());
+ } finally {
+ writeUnlock();
+ }
+ }
+
/**
* Reply for alter catalog name event.
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
index fd5b3fb077..cf557df762 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java
@@ -60,8 +60,6 @@ public class EsExternalDataSource extends ExternalDataSource {
private EsRestClient esRestClient;
- private boolean initialized = false;
-
private String[] nodes;
private String username = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java
index 79223498f8..a0f376da75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java
@@ -49,6 +49,7 @@ public abstract class ExternalDataSource implements DataSourceIf<ExternalDatabas
// save properties of this data source, such as hive meta store url.
@SerializedName(value = "dsProperty")
protected DataSourceProperty dsProperty = new DataSourceProperty();
+ protected boolean initialized = false;
/**
* @return names of database in this data source.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java
index 293266bdb4..fa4087b3f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java
@@ -44,7 +44,6 @@ public class HMSExternalDataSource extends ExternalDataSource {
// Cache of db name to db id.
private Map<String, Long> dbNameToId;
private Map<Long, HMSExternalDatabase> idToDb;
- private boolean initialized = false;
protected HiveMetaStoreClient client;
/**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 19556ce614..a724518386 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -667,10 +667,10 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
- case OperationType.OP_CREATE_DS:
- case OperationType.OP_DROP_DS:
- case OperationType.OP_ALTER_DS_NAME:
- case OperationType.OP_ALTER_DS_PROPS: {
+ case OperationType.OP_CREATE_CATALOG:
+ case OperationType.OP_DROP_CATALOG:
+ case OperationType.OP_ALTER_CATALOG_NAME:
+ case OperationType.OP_ALTER_CATALOG_PROPS: {
data = CatalogLog.read(in);
isRead = true;
break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 9da6cdd656..9bc1ec48e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -95,6 +95,9 @@ public class EditLog {
private Journal journal;
+ /**
+ * The constructor.
+ **/
public EditLog(String nodeName) {
String journalType = Config.edit_log_type;
if (journalType.equalsIgnoreCase("bdb")) {
@@ -134,6 +137,9 @@ public class EditLog {
return journal == null ? 0 : 1;
}
+ /**
+ * Load journal.
+ **/
public static void loadJournal(Env env, JournalEntity journal) {
short opCode = journal.getOpCode();
if (opCode != OperationType.OP_SAVE_NEXTID && opCode !=
OperationType.OP_TIMESTAMP) {
@@ -838,26 +844,31 @@ public class EditLog {
env.getPolicyMgr().replayStoragePolicyAlter(log);
break;
}
- case OperationType.OP_CREATE_DS: {
+ case OperationType.OP_CREATE_CATALOG: {
CatalogLog log = (CatalogLog) journal.getData();
env.getDataSourceMgr().replayCreateCatalog(log);
break;
}
- case OperationType.OP_DROP_DS: {
+ case OperationType.OP_DROP_CATALOG: {
CatalogLog log = (CatalogLog) journal.getData();
env.getDataSourceMgr().replayDropCatalog(log);
break;
}
- case OperationType.OP_ALTER_DS_NAME: {
+ case OperationType.OP_ALTER_CATALOG_NAME: {
CatalogLog log = (CatalogLog) journal.getData();
env.getDataSourceMgr().replayAlterCatalogName(log);
break;
}
- case OperationType.OP_ALTER_DS_PROPS: {
+ case OperationType.OP_ALTER_CATALOG_PROPS: {
CatalogLog log = (CatalogLog) journal.getData();
env.getDataSourceMgr().replayAlterCatalogProps(log);
break;
}
+ case OperationType.OP_REFRESH_CATALOG: {
+ CatalogLog log = (CatalogLog) journal.getData();
+ env.getDataSourceMgr().replayRefreshCatalog(log);
+ break;
+ }
case OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS: {
final TableAddOrDropColumnsInfo info =
(TableAddOrDropColumnsInfo) journal.getData();
env.getSchemaChangeHandler().replayModifyTableAddOrDropColumns(info);
@@ -875,7 +886,7 @@ public class EditLog {
}
}
} catch (MetaNotFoundException e) {
- /**
+ /*
* In the following cases, doris may record metadata modification information
* for a table that no longer exists.
* 1. Thread 1: get TableA object
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 411bf056b4..f87b9c51a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -20,6 +20,9 @@ package org.apache.doris.persist;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+/**
+ * Operation name and code mapping.
+ **/
public class OperationType {
// OP_LOCAL_EOF is only for local edit log, to indicate the end of a edit log run.
public static final short OP_LOCAL_EOF = -1;
@@ -227,15 +230,18 @@ public class OperationType {
// policy 310-320
public static final short OP_CREATE_POLICY = 310;
public static final short OP_DROP_POLICY = 311;
+ public static final short OP_ALTER_STORAGE_POLICY = 312;
- // datasource 312-315
- public static final short OP_CREATE_DS = 312;
- public static final short OP_DROP_DS = 313;
- public static final short OP_ALTER_DS_NAME = 314;
- public static final short OP_ALTER_DS_PROPS = 315;
- public static final short OP_ALTER_STORAGE_POLICY = 316;
+ // catalog 320-330
+ public static final short OP_CREATE_CATALOG = 320;
+ public static final short OP_DROP_CATALOG = 321;
+ public static final short OP_ALTER_CATALOG_NAME = 322;
+ public static final short OP_ALTER_CATALOG_PROPS = 323;
+ public static final short OP_REFRESH_CATALOG = 324;
- // get opcode name by op codeStri
+ /**
+ * Get opcode name by op code.
+ **/
public static String getOpName(short opCode) {
try {
Field[] fields = OperationType.class.getDeclaredFields();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 1f42930ecd..7f6d00846a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -95,6 +95,7 @@ import org.apache.doris.analysis.PauseSyncJobStmt;
import org.apache.doris.analysis.RecoverDbStmt;
import org.apache.doris.analysis.RecoverPartitionStmt;
import org.apache.doris.analysis.RecoverTableStmt;
+import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.RefreshDbStmt;
import org.apache.doris.analysis.RefreshMaterializedViewStmt;
import org.apache.doris.analysis.RefreshTableStmt;
@@ -116,7 +117,13 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.sync.SyncJobManager;
+/**
+ * Use for execute ddl.
+ **/
public class DdlExecutor {
+ /**
+ * Execute ddl.
+ **/
public static void execute(Env env, DdlStmt ddlStmt) throws Exception {
if (ddlStmt instanceof CreateClusterStmt) {
CreateClusterStmt stmt = (CreateClusterStmt) ddlStmt;
@@ -153,8 +160,6 @@ public class DdlExecutor {
env.createMaterializedView((CreateMaterializedViewStmt) ddlStmt);
} else if (ddlStmt instanceof CreateMultiTableMaterializedViewStmt) {
env.createMultiTableMaterializedView((CreateMultiTableMaterializedViewStmt)
ddlStmt);
- } else if (ddlStmt instanceof DropMaterializedViewStmt) {
- env.dropMaterializedView((DropMaterializedViewStmt) ddlStmt);
} else if (ddlStmt instanceof AlterTableStmt) {
env.alterTable((AlterTableStmt) ddlStmt);
} else if (ddlStmt instanceof AlterTableStatsStmt) {
@@ -322,13 +327,15 @@ public class DdlExecutor {
} else if (ddlStmt instanceof AlterCatalogPropertyStmt) {
env.getDataSourceMgr().alterCatalogProps((AlterCatalogPropertyStmt) ddlStmt);
} else if (ddlStmt instanceof CleanLabelStmt) {
- env.getCurrentEnv().getLoadManager().cleanLabel((CleanLabelStmt) ddlStmt);
+ env.getLoadManager().cleanLabel((CleanLabelStmt) ddlStmt);
} else if (ddlStmt instanceof AlterMaterializedViewStmt) {
env.alterMaterializedView((AlterMaterializedViewStmt) ddlStmt);
} else if (ddlStmt instanceof DropMaterializedViewStmt) {
env.dropMaterializedView((DropMaterializedViewStmt) ddlStmt);
} else if (ddlStmt instanceof RefreshMaterializedViewStmt) {
env.refreshMaterializedView((RefreshMaterializedViewStmt) ddlStmt);
+ } else if (ddlStmt instanceof RefreshCatalogStmt) {
+ env.getDataSourceMgr().refreshCatalog((RefreshCatalogStmt) ddlStmt);
} else {
throw new DdlException("Unknown statement.");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]