This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch remove-sync-entry
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b71d64bd353650c4237a497db82d7d770ffacfdc
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Jun 17 01:01:39 2023 +0800

    antlr g4: remove sync entry & refactor pipe related
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   | 139 +++++++++------------
 .../plan/execution/config/ConfigTaskVisitor.java   |   6 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  10 +-
 .../config/executor/IConfigTaskExecutor.java       |   4 +-
 .../execution/config/sys/pipe/ShowPipeTask.java    |  10 +-
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       | 115 +++++------------
 .../db/mpp/plan/statement/StatementVisitor.java    |   6 +-
 ...wPipeStatement.java => ShowPipesStatement.java} |   4 +-
 8 files changed, 110 insertions(+), 184 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 3446a1039a7..b73bb9bbd64 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -32,7 +32,7 @@ singleStatement
     ;
 
 statement
-    : ddlStatement | dmlStatement | dclStatement | utilityStatement | syncStatement
+    : ddlStatement | dmlStatement | dclStatement | utilityStatement
     ;
 
 ddlStatement
@@ -52,6 +52,8 @@ ddlStatement
     | createFunction | dropFunction | showFunctions
     // Trigger
     | createTrigger | dropTrigger | showTriggers | startTrigger | stopTrigger
+    // Pipe Task
+    | createPipe | dropPipe | startPipe | stopPipe | showPipes
     // Pipe Plugin
     | createPipePlugin | dropPipePlugin | showPipePlugins
     // CQ
@@ -84,11 +86,6 @@ utilityStatement
     | loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile
     ;
 
-syncStatement
-    : createPipeSink | showPipeSinkType | showPipeSink | dropPipeSink
-    | createPipe | showPipe | stopPipe | startPipe | dropPipe
-    ;
-
 /**
  * 2. Data Definition Language (DDL)
  */
@@ -510,25 +507,77 @@ getSeriesSlotList
     : SHOW (DATA|SCHEMA) SERIESSLOTID WHERE DATABASE operator_eq database=prefixPath
     ;
 
-
-
 // ---- Migrate Region
 migrateRegion
     : MIGRATE REGION regionId=INTEGER_LITERAL FROM fromId=INTEGER_LITERAL TO toId=INTEGER_LITERAL
     ;
 
+// Pipe Task =========================================================================================
+createPipe
+    : CREATE PIPE pipeName=identifier
+        collectorAttributesClause?
+        processorAttributesClause?
+        connectorAttributesClause
+    ;
+
+collectorAttributesClause
+    : WITH COLLECTOR
+        LR_BRACKET
+        (collectorAttributeClause COMMA)* collectorAttributeClause?
+        RR_BRACKET
+    ;
+
+collectorAttributeClause
+    : collectorKey=STRING_LITERAL OPERATOR_SEQ collectorValue=STRING_LITERAL
+    ;
+
+processorAttributesClause
+    : WITH PROCESSOR
+        LR_BRACKET
+        (processorAttributeClause COMMA)* processorAttributeClause?
+        RR_BRACKET
+    ;
+
+processorAttributeClause
+    : processorKey=STRING_LITERAL OPERATOR_SEQ processorValue=STRING_LITERAL
+    ;
+
+connectorAttributesClause
+    : WITH CONNECTOR
+        LR_BRACKET
+        (connectorAttributeClause COMMA)* connectorAttributeClause?
+        RR_BRACKET
+    ;
+
+connectorAttributeClause
+    : connectorKey=STRING_LITERAL OPERATOR_SEQ connectorValue=STRING_LITERAL
+    ;
+
+dropPipe
+    : DROP PIPE pipeName=identifier
+    ;
+
+startPipe
+    : START PIPE pipeName=identifier
+    ;
+
+stopPipe
+    : STOP PIPE pipeName=identifier
+    ;
+
+showPipes
+    : SHOW ((PIPE pipeName=identifier) | PIPES (WHERE CONNECTOR USED BY pipeName=identifier)?)
+    ;
+
 // Pipe Plugin =========================================================================================
-// Create Pipe Plugin
 createPipePlugin
     : CREATE PIPEPLUGIN pluginName=identifier AS className=STRING_LITERAL uriClause
     ;
 
-// Drop Pipe Plugin
 dropPipePlugin
     : DROP PIPEPLUGIN pluginName=identifier
     ;
 
-// Show Pipe Plugins
 showPipePlugins
     : SHOW PIPEPLUGINS
     ;
@@ -988,72 +1037,6 @@ unloadFile
     : UNLOAD srcFileName=STRING_LITERAL dstFileDir=STRING_LITERAL
     ;
 
-/**
- * 6. syncStatement
- */
-
-// pipesink statement
-createPipeSink
-    : CREATE PIPESINK pipeSinkName=identifier AS pipeSinkType=identifier 
(LR_BRACKET syncAttributeClauses RR_BRACKET)?
-    ;
-
-showPipeSinkType
-    : SHOW PIPESINKTYPE
-    ;
-
-showPipeSink
-    : SHOW ((PIPESINK (pipeSinkName=identifier)?) | PIPESINKS)
-    ;
-
-dropPipeSink
-    : DROP PIPESINK pipeSinkName=identifier
-    ;
-
-// pipe statement
-createPipe
-    : CREATE PIPE pipeName=identifier collectorAttributesClause? 
processorAttributesClause? connectorAttributesClause
-    ;
-
-showPipe
-    : SHOW ((PIPE pipeName=identifier) | PIPES (WHERE CONNECTOR USED BY 
pipeName=identifier)?)
-    ;
-
-collectorAttributesClause
-    : WITH COLLECTOR LR_BRACKET (collectorAttributeClause COMMA)* 
collectorAttributeClause? RR_BRACKET
-    ;
-
-collectorAttributeClause
-    : collectorKey=STRING_LITERAL OPERATOR_SEQ collectorValue=STRING_LITERAL
-    ;
-
-processorAttributesClause
-    : WITH PROCESSOR LR_BRACKET (processorAttributeClause COMMA)* 
processorAttributeClause? RR_BRACKET
-    ;
-
-processorAttributeClause
-    : processorKey=STRING_LITERAL OPERATOR_SEQ processorValue=STRING_LITERAL
-    ;
-
-connectorAttributesClause
-    : WITH CONNECTOR LR_BRACKET (connectorAttributeClause COMMA)* 
connectorAttributeClause? RR_BRACKET
-    ;
-
-connectorAttributeClause
-    : connectorKey=STRING_LITERAL OPERATOR_SEQ connectorValue=STRING_LITERAL
-    ;
-
-stopPipe
-    : STOP PIPE pipeName=identifier
-    ;
-
-startPipe
-    : START PIPE pipeName=identifier
-    ;
-
-dropPipe
-    : DROP PIPE pipeName=identifier
-    ;
-
 // attribute clauses
 syncAttributeClauses
     : attributePair (COMMA? attributePair)*
@@ -1061,7 +1044,7 @@ syncAttributeClauses
 
 
 /**
- * 7. Common Clauses
+ * 6. Common Clauses
  */
 
 // IoTDB Objects
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 6b9aa677375..a8260d0d06a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -143,7 +143,7 @@ import 
org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
@@ -395,8 +395,8 @@ public class ConfigTaskVisitor
   }
 
   @Override
-  public IConfigTask visitShowPipe(ShowPipeStatement showPipeStatement, TaskContext context) {
-    return new ShowPipeTask(showPipeStatement);
+  public IConfigTask visitShowPipe(ShowPipesStatement showPipesStatement, TaskContext context) {
+    return new ShowPipeTask(showPipesStatement);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 1136c31c55e..5828bb60a29 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -186,7 +186,7 @@ import 
org.apache.iotdb.db.mpp.plan.statement.metadata.view.RenameLogicalViewSta
 import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
@@ -1671,15 +1671,15 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement 
showPipeStatement) {
+  public SettableFuture<ConfigTaskResult> showPipe(ShowPipesStatement 
showPipesStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     try (ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
       TShowPipeReq tShowPipeReq = new TShowPipeReq();
-      if (!StringUtils.isEmpty(showPipeStatement.getPipeName())) {
-        tShowPipeReq.setPipeName(showPipeStatement.getPipeName());
+      if (!StringUtils.isEmpty(showPipesStatement.getPipeName())) {
+        tShowPipeReq.setPipeName(showPipesStatement.getPipeName());
       }
-      if (showPipeStatement.getWhereClause()) {
+      if (showPipesStatement.getWhereClause()) {
         tShowPipeReq.setWhereClause(true);
       }
       List<TShowPipeInfo> tShowPipeInfoList =
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index e88e57962fa..18e8e6fe861 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -59,7 +59,7 @@ import 
org.apache.iotdb.db.mpp.plan.statement.metadata.view.RenameLogicalViewSta
 import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
@@ -169,7 +169,7 @@ public interface IConfigTaskExecutor {
 
   SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement 
stopPipeStatement);
 
-  SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement 
showPipeStatement);
+  SettableFuture<ConfigTaskResult> showPipe(ShowPipesStatement 
showPipesStatement);
 
   SettableFuture<ConfigTaskResult> deleteTimeSeries(
       String queryId, DeleteTimeSeriesStatement deleteTimeSeriesStatement);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java
index 7961f05f8a5..0058ff1a36d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -27,7 +27,7 @@ import 
org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
-import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipesStatement;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -42,16 +42,16 @@ import java.util.stream.Collectors;
 
 public class ShowPipeTask implements IConfigTask {
 
-  private final ShowPipeStatement showPipeStatement;
+  private final ShowPipesStatement showPipesStatement;
 
-  public ShowPipeTask(ShowPipeStatement showPipeStatement) {
-    this.showPipeStatement = showPipeStatement;
+  public ShowPipeTask(ShowPipesStatement showPipesStatement) {
+    this.showPipesStatement = showPipesStatement;
   }
 
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
-    return configTaskExecutor.showPipe(showPipeStatement);
+    return configTaskExecutor.showPipe(showPipesStatement);
   }
 
   public static void buildTSBlock(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 34cd5711ea8..79d6fbe187b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -180,17 +180,13 @@ import 
org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetThrottleQuotaStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowThrottleQuotaStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
-import 
org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkTypeStatement;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.ConstantContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountDatabasesContext;
@@ -3512,20 +3508,10 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
 
   // PIPE
 
-  @Override
-  public Statement visitShowPipe(IoTDBSqlParser.ShowPipeContext ctx) {
-    ShowPipeStatement showPipeStatement = new ShowPipeStatement();
-    if (ctx.pipeName != null) {
-      showPipeStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
-    }
-    showPipeStatement.setWhereClause(ctx.CONNECTOR() != null);
-    return showPipeStatement;
-  }
-
   @Override
   public Statement visitCreatePipe(IoTDBSqlParser.CreatePipeContext ctx) {
-
-    CreatePipeStatement createPipeStatement = new CreatePipeStatement(StatementType.CREATE_PIPE);
+    final CreatePipeStatement createPipeStatement =
+        new CreatePipeStatement(StatementType.CREATE_PIPE);
 
     if (ctx.pipeName != null) {
       createPipeStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
@@ -3552,7 +3538,7 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
 
   private Map<String, String> parseCollectorAttributesClause(
       IoTDBSqlParser.CollectorAttributesClauseContext ctx) {
-    Map<String, String> collectorMap = new HashMap<>();
+    final Map<String, String> collectorMap = new HashMap<>();
     for (IoTDBSqlParser.CollectorAttributeClauseContext singleCtx :
         ctx.collectorAttributeClause()) {
       collectorMap.put(
@@ -3564,7 +3550,7 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
 
   private Map<String, String> parseProcessorAttributesClause(
       IoTDBSqlParser.ProcessorAttributesClauseContext ctx) {
-    Map<String, String> processorMap = new HashMap<>();
+    final Map<String, String> processorMap = new HashMap<>();
     for (IoTDBSqlParser.ProcessorAttributeClauseContext singleCtx :
         ctx.processorAttributeClause()) {
       processorMap.put(
@@ -3576,7 +3562,7 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
 
   private Map<String, String> parseConnectorAttributesClause(
       IoTDBSqlParser.ConnectorAttributesClauseContext ctx) {
-    Map<String, String> connectorMap = new HashMap<>();
+    final Map<String, String> connectorMap = new HashMap<>();
     for (IoTDBSqlParser.ConnectorAttributeClauseContext singleCtx :
         ctx.connectorAttributeClause()) {
       connectorMap.put(
@@ -3587,97 +3573,54 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
   }
 
   @Override
-  public Statement visitStartPipe(IoTDBSqlParser.StartPipeContext ctx) {
-    StartPipeStatement startPipeStatement = new 
StartPipeStatement(StatementType.START_PIPE);
+  public Statement visitDropPipe(IoTDBSqlParser.DropPipeContext ctx) {
+    final DropPipeStatement dropPipeStatement = new 
DropPipeStatement(StatementType.DROP_PIPE);
 
     if (ctx.pipeName != null) {
-      startPipeStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
+      dropPipeStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
     } else {
-      throw new SemanticException("Not support for this sql in STARTPIPE, 
please enter pipename.");
+      throw new SemanticException("Not support for this sql in DROP PIPE, please enter pipename.");
     }
-    return startPipeStatement;
-  }
-
-  @Override
-  public Statement visitStopPipe(IoTDBSqlParser.StopPipeContext ctx) {
-    StopPipeStatement stopPipeStatement = new 
StopPipeStatement(StatementType.STOP_PIPE);
 
-    if (ctx.pipeName != null) {
-      stopPipeStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
-    } else {
-      throw new SemanticException("Not support for this sql in STOPPIPE, 
please enter pipename.");
-    }
-    return stopPipeStatement;
+    return dropPipeStatement;
   }
 
   @Override
-  public Statement visitDropPipe(IoTDBSqlParser.DropPipeContext ctx) {
-
-    DropPipeStatement dropPipeStatement = new 
DropPipeStatement(StatementType.DROP_PIPE);
+  public Statement visitStartPipe(IoTDBSqlParser.StartPipeContext ctx) {
+    final StartPipeStatement startPipeStatement = new 
StartPipeStatement(StatementType.START_PIPE);
 
     if (ctx.pipeName != null) {
-      dropPipeStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
+      startPipeStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
     } else {
-      throw new SemanticException("Not support for this sql in DROPPIPE, 
please enter pipename.");
+      throw new SemanticException("Not support for this sql in START PIPE, please enter pipename.");
     }
-    return dropPipeStatement;
-  }
-
-  // pipeSink
-
-  @Override
-  public Statement visitShowPipeSink(IoTDBSqlParser.ShowPipeSinkContext ctx) {
-    ShowPipeSinkStatement showPipeSinkStatement = new ShowPipeSinkStatement();
-    if (ctx.pipeSinkName != null) {
-      
showPipeSinkStatement.setPipeSinkName(parseIdentifier(ctx.pipeSinkName.getText()));
-    }
-    return showPipeSinkStatement;
-  }
 
-  @Override
-  public Statement 
visitShowPipeSinkType(IoTDBSqlParser.ShowPipeSinkTypeContext ctx) {
-    return new ShowPipeSinkTypeStatement();
+    return startPipeStatement;
   }
 
   @Override
-  public Statement visitCreatePipeSink(IoTDBSqlParser.CreatePipeSinkContext 
ctx) {
-
-    CreatePipeSinkStatement createPipeSinkStatement = new 
CreatePipeSinkStatement();
+  public Statement visitStopPipe(IoTDBSqlParser.StopPipeContext ctx) {
+    final StopPipeStatement stopPipeStatement = new 
StopPipeStatement(StatementType.STOP_PIPE);
 
-    if (ctx.pipeSinkName != null) {
-      
createPipeSinkStatement.setPipeSinkName(parseIdentifier(ctx.pipeSinkName.getText()));
-    } else {
-      throw new SemanticException(
-          "Not support for this sql in CREATEPIPESINK, please enter 
pipesinkname.");
-    }
-    if (ctx.pipeSinkType != null) {
-      
createPipeSinkStatement.setPipeSinkType(parseIdentifier(ctx.pipeSinkType.getText()));
-    } else {
-      throw new SemanticException(
-          "Not support for this sql in CREATEPIPESINK, please enter 
pipesinktype.");
-    }
-    if (ctx.syncAttributeClauses() != null) {
-      
createPipeSinkStatement.setAttributes(parseSyncAttributeClauses(ctx.syncAttributeClauses()));
+    if (ctx.pipeName != null) {
+      stopPipeStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
     } else {
-      createPipeSinkStatement.setAttributes(new HashMap<>());
+      throw new SemanticException("Not support for this sql in STOP PIPE, please enter pipename.");
     }
 
-    return createPipeSinkStatement;
+    return stopPipeStatement;
   }
 
   @Override
-  public Statement visitDropPipeSink(IoTDBSqlParser.DropPipeSinkContext ctx) {
-
-    DropPipeSinkStatement dropPipeSinkStatement =
-        new DropPipeSinkStatement(StatementType.DROP_PIPESINK);
+  public Statement visitShowPipes(IoTDBSqlParser.ShowPipesContext ctx) {
+    final ShowPipesStatement showPipesStatement = new ShowPipesStatement();
 
-    if (ctx.pipeSinkName != null) {
-      
dropPipeSinkStatement.setPipeSinkName(parseIdentifier(ctx.pipeSinkName.getText()));
-    } else {
-      throw new SemanticException(
-          "Not support for this sql in DROPPIPESINK, please enter 
pipesinkname.");
+    if (ctx.pipeName != null) {
+      showPipesStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
     }
-    return dropPipeSinkStatement;
+    showPipesStatement.setWhereClause(ctx.CONNECTOR() != null);
+
+    return showPipesStatement;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 5345b4c8ba8..8b58b344af1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -107,7 +107,7 @@ import 
org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
@@ -478,8 +478,8 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(showPipeSinkTypeStatement, context);
   }
 
-  public R visitShowPipe(ShowPipeStatement showPipeStatement, C context) {
-    return visitStatement(showPipeStatement, context);
+  public R visitShowPipe(ShowPipesStatement showPipesStatement, C context) {
+    return visitStatement(showPipesStatement, context);
   }
 
   public R visitCreatePipe(CreatePipeStatement createPipeStatement, C context) 
{
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/ShowPipeStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/ShowPipesStatement.java
similarity index 94%
rename from 
server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/ShowPipeStatement.java
rename to 
server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/ShowPipesStatement.java
index 6faef02d024..59401f23d7d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/ShowPipeStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/pipe/ShowPipesStatement.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStatement;
 
-public class ShowPipeStatement extends ShowStatement implements 
IConfigStatement {
-  public ShowPipeStatement() {
+public class ShowPipesStatement extends ShowStatement implements 
IConfigStatement {
+  public ShowPipesStatement() {
     super();
     statementType = StatementType.SHOW_PIPES;
   }

Reply via email to