This is an automated email from the ASF dual-hosted git repository.
starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a05582f9901 [Enhancement] (nereids)implement showSyncJobCommand in nereids (#44817)
a05582f9901 is described below
commit a05582f990117ec6b19925ce72dd81c8ff2164f4
Author: Sridhar R Manikarnike <[email protected]>
AuthorDate: Thu Dec 19 09:06:49 2024 +0530
[Enhancement] (nereids)implement showSyncJobCommand in nereids (#44817)
issue Number: close #42775
---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 2 +-
.../doris/nereids/parser/LogicalPlanBuilder.java | 12 +++
.../apache/doris/nereids/trees/plans/PlanType.java | 1 +
.../trees/plans/commands/ShowSyncJobCommand.java | 115 +++++++++++++++++++++
.../trees/plans/visitor/CommandVisitor.java | 5 +
.../show/test_show_sync_job_command.groovy | 56 ++++++++++
6 files changed, 190 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index ad884eefee1..6c37a2b276b 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -239,6 +239,7 @@ supportedShowStatement
| SHOW DELETE ((FROM | IN) database=multipartIdentifier)?
#showDelete
| SHOW ALL? GRANTS
#showGrants
| SHOW GRANTS FOR userIdentify
#showGrantsForUser
+ | SHOW SYNC JOB ((FROM | IN) database=multipartIdentifier)?
#showSyncJob
| SHOW LOAD PROFILE loadIdPath=STRING_LITERAL
#showLoadProfile
| SHOW CREATE REPOSITORY FOR identifier
#showCreateRepository
| SHOW VIEW
@@ -366,7 +367,6 @@ unsupportedShowStatement
((FROM | IN) database=multipartIdentifier)?
#showIndex
| SHOW TRANSACTION ((FROM | IN) database=multipartIdentifier)? wildWhere?
#showTransaction
| SHOW CACHE HOTSPOT tablePath=STRING_LITERAL
#showCacheHotSpot
- | SHOW SYNC JOB ((FROM | IN) database=multipartIdentifier)?
#showSyncJob
| SHOW CATALOG RECYCLE BIN wildWhere?
#showCatalogRecycleBin
| SHOW QUERY STATS ((FOR database=identifier)
| (FROM tableName=multipartIdentifier (ALL VERBOSE?)?))?
#showQueryStats
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 4c5d43cb4db..a9ce9215d4d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -279,6 +279,7 @@ import org.apache.doris.nereids.DorisParser.ShowRolesContext;
import org.apache.doris.nereids.DorisParser.ShowSmallFilesContext;
import org.apache.doris.nereids.DorisParser.ShowSqlBlockRuleContext;
import org.apache.doris.nereids.DorisParser.ShowStorageEnginesContext;
+import org.apache.doris.nereids.DorisParser.ShowSyncJobContext;
import org.apache.doris.nereids.DorisParser.ShowTableCreationContext;
import org.apache.doris.nereids.DorisParser.ShowTableIdContext;
import org.apache.doris.nereids.DorisParser.ShowTabletStorageFormatContext;
@@ -593,6 +594,7 @@ import org.apache.doris.nereids.trees.plans.commands.ShowRolesCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowSmallFilesCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowSqlBlockRuleCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowStorageEnginesCommand;
+import org.apache.doris.nereids.trees.plans.commands.ShowSyncJobCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowTableCreationCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowTableIdCommand;
import
org.apache.doris.nereids.trees.plans.commands.ShowTabletStorageFormatCommand;
@@ -4979,6 +4981,16 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
return new CreateWorkloadGroupCommand(workloadGroupName, ifNotExists,
properties);
}
+ @Override
+ public LogicalPlan visitShowSyncJob(ShowSyncJobContext ctx) {
+ String databaseName = null;
+ if (ctx.multipartIdentifier() != null) {
+ List<String> databaseParts =
visitMultipartIdentifier(ctx.multipartIdentifier());
+ databaseName = databaseParts.get(0);
+ }
+ return new ShowSyncJobCommand(databaseName);
+ }
+
@Override
public LogicalPlan visitDropFile(DropFileContext ctx) {
String dbName = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 0cec8c48caf..bfa0163e7d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -237,6 +237,7 @@ public enum PlanType {
SHOW_ROLE_COMMAND,
SHOW_SMALL_FILES_COMMAND,
SHOW_STORAGE_ENGINES_COMMAND,
+ SHOW_SYNC_JOB_COMMAND,
SHOW_TABLE_ID_COMMAND,
SHOW_TRASH_COMMAND,
SHOW_TABLET_STORAGE_FORMAT_COMMAND,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowSyncJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowSyncJobCommand.java
new file mode 100644
index 00000000000..a673164707f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowSyncJobCommand.java
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.RedirectStatus;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.util.ListComparator;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSet;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.qe.StmtExecutor;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represents the command for SHOW SYNC JOB.
+ */
+public class ShowSyncJobCommand extends ShowCommand {
+ public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
+
.add("JobId").add("JobName").add("Type").add("State").add("Channel").add("Status")
+
.add("JobConfig").add("CreateTime").add("LastStartTime").add("LastStopTime").add("FinishTime").add("Msg")
+ .build();
+
+ private String databaseName;
+
+ public ShowSyncJobCommand(String databaseName) {
+ super(PlanType.SHOW_SYNC_JOB_COMMAND);
+ this.databaseName = databaseName;
+ }
+
+ private void validate(ConnectContext ctx) throws AnalysisException {
+ if (Strings.isNullOrEmpty(databaseName)) {
+ databaseName = ctx.getDatabase();
+ if (Strings.isNullOrEmpty(databaseName)) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+ }
+ }
+ if (!Env.getCurrentEnv().getAccessManager()
+ .checkDbPriv(ConnectContext.get(),
InternalCatalog.INTERNAL_CATALOG_NAME,
+ databaseName, PrivPredicate.SHOW)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED_ERROR,
+ PrivPredicate.SHOW.getPrivs().toString(), databaseName);
+ }
+ }
+
+ @Override
+ public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor)
throws Exception {
+ validate(ctx);
+ Env env = Env.getCurrentEnv();
+ DatabaseIf db =
Env.getCurrentInternalCatalog().getDbOrAnalysisException(databaseName);
+
+ List<List<Comparable>> syncInfos =
env.getSyncJobManager().getSyncJobsInfoByDbId(db.getId());
+ Collections.sort(syncInfos, new ListComparator<List<Comparable>>(0));
+
+ List<List<String>> rows = Lists.newArrayList();
+ for (List<Comparable> syncInfo : syncInfos) {
+ List<String> row = new ArrayList<String>(syncInfo.size());
+
+ for (Comparable element : syncInfo) {
+ row.add(element.toString());
+ }
+ rows.add(row);
+ }
+ return new ShowResultSet(this.getMetaData(), rows);
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitShowSyncJobCommand(this, context);
+ }
+
+ public ShowResultSetMetaData getMetaData() {
+ ShowResultSetMetaData.Builder builder =
ShowResultSetMetaData.builder();
+ for (String title : TITLE_NAMES) {
+ builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
+ }
+ return builder.build();
+ }
+
+ @Override
+ public RedirectStatus toRedirectStatus() {
+ return RedirectStatus.FORWARD_NO_SYNC;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index f988055f0f6..6efa58c1b8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -123,6 +123,7 @@ import org.apache.doris.nereids.trees.plans.commands.ShowRolesCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowSmallFilesCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowSqlBlockRuleCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowStorageEnginesCommand;
+import org.apache.doris.nereids.trees.plans.commands.ShowSyncJobCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowTableCreationCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowTableIdCommand;
import
org.apache.doris.nereids.trees.plans.commands.ShowTabletStorageFormatCommand;
@@ -299,6 +300,10 @@ public interface CommandVisitor<R, C> {
return visitCommand(callCommand, context);
}
+ default R visitShowSyncJobCommand(ShowSyncJobCommand showSyncJobCommand, C
context) {
+ return visitCommand(showSyncJobCommand, context);
+ }
+
default R visitCreateProcedureCommand(CreateProcedureCommand
createProcedureCommand, C context) {
return visitCommand(createProcedureCommand, context);
}
diff --git a/regression-test/suites/nereids_p0/show/test_show_sync_job_command.groovy b/regression-test/suites/nereids_p0/show/test_show_sync_job_command.groovy
new file mode 100644
index 00000000000..32b460be490
--- /dev/null
+++ b/regression-test/suites/nereids_p0/show/test_show_sync_job_command.groovy
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_show_sync_job_command", "query,sync_job") {
+ try {
+ sql """CREATE DATABASE IF NOT EXISTS test_db_sync_job;"""
+
+ sql """CREATE TABLE IF NOT EXISTS test_db_sync_job.test_tbl1_sync_job (
+ id INT,
+ name STRING
+ ) UNIQUE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES("replication_num" = "1");"""
+
+ // Create the sync job
+ sql """CREATE SYNC test_db_sync_job.job1
+ (
+ FROM mysql_db1.tbl1 INTO test_tbl1_sync_job
+ )
+ FROM BINLOG
+ (
+ "type" = "canal",
+ "canal.server.ip" = "127.0.0.1",
+ "canal.server.port" = "11111",
+ "canal.destination" = "example",
+ "canal.username" = "",
+ "canal.password" = ""
+ );"""
+
+ checkNereidsExecute("SHOW SYNC JOB FROM test_db_sync_job")
+
+ } catch (Exception e) {
+ // Log any exceptions that occur during testing
+ log.error("Failed to execute CREATE SYNC JOB command", e)
+ throw e
+ } finally {
+ // Cleanup
+ try_sql("STOP SYNC JOB IF EXISTS job1;")
+ try_sql("DROP TABLE IF EXISTS test_db_sync_job.test_tbl1_sync_job;")
+ try_sql("DROP DATABASE IF EXISTS test_db_sync_job;")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]