This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a77f7bb3db [IOTDB-4137][IOTDB-4799] Update sync-tool docs and print 
more detailed pipe information in show pipe (#7797)
a77f7bb3db is described below

commit a77f7bb3db8c816bc63e254506635aedaa6a9990
Author: Chen YZ <[email protected]>
AuthorDate: Sun Oct 30 12:33:52 2022 +0800

    [IOTDB-4137][IOTDB-4799] Update sync-tool docs and print more detailed pipe 
information in show pipe (#7797)
---
 docs/UserGuide/Maintenance-Tools/Sync-Tool.md      | 296 +++++++-----------
 docs/zh/UserGuide/Maintenance-Tools/Sync-Tool.md   | 332 ++++++++-------------
 .../org/apache/iotdb/db/it/sync/IoTDBPipeIT.java   |  50 ++--
 .../apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java   |   2 +-
 .../iotdb/commons/sync/pipe/TsFilePipeInfo.java    |   8 +-
 .../db/mpp/common/header/ColumnHeaderConstant.java |   2 +
 .../config/executor/ClusterConfigTaskExecutor.java |   7 +-
 .../execution/config/sys/sync/ShowPipeTask.java    |   3 +-
 .../memory/StatementMemorySourceVisitor.java       |   9 +-
 .../java/org/apache/iotdb/db/sync/SyncService.java |  25 +-
 .../src/main/thrift/confignode.thrift              |   3 +-
 11 files changed, 301 insertions(+), 436 deletions(-)

diff --git a/docs/UserGuide/Maintenance-Tools/Sync-Tool.md 
b/docs/UserGuide/Maintenance-Tools/Sync-Tool.md
index 937fa9425c..2fc15721b8 100644
--- a/docs/UserGuide/Maintenance-Tools/Sync-Tool.md
+++ b/docs/UserGuide/Maintenance-Tools/Sync-Tool.md
@@ -25,7 +25,7 @@
 
 The Sync Tool is an IoTDB suite tool that continuously uploads the timeseries 
data from the edge (sender) to the cloud(receiver).
 
-On the sender side of the sync, the sync module is embedded in the IoTDB 
engine. You should start an IoTDB before using the Sync tool.
+On the sender side of the sync-tool, the sync module is embedded in the IoTDB 
engine. The receiver side of the sync-tool supports IoTDB (standalone/cluster).
 
 You can use SQL commands to start or close a synchronization task at the 
sender, and you can check the status of the synchronization task at any time. 
At the receiving end, you can set the IP white list to specify the access IP 
address range of sender.
 
@@ -44,13 +44,14 @@ Two machines A and B, which are installed with iotdb, we 
want to continuously sy
 
 ## 3.Precautions for Use
 
-- The Sync Tool only supports for many-to-one, that is, one sender should 
connect to exactly one receiver. One receiver can receive data from more 
senders.
-- The sender can only have one pipe in non drop status. If you want to create 
a new pipe, please drop the current pipe.
+- The sender side of the sync-tool currently supports IoTDB version 1.0 **only 
if data_replication_factor is set to 1**. The receiver side supports any IoTDB 
version 1.0 configuration
+- A normal Pipe has two states: RUNNING indicates that it is synchronizing 
data to the receiver, and STOP indicates that synchronization to the receiver 
is suspended.
 - When one or more senders send data to a receiver, there should be no 
intersection between the respective device path sets of these senders and 
receivers, otherwise unexpected errors may occur.
   - e.g. When sender A includes path `root.sg.d.s`, sender B also includes the 
path `root.sg.d.s`, sender A deletes storage group `root.sg` will also delete 
all data of B stored in the path `root.sg.d.s` at receiver.
-- Synchronization between the two machines is not supported at present.
-- The Sync Tool only synchronizes insertions, deletions, metadata creations 
and deletions, do not support TTL settings, trigger and other operations.
+- The two "ends" do not support synchronization with each other.
+- The Sync Tool only synchronizes insertions, delete data, delete timeseires. 
If no storage group is created on the receiver, a storage group of the same 
level as the sender will be automatically created. Do not support TTL settings, 
trigger and other operations.
   - If TTL is set on the sender side, all unexpired data in the IoTDB and all 
future data writes and deletions will be synchronized to the receiver side when 
Pipe is started.
+- When operating a synchronization task, ensure that all DataNode nodes in 
`SHOW DATANODES` that are in the Running state are connected, otherwise the 
execution will fail.
 
 ## 4.Quick Start
 
@@ -58,57 +59,45 @@ Execute the following SQL statements at the sender and 
receiver to quickly start
 
 ### 4.1 Receiver
 
-- Start PipeServer.
+- Start sender IoTDB and receiver IoTDB.
 
-```
-IoTDB> START PIPESERVER
-```
-
-- Stop PipeServer(should execute after dropping all pipes which connect to 
this receiver).
-
-```
-IOTDB> STOP PIPESERVER
-```
-
-### 4.2 Sender
-
-- Create a pipesink with IoTDB type.
+- Create a PipeSink with IoTDB type.
 
 ```
-IoTDB> CREATE PIPESINK central_iotdb AS IoTDB (IP='There is your goal IP')
+IoTDB> CREATE PIPESINK central_iotdb AS IoTDB (ip='There is your goal IP', 
port='There is your goal port')
 ```
 
-- Establish a pipe(before creation, ensure that PipeServer is running on 
receiver).
+- Establish a Pipe (before creation, ensure that receiver IoTDB has been 
started).
 
 ```
 IoTDB> CREATE PIPE my_pipe TO central_iotDB
 ```
 
-- Start this pipe.
+- Start this Pipe.
 
 ```
 IoTDB> START PIPE my_pipe
 ```
 
-- Show pipe's status.
+- Show Pipe's status.
 
 ```
 IoTDB> SHOW PIPES
 ```
 
-- Stop this pipe.
+- Stop this Pipe.
 
 ```
 IoTDB> STOP PIPE my_pipe
 ```
 
-- Continue this pipe.
+- Continue this Pipe.
 
 ```
 IoTDB> START PIPE my_pipe
 ```
 
-- Drop this pipe(delete all information about this pipe).
+- Drop this Pipe (delete all information about this pipe).
 
 ```
 IoTDB> DROP PIPE my_pipe
@@ -132,31 +121,15 @@ All parameters are in `$IOTDB_ HOME$/conf/iotdb-engine`, 
after all modifications
 
 | **Parameter Name** | **ip_white_list**                                       
     |
 | ------------------ | 
------------------------------------------------------------ |
-| Description        | Set the white list of IP addresses of the sending end 
of the synchronization, which is expressed in the form of network segments, and 
multiple network segments are separated by commas. When the sender synchronizes 
data to the receiver, the receiver allows synchronization only when the IP 
address of the sender is within the network segment set in the white list. If 
the whitelist is empty, the receiver does not allow any sender to synchronize 
data. By default, the re [...]
+| Description        | Set the white list of IP addresses of the sending end 
of the synchronization, which is expressed in the form of network segments, and 
multiple network segments are separated by commas. When the sender synchronizes 
data to the receiver, the receiver allows synchronization only when the IP 
address of the sender is within the network segment set in the white list. If 
the whitelist is empty, the receiver does not allow any sender to synchronize 
data. By default, the re [...]
 | Data type          | String                                                  
     |
 | Default value      | 0.0.0.0/0                                               
     |
 
-
-
-| **Parameter Name** | **sync_server_port**                                    
     |
-| ------------------ | 
------------------------------------------------------------ |
-| Description        | The port which the receiver listens, please ensure this 
port is not occupied by other applications. |
-| Data type          | Short Int : [0,65535]                                   
     |
-| Default value      | 6670                                                    
     |
-
-
-
 ## 6.SQL
 
-### 6.1 Sender
-
-- Create a pipesink with IoTDB type, where IP and port are optional parameters.
-
-```
-IoTDB> CREATE PIPESINK <PipeSinkName> AS IoTDB [(IP='127.0.0.1',port=6670);]
-```
+### SHOW PIPESINKTYPE
 
-- Show all pipesink types supported by IoTDB.
+- Show all PipeSink types supported by IoTDB.
 
 ```Plain%20Text
 IoTDB> SHOW PIPESINKTYPE
@@ -168,124 +141,131 @@ IoTDB>
 +-----+
 ```
 
-- Show all pipesinks' definition, the results set has three columns, name, 
pipesink's type and pipesink's attributes.
+### CREATE PIPESINK
+
+* Create a PipeSink with IoTDB type, where IP and port are optional parameters.
 
 ```
-IoTDB> SHOW PIPESINKS
-IoTDB> SHOW PIPESINK [PipeSinkName]
-IoTDB> 
-+-----------+-----+------------------------+
-|       name| type|              attributes|
-+-----------+-----+------------------------+
-|my_pipesink|IoTDB|ip='127.0.0.1',port=6670|
-+-----------+-----+------------------------+
+IoTDB> CREATE PIPESINK <PipeSinkName> AS IoTDB [(ip='127.0.0.1',port=6670);]
 ```
 
+### DROP PIPESINK
+
 - Drop the pipesink with PipeSinkName parameter.
 
 ```
 IoTDB> DROP PIPESINK <PipeSinkName>
 ```
 
-- Create a pipe.
-- At present, the SELECT statement only supports `**` (i.e. data in all 
timeseries), the FROM statement only supports `root`, and the WHERE statement 
only supports the start time of the specified time.
-- If the `SyncDelOp` parameter is true, the deletions of sender will not be 
synchronized to receiver.
+### SHOW PIPESINK
+
+- Show all PipeSinks' definition, the results set has three columns, name, 
PipeSink’s type and PipeSink‘s attributes.
 
 ```
-IoTDB> CREATE PIPE my_pipe TO my_iotdb [FROM (select ** from root WHERE 
time>='yyyy-mm-dd HH:MM:SS' )] [WITH SyncDelOp=true]
+IoTDB> SHOW PIPESINKS
+IoTDB> SHOW PIPESINK [PipeSinkName]
+IoTDB> 
++-----------+-----+------------------------+
+|       name| type|              attributes|
++-----------+-----+------------------------+
+|my_pipesink|IoTDB|ip='127.0.0.1',port=6670|
++-----------+-----+------------------------+
 ```
 
-- Show all pipes' status.
+### CREATE PIPE
 
-> This statement can be executed on both senders and receivers.
-
-- Create time, the creation time of this pipe.
-- Name, the name of this pipe.
-- Role, the current role of this IoTDB in pipe, there are two possible roles.
-  - Sender, the current IoTDB is the synchronous sender
-  - Receiver, the current IoTDB is the synchronous receiver
-- Remote, information about the opposite end of the pipe.
-  - When role is receiver, the value of this field is the sender's IP.
-  - When role is sender, the value of this field is the pipeSink name.
+- Create a pipe.
 
-- Status, this pipe's status.
-- Message, the status message of this pipe. When pipe runs normally, this 
column is usually empty. When an exception occurs, messages may appear in  
following two states.
-  - WARN, this indicates that a data loss or other error has occurred, but the 
pipe will remain running.
-  - ERROR, this indicates that the network is interrupted for a long time or 
there is a problem at the receiving end. The pipe is stopped and set to STOP 
state.
+  - At present, the SELECT statement only supports `**` (i.e. data in all 
timeseries), the FROM statement only supports `root`, and the WHERE statement 
only supports the start time of the specified time. The start time can be 
specified in the form of yyyy-mm-dd HH:MM:SS or a timestamp.
 
-```
-IoTDB> SHOW PIPES
-IoTDB>
-+-----------------------+--------+--------+-------------+---------+-------+
-|            create time|   name |    role|       remote|   status|message|
-+-----------------------+--------+--------+-------------+---------+-------+
-|2022-03-30T20:58:30.689|my_pipe1|  sender|  my_pipesink|     STOP|       |
-+-----------------------+--------+--------+-------------+---------+-------+ 
-|2022-03-31T12:55:28.129|my_pipe2|receiver|192.168.11.11|  RUNNING|       |
-+-----------------------+--------+--------+-------------+---------+-------+
-```
+  - If the `SyncDelOp` parameter is true, the deletions of sender will not be 
synchronized to receiver. Default is false.
 
-- Show the pipe status with PipeName. When the PipeName is empty,it is the 
same with `Show PIPES`.
 
 ```
-IoTDB> SHOW PIPE [PipeName]
+IoTDB> CREATE PIPE my_pipe TO my_iotdb [FROM (select ** from root WHERE 
time>='yyyy-mm-dd HH:MM:SS' )] [WITH SyncDelOp=true]
 ```
 
-- Stop the pipe with PipeName.
+### STOP PIPE
+
+- Stop the Pipe with PipeName.
 
 ```
 IoTDB> STOP PIPE <PipeName>
 ```
 
-- Continue the pipe with PipeName.
+### START PIPE
+
+- Continue the Pipe with PipeName.
 
 ```
 IoTDB> START PIPE <PipeName>
 ```
 
+### DROP PIPE
+
 - Drop the pipe with PipeName(delete all information about this pipe).
 
 ```
 IoTDB> DROP PIPE <PipeName>
 ```
 
-#### 6.2 Receiver
+### SHOW PIPE
 
-- Start the PipeServer service.
+> This statement can be executed on both senders and receivers.
+
+- Show all Pipe's status.
+
+  - `create time`: the creation time of this pipe.
+
+  - `name`: the name of this pipe.
+
+  - `role`: the current role of this IoTDB in pipe, there are two possible 
roles.
+    - Sender, the current IoTDB is the synchronous sender
+    - Receiver, the current IoTDB is the synchronous receiver
+
+  - `remote`: information about the opposite end of the Pipe.
+    - When role is sender, the value of this field is the PipeSink name.
+    - When role is receiver, the value of this field is the sender's IP.
 
-```
-IoTDB> START PIPESERVER
-```
 
-- Stop the PipeServer service.
+  - `status`: the Pipe's status.
+  - `attributes`: the attributes of Pipe
+    - When role is sender, the value of this field is the synchronization 
start time of the Pipe and whether to synchronize the delete operation.
+    - When role is receiver, the value of this field is the name of the 
storage group corresponding to the synchronization connection created on this 
DataNode.
+
+  - `message`: the status message of this pipe. When pipe runs normally, this 
column is usually empty. When an exception occurs, messages may appear in  
following two states.
+    - WARN, this indicates that a data loss or other error has occurred, but 
the pipe will remain running.
+    - ERROR, this indicates that the network is interrupted for a long time or 
there is a problem at the receiving end. The pipe is stopped and set to STOP 
state.
+
 
 ```
-IoTDB> STOP PIPESERVER
+IoTDB> SHOW PIPES
+IoTDB>
++-----------------------+--------+--------+-------------+---------+-----------------------------------+-------+
+|            create time|   name |    role|       remote|   status|            
             attributes|message|
++-----------------------+--------+--------+-------------+---------+-----------------------------------+-------+
+|2022-03-30T20:58:30.689|my_pipe1|  sender|  my_pipesink|     
STOP|syncDelOp=true,dataStartTimestamp=0|       |
++-----------------------+--------+--------+-------------+---------+-----------------------------------+-------+
 
+|2022-03-31T12:55:28.129|my_pipe2|receiver|192.168.11.11|  RUNNING|        
storageGroup='root.vehicle'|       |
++-----------------------+--------+--------+-------------+---------+-----------------------------------+-------+
 ```
 
-- Show the information of PipeServer.
-  - True means the PipeServer is running, otherwise not.
+- Show the pipe status with PipeName. When the PipeName is empty,it is the 
same with `Show PIPES`.
 
 ```
-IoTDB> SHOW PIPESERVER STATUS
-+----------+
-|    enalbe|
-+----------+
-|true/false|
-+----------+
+IoTDB> SHOW PIPE [PipeName]
 ```
 
 ## 7. Usage Examples
 
-##### Goal
+### Goal
 
 - Create a synchronize task from sender IoTDB to receiver IoTDB.
 - Sender wants to synchronize the data after 2022-3-30 00:00:00.
 - Sender does not want to synchronize the deletions.
-- Sender has an unstable network environment, needs more retries.
 - Receiver only wants to receive data from this sender(sender ip 192.168.0.1).
 
-##### **Receiver**
+### Receiver
 
 - `vi conf/iotdb-datanode.properties`  to config the parameters,set the IP 
white list to 192.168.0.1/1 to receive and only receive data from sender.
 
@@ -293,10 +273,6 @@ IoTDB> SHOW PIPESERVER STATUS
 ####################
 ### PIPE Server Configuration
 ####################
-# PIPE server port to listen
-# Datatype: int
-# pipe_server_port=6670
-
 # White IP list of Sync client.
 # Please use the form of network segment to present the range of IP, for 
example: 192.168.0.0/16
 # If there are more than one IP segment, please separate them by commas
@@ -305,43 +281,22 @@ IoTDB> SHOW PIPESERVER STATUS
 ip_white_list=192.168.0.1/1
 ```
 
-- Start PipeServer service at receiver.
-
-```
-IoTDB> START PIPESERVER
-```
+### Sender
 
-- Show PipeServer's status, a `True` result means running correctly.
-
-```
-IoTDB> SHOW PIPESERVER STATUS
-```
-
-##### Sender
-
-- Config the `max_number_of_sync_file_retry` parameter to 10.
-
-```
-####################
-### PIPE Sender Configuration
-####################
-# The maximum number of retry when syncing a file to receiver fails.
-max_number_of_sync_file_retry=10
-```
-
-- Create pipesink with IoTDB type, input ip address 192.168.0.1, port 6670.
+- Create PipeSink with IoTDB type, input ip address 192.168.0.1, port 6670.
 
 ```
 IoTDB> CREATE PIPESINK my_iotdb AS IoTDB (IP='192.168.0.2',PORT=6670)
 ```
 
-- Create pipe connect to my_iotdb, input the start time 2022-03-30 00:00:00 in 
WHERE statments, set the `SyncDelOp` to false.
+- Create Pipe connect to my_iotdb, input the start time 2022-03-30 00:00:00 in 
WHERE statments, set the `SyncDelOp` to false. The following two SQL statements 
are equivalent
 
 ```
 IoTDB> CREATE PIPE p TO my_iotdb FROM (select ** from root where 
time>='2022-03-30 00:00:00') WITH SyncDelOp=false
+IoTDB> CREATE PIPE p TO my_iotdb FROM (select ** from root where time>= 
1648569600000)
 ```
 
-- Start the pipe p
+- Start the Pipe p
 
 ```
 IoTDB> START PIPE p
@@ -353,7 +308,7 @@ IoTDB> START PIPE p
 IoTDB> SHOW PIPE p
 ```
 
-##### Result Verification
+### Result Verification
 
 Execute SQL on sender.
 
@@ -392,56 +347,27 @@ It costs 0.134s
 
 ## 8.Q&A
 
-- Execute 
-
-  ```
-  STOP PIPESERVER
-  ```
-
-  to close IoTDB PipeServer service with message.
-
-  ```
-  Msg: 328: Failed to stop pipe server because there is pipe still running.
-  ```
+- Execute `CREATE PIPESINK demo as IoTDB` get message `PIPESINK [demo] already 
exists in IoTDB.`
 
-  - Cause by: There is a running pipe connected to this receiver.
-  - Solution: Execute `STOP PIPE <PipeName>` to stop pipe, then stop 
PipeServer service.
-
-- Execute 
-
-  ```
-  CREATE PIPE mypipe
-  ```
-
-  get message.
-
-  ```
-  Msg: 411: Create transport for pipe mypipe error, because CREATE request 
connects to receiver 127.0.0.1:6670 error..
-  ```
-
-  - Cause by: The receiver is not started or the receiver cannot be connected.
-  - Solution: Execute `SHOW PIPESERVER` on the receiver side to check if the 
receiver side is started, if not use `START PIPESERVER` to start; check if the 
whitelist in `iotdb-datanode.properties` on the receiver side contains the 
sender ip.
-
-- Execute 
-
-  ```
-  DROP PIPESINK pipesinkName
-  ```
-
-  get message.
-
-  ```
-  Msg: 411: Can not drop pipeSink demo, because pipe mypipe is using it.
-  ```
+  - Cause by: Current PipeSink already exists
+  - Solution: Execute `DROP PIPESINK demo` to drop PipeSink and recreate it.
+- Execute `DROP PIPESINK pipesinkName` get message `Can not drop PIPESINK 
[demo], because PIPE [mypipe] is using it.`
 
   - Cause by: It is not allowed to delete PipeSink that is used by a running 
PIPE.
   - Solution: Execute `SHOW PIPE` on the sender side to stop using the 
PipeSink's PIPE.
 
-- Sender creates PIPE prompt.
-
-  ```
-  Msg: 411: Pipe p is RUNNING, please retry after drop it.
-  ```
-
-  - Cause by: There is already a running PIPE.
-  - Solution: Execute `DROP PIPE p ` and retry.
+- Execute `CREATE PIPE p to demo`  get message  `PIPE [p] is STOP, please 
retry after drop it.`
+  - Cause by: Current Pipe already exists
+  - Solution: Execute `DROP PIPE p` to drop Pipe and recreate it.
+- Execute `CREATE PIPE p to demo` get message  `Fail to create PIPE [p] 
because Connection refused on DataNode: {id=2, 
internalEndPoint=TEndPoint(ip:127.0.0.1, port:9005)}.`
+  - Cause by: There are some DataNodes with the status Running cannot be 
connected.
+  - Solution: Execute `SHOW DATANODES`, and check for unreachable DataNode 
networks, or wait for their status to change to Unknown and re-execute the 
statement.
+- Execute `START PIPE p`  get message  `Fail to start PIPE [p] because 
Connection refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, 
port:9005)}.`
+  - Cause by: There are some DataNodes with the status Running cannot be 
connected.
+  - Solution: Execute `SHOW DATANODES`, and check for unreachable DataNode 
networks, or wait for their status to change to Unknown and re-execute the 
statement.
+- Execute `STOP PIPE p`  get message  `Fail to stop PIPE [p] because 
Connection refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, 
port:9005)}.`
+  - Cause by: There are some DataNodes with the status Running cannot be 
connected.
+  - Solution: Execute `SHOW DATANODES`, and check for unreachable DataNode 
networks, or wait for their status to change to Unknown and re-execute the 
statement.
+- Execute `DROP PIPE p`  get message  `Fail to DROP_PIPE because Fail to drop 
PIPE [p] because Connection refused on DataNode: {id=2, 
internalEndPoint=TEndPoint(ip:127.0.0.1, port:9005)}. Please execute [DROP PIPE 
p] later to retry.`
+  - Cause by: There are some DataNodes with the status Running cannot be 
connected. Pipe has been deleted on some nodes and the status has been set to 
***DROP***.
+  - Solution: Execute `SHOW DATANODES`, and check for unreachable DataNode 
networks, or wait for their status to change to Unknown and re-execute the 
statement.
diff --git a/docs/zh/UserGuide/Maintenance-Tools/Sync-Tool.md 
b/docs/zh/UserGuide/Maintenance-Tools/Sync-Tool.md
index 7ca0492ea2..bd675f6d44 100644
--- a/docs/zh/UserGuide/Maintenance-Tools/Sync-Tool.md
+++ b/docs/zh/UserGuide/Maintenance-Tools/Sync-Tool.md
@@ -19,66 +19,50 @@
 
 -->
 
-# 端云协同
+# TsFile 同步
 
 ## 1.介绍
 
 同步工具是持续将边缘端(发送端) IoTDB 中的时间序列数据上传并加载至云端(接收端) IoTDB 的套件工具。
 
-同步工具的发送端内嵌于 IoTDB 的引擎,使用同步工具需要首先启动IoTDB。
+IoTDB 同步工具内嵌于 IoTDB 引擎,与下游接收端相连,下游支持 IoTDB(单机/集群)。
 
-可以在发送端使用 SQL 命令来启动或者关闭一个同步任务,并且可以随时查看同步任务的状态。在接收端,您可以通过设置IP白名单来规定准入IP地址范围。
+可以在发送端使用 SQL 命令来启动或者关闭一个同步任务,并且可以随时查看同步任务的状态。在接收端,您可以通过设置 IP 白名单来规定准入 IP 地址范围。
 
 ## 2.模型定义
 
 
![pipe2.png](https://github.com/apache/iotdb-bin-resources/blob/main/docs/UserGuide/System%20Tools/Sync-Tool/pipe2.png?raw=true)
 
-假设目前有两台机器A和B都安装了IoTDB,希望将A上的数据不断同步至B中。为了更好地描述这个过程,我们引入以下概念。
+TsFile 同步工具实现了数据从 "流入-> IoTDB ->流出" 的闭环。假设目前有两台机器A和B都安装了IoTDB,希望将 A 上的数据不断同步至 
B 中。为了更好地描述这个过程,我们引入以下概念。
 
 - Pipe
-  - 指一次同步任务,在上述案例中,我们可以看作在A和B之间有一根数据流管道连接了A和B。
-  - 一个Pipe有三种状态,RUNNING,STOP,DROP,分别表示正在运行,暂停和永久取消。
+  - 指一次同步任务,在上述案例中,我们可以看作在 A 和 B 之间有一根数据流管道连接了 A 和 B。
+  - 一个正常运行的 Pipe 有两种状态:RUNNING 表示正在向接收端同步数据,STOP 表示暂停向接收端同步数据。
 - PipeSink
-  - 指接收端,在上述案例中,PipeSink即是B这台机器。PipeSink的类型目前仅支持IoTDB,即接收端为B上安装的IoTDB实例。
-  -  PipeServer:当PipeSink的类型为IoTDB的时候,需要打开IoTDB的PipeServer服务来让Pipe数据得到处理。
+  - 指接收端,在上述案例中,PipeSink 即是 B 这台机器。PipeSink 的类型目前仅支持 IoTDB,即接收端为 B 上安装的 IoTDB 
实例。
 
 ## 3.注意事项
 
-- 目前仅支持多对一模式,不支持一对多,即一个发送端只能发送数据到一个接收端,而一个接收端可以接受来自多个发送端的数据。
-- 发送端只能有一个非DROP状态的Pipe,如果想创建一个新的Pipe,请取消当前Pipe。
-- 当有一个或多个发送端指向一个接收端时,这些发送端和接收端各自设备路径集合之间应当没有交集,否则可能产生不可预料错误 。
+- 同步工具的发送端目前仅支持 IoTDB 1.0 版本**单数据副本配置**,接收端支持 IoTDB 1.0 版本任意配置。
+- 当有一个或多个发送端指向一个接收端时,这些发送端和接收端各自的设备路径集合之间应当没有交集,否则可能产生不可预料错误 
   - 
例如:当发送端A包括路径`root.sg.d.s`,发送端B也包括路径`root.sg.d.s`,当发送端A删除`root.sg`存储组时将也会在接收端删除所有B在接收端的`root.sg.d.s`中存放的数据。
-- 两台机器之间目前不支持相互同步。
-- 同步工具仅同步所有对数据写入和删除,元数据的创建和删除,如TTL的设置,Trigger,CQ等其他操作均不同步。
-  - 若在发送端设置了TTL,则启动Pipe时候IoTDB中所有未过期的数据以及未来所有的数据写入和删除都会被同步至接收端
+- 两个“端”之间目前不支持相互同步。
+- 同步工具仅同步数据写入、数据删除、时间序列删除,若接收端未创建存储组,自动创建与发送端同级存储组。TTL 的设置、Trigger、CQ 等其他操作均不同步
+  - 若在发送端设置了 TTL,则启动 Pipe 时候 IoTDB 中所有未过期的数据以及未来所有的数据写入和删除都会被同步至接收端
+- 对同步任务进行操作时,需保证 `SHOW DATANODES` 中所有处于 Running 状态的 DataNode 节点均可连通,否则将执行失败。
 
 ## 4.快速上手
 
 在发送端和接收端执行如下语句即可快速开始两个 IoTDB 之间的数据同步,完整的 SQL 
语句和配置事项请查看`配置参数`和`SQL`两节,更多使用范例请参考`使用范例`节。
 
-#### 4.1接收端
-
-- 开启PipeServer
-
-```
-IoTDB> START PIPESERVER
-```
-
-- 关闭PipeServer(在所有发送端取消了Pipe之后执行)
-
-```
-IOTDB> STOP PIPESERVER
-```
-
-#### 4.2发送端
-
+- 启动发送端 IoTDB 与接收端 IoTDB
 - 创建接收端为 IoTDB 类型的 Pipe Sink
 
 ```
-IoTDB> CREATE PIPESINK my_iotdb AS IoTDB (ip='输入你的IP')
+IoTDB> CREATE PIPESINK my_iotdb AS IoTDB (ip='接收端IP', port='接收端端口')
 ```
 
-- 创建同步任务Pipe(开启之前请确保接收端 IoTDB 的 PipeServer 已经启动)
+- 创建同步任务Pipe(请确保接收端 IoTDB 已经启动)
 
 ```
 IoTDB> CREATE PIPE my_pipe TO my_iotdb
@@ -108,7 +92,7 @@ IoTDB> STOP PIPE my_pipe
 IoTDB> START PIPE my_pipe
 ```
 
-- 关闭任务(状态信息可被删除)
+- 关闭任务(状态信息将被删除)
 
 ```
 IoTDB> DROP PIPE my_pipe
@@ -118,7 +102,7 @@ IoTDB> DROP PIPE my_pipe
 
 所有参数修改均在`$IOTDB_HOME$/conf/iotdb-datanode.properties`中,所有修改完成之后执行`load 
configuration`之后即可立刻生效。
 
-#### 5.1发送端相关
+### 5.1发送端相关
 
 | **参数名** | **max_number_of_sync_file_retry**          |
 | ---------- | ------------------------------------------ |
@@ -126,37 +110,19 @@ IoTDB> DROP PIPE my_pipe
 | 类型       | Int : [0,2147483647]                       |
 | 默认值     | 5                                          |
 
-
-
-#### 5.2接收端相关
+### 5.2接收端相关
 
 | **参数名** | **ip_white_list**                                            |
 | ---------- | ------------------------------------------------------------ |
-| 描述       | 设置同步功能发送端 IP 地址的白名单,以网段的形式表示,多个网段之间用逗号分隔。发送端向接收端同步数据时,只有当该发送端 IP 
地址处于该白名单设置的网段范围内,接收端才允许同步操作。如果白名单为空,则接收端不允许任何发送端同步数据。默认接收端接受全部 IP 的同步请求。 |
+| 描述       | 设置同步功能发送端 IP 地址的白名单,以网段的形式表示,多个网段之间用逗号分隔。发送端向接收端同步数据时,只有当该发送端 IP 
地址处于该白名单设置的网段范围内,接收端才允许同步操作。如果白名单为空,则接收端不允许任何发送端同步数据。默认接收端接受全部 IP 的同步请求。 
对该参数进行配置时,需要保证发送端所有 DataNode 地址均被覆盖。 |
 | 类型       | String                                                       |
 | 默认值     | 0.0.0.0/0                                                    |
 
-
-
-| **参数名** | ***pipe_server_port***                                       |
-| ---------- | ------------------------------------------------------------ |
-| 描述       | 同步接收端服务器监听接口,请确认该端口不是系统保留端口并且未被占用。 |
-| 类型       | Short Int : [0,65535]                                        |
-| 默认值     | 6670                                                         |
-
-
-
 ## 6.SQL
 
-#### 6.1发送端
-
-- 创建接收端为 IoTDB 类型的 Pipe Sink,其中IP和port是可选参数
-
-```
-IoTDB> CREATE PIPESINK <PipeSinkName> AS IoTDB [(ip='127.0.0.1',port=6670);]
-```
+### SHOW PIPESINKTYPE
 
-- 显示当前所能支持的 Pipe Sink 类型
+- 显示当前所能支持的 PipeSink 类型。
 
 ```Plain%20Text
 IoTDB> SHOW PIPESINKTYPE
@@ -168,73 +134,48 @@ IoTDB>
 +-----+
 ```
 
-- 显示当前所有 Pipe Sink 定义,结果集有三列,分别表示pipesink的名字,pipesink的类型,pipesink的属性
+### CREATE PIPESINK
+
+- 创建接收端为 IoTDB 类型的 PipeSink,其中IP和port是可选参数。当接收端为集群时,填写任意一个 DataNode 的 
`rpc_address` 与 `rpc_port`。
 
 ```
-IoTDB> SHOW PIPESINKS
-IoTDB> SHOW PIPESINK [PipeSinkName]
-IoTDB> 
-+-----------+-----+------------------------+
-|       name| type|              attributes|
-+-----------+-----+------------------------+
-|my_pipesink|IoTDB|ip='127.0.0.1',port=6670|
-+-----------+-----+------------------------+
+IoTDB> CREATE PIPESINK <PipeSinkName> AS IoTDB [(ip='127.0.0.1',port=6670);]
 ```
 
-- 删除 Pipe Sink 信息
+### DROP PIPESINK
+
+- 删除 PipeSink。当 PipeSink 正在被同步任务使用时,无法删除 PipeSink。
 
 ```
 IoTDB> DROP PIPESINK <PipeSinkName>
 ```
 
-- 创建同步任务
-  - 其中Select语句目前仅支持`**`(即所有序列中的数据),FROM语句目前仅支持`root`,Where语句仅支持指定time的起始时间
-  - `SyncDelOp`参数为true时会同步删除数据操作,否则不同步删除数据操作
+### SHOW PIPESINK
+
+- 显示当前所有 PipeSink 定义,结果集有三列,分别表示 PipeSink 的名字,PipeSink 的类型,PipeSink 的属性。
 
 ```
-IoTDB> CREATE PIPE my_pipe TO my_iotdb [FROM (select ** from root WHERE 
time>=yyyy-mm-dd HH:MM:SS)] [WITH SyncDelOp=true]
+IoTDB> SHOW PIPESINKS
+IoTDB> SHOW PIPESINK [PipeSinkName]
+IoTDB> 
++-----------+-----+------------------------+
+|       name| type|              attributes|
++-----------+-----+------------------------+
+|my_pipesink|IoTDB|ip='127.0.0.1',port=6670|
++-----------+-----+------------------------+
 ```
 
-- 显示所有同步任务状态
-
-> 该指令在发送端和接收端均可执行
-
-- create time,pipe的创建时间
-
-- name,pipe的名字
+### CREATE PIPE
 
-- role,当前IoTDB在pipe中的角色,可能有两种角色:
-  - sender,当前IoTDB为同步发送端
-
-  - receiver,当前IoTDB为同步接收端
-  
-- remote,pipe的对端信息
-  - 当role为receiver时,这一字段值为发送端ip
-
-  - 当role为sender时,这一字段值为pipeSink名称
-  
-- status,pipe状态
-- message,pipe运行信息,当pipe正常运行时,这一字段通常为空,当出现异常时,可能出现两种状态:
-  - WARN状态,这表明发生了数据丢失或者其他错误,但是Pipe会保持运行
-  - ERROR状态,这表明发生了网络长时间中断或者接收端出现问题,Pipe被停止,置为STOP状态
+- 创建同步任务
+  - 其中 select 语句目前仅支持`**`(即所有序列中的数据),from 语句目前仅支持`root`,where语句仅支持指定 time 
的起始时间。起始时间的指定形式可以是 yyyy-mm-dd HH:MM:SS或时间戳。
+  - `SyncDelOp`参数为 true 时会同步删除数据操作,否则不同步删除数据操作,默认为 false。
 
-```
-IoTDB> SHOW PIPES
-IoTDB>
-+-----------------------+--------+--------+-------------+---------+-------+
-|            create time|   name |    role|       remote|   status|message|
-+-----------------------+--------+--------+-------------+---------+-------+
-|2022-03-30T20:58:30.689|my_pipe1|  sender|  my_pipesink|     STOP|       |
-+-----------------------+--------+--------+-------------+---------+-------+ 
-|2022-03-31T12:55:28.129|my_pipe2|receiver|192.168.11.11|  RUNNING|       |
-+-----------------------+--------+--------+-------------+---------+-------+
+```Plain%20Text
+IoTDB> CREATE PIPE my_pipe TO my_iotdb [FROM (select ** from root WHERE 
time>=yyyy-mm-dd HH:MM:SS)] [WITH SyncDelOp=false]
 ```
 
-- 显示指定同步任务状态,当未指定PipeName时,与`SHOW PIPES`等效
-
-```
-IoTDB> SHOW PIPE [PipeName]
-```
+### STOP PIPE
 
 - 暂停任务
 
@@ -242,66 +183,85 @@ IoTDB> SHOW PIPE [PipeName]
 IoTDB> STOP PIPE <PipeName>
 ```
 
-- 继续被暂停的任务
+### START PIPE
+
+- 开始任务
 
 ```
 IoTDB> START PIPE <PipeName>
 ```
 
+### DROP PIPE
+
 - 关闭任务(状态信息可被删除)
 
 ```
 IoTDB> DROP PIPE <PipeName>
 ```
 
-#### 6.2接收端
+### SHOW PIPE
 
-- 启动本地的 IoTDB Pipe Server
+> 该指令在发送端和接收端均可执行
 
-```
-IoTDB> START PIPESERVER
-```
+- 显示所有同步任务状态
 
-- 关闭本地的 IoTDB Pipe Server
+  - `create time`:Pipe 的创建时间
 
-```
-IoTDB> STOP PIPESERVER
+  - `name`:Pipe 的名字
+
+  - `role`:当前 IoTDB 在 Pipe 中的角色,可能有两种角色:
+    - sender,当前 IoTDB 为同步发送端
+    - receiver,当前 IoTDB 为同步接收端
+
+  - `remote`:Pipe的对端信息
+    - 当 role 为 sender 时,这一字段值为 PipeSink 名称
+    - 当 role 为 receiver 时,这一字段值为发送端 IP
+
+  - `status`:Pipe状态
+
+  - `attributes`:Pipe属性
+    - 当 role 为 sender 时,这一字段值为 Pipe 的同步起始时间和是否同步删除操作
+    - 当 role 为 receiver 时,这一字段值为当前 DataNode 上创建的同步连接对应的存储组名称
+
+  - `message`:Pipe运行信息,当 Pipe 正常运行时,这一字段通常为空,当出现异常时,可能出现两种状态:
+    - WARN 状态,这表明发生了数据丢失或者其他错误,但是 Pipe 会保持运行
+    - ERROR 状态,这表明发生了网络长时间中断或者接收端出现问题,Pipe 被停止,置为 STOP 状态
+
+```Plain%20Text
+IoTDB> SHOW PIPES
+IoTDB>
++-----------------------+--------+--------+-------------+---------+-----------------------------------+-------+
+|            create time|   name |    role|       remote|   status|            
             attributes|message|
++-----------------------+--------+--------+-------------+---------+-----------------------------------+-------+
+|2022-03-30T20:58:30.689|my_pipe1|  sender|  my_pipesink|     
STOP|syncDelOp=true,dataStartTimestamp=0|       |
++-----------------------+--------+--------+-------------+---------+-----------------------------------+-------+
 
+|2022-03-31T12:55:28.129|my_pipe2|receiver|192.168.11.11|  RUNNING|        
storageGroup='root.vehicle'|       |
++-----------------------+--------+--------+-------------+---------+-----------------------------------+-------+
 ```
 
-- 显示本地 Pipe Server 的信息
-  - true表示PipeServer正在运行,false表示PipeServer停止服务
+- 显示指定同步任务状态,当未指定PipeName时,与`SHOW PIPES`等效
 
 ```
-IoTDB> SHOW PIPESERVER
-+----------+
-|    enalbe|
-+----------+
-|true/false|
-+----------+
+IoTDB> SHOW PIPE [PipeName]
 ```
 
 ## 7.使用示例
 
-#### **目标**
+### 目标
 
 - 创建一个从边端 IoTDB 到 云端 IoTDB 的 同步工作
 - 边端希望同步从2022年3月30日0时之后的数据
 - 边端不希望同步所有的删除操作
-- 边端处于弱网环境,需要配置更多的重试次数
-- 云端IoTDB仅接受来自边端的IoTDB的数据
+- 云端 IoTDB 仅接受来自边端的 IoTDB 的数据
 
-#### **接收端操作**
+### 接收端操作
 
-- `vi conf/iotdb-datanode.properties` 配置云端参数,将白名单设置为仅接收来自IP为 192.168.0.1的边端的数据
+`vi conf/iotdb-datanode.properties` 配置云端参数,将白名单设置为仅接收来自 IP 为 192.168.0.1 的边端的数据
 
 ```
 ####################
 ### PIPE Server Configuration
 ####################
-# PIPE server port to listen
-# Datatype: int
-# pipe_server_port=6670
-
 # White IP list of Sync client.
 # Please use the form of network segment to present the range of IP, for 
example: 192.168.0.0/16
 # If there are more than one IP segment, please separate them by commas
@@ -310,57 +270,36 @@ IoTDB> SHOW PIPESERVER
 ip_white_list=192.168.0.1/32
 ```
 
-- 云端启动 IoTDB 同步接收端
+### 发送端操作
 
-```
-IoTDB> START PIPESERVER
-```
-
-- 云端显示 IoTDB 同步接收端信息,如果结果为true则表示正确启动
-
-```
-IoTDB> SHOW PIPESERVER
-```
-
-#### **发送端操作**
-
-- 配置边端参数,将`max_number_of_sync_file_retry`参数设置为10
-
-```
-####################
-### PIPE Sender Configuration
-####################
-# The maximum number of retry when syncing a file to receiver fails.
-max_number_of_sync_file_retry=10
-```
-
-- 创建云端PipeSink,指定类型为IoTDB,指定云端IP地址为192.168.0.1,指定云端的PipeServer服务端口为6670
+创建云端 PipeSink,指定类型为 IoTDB,指定云端 IP 地址为 192.168.0.1,指定云端的 PipeServer 服务端口为6670
 
 ```
 IoTDB> CREATE PIPESINK my_iotdb AS IoTDB (ip='192.168.0.1',port=6670)
 ```
 
-- 创建Pipe,指定连接到my_iotdb的PipeSink,在WHREE子句中输入开始时间点2022年3月30日0时,将SyncDelOp置为false
+创建Pipe,指定连接到my_iotdb的PipeSink,在WHREE子句中输入开始时间点2022年3月30日0时,将SyncDelOp置为false。以下两条执行语句等价。
 
 ```
 IoTDB> CREATE PIPE p TO my_iotdb FROM (select ** from root where 
time>=2022-03-30 00:00:00) WITH SyncDelOp=false
+IoTDB> CREATE PIPE p TO my_iotdb FROM (select ** from root where time>= 
1648569600000)
 ```
 
-- 启动Pipe
+启动Pipe
 
 ```Plain%20Text
 IoTDB> START PIPE p
 ```
 
-- 显示同步任务状态
+显示同步任务状态
 
 ```
 IoTDB> SHOW PIPE p
 ```
 
-#### 结果验证
+### 结果验证
 
-在发送端执行以下SQL
+在发送端执行以下 SQL
 
 ```SQL
 SET STORAGE GROUP TO root.vehicle;
@@ -397,57 +336,24 @@ It costs 0.134s
 
 ## 8.常见问题
 
-- 执行 
-
-  ```
-  STOP PIPESERVER
-  ```
-
-   关闭本地的 IoTDB Pipe Server 时提示 
-
-  ```
-  Msg: 328: Failed to stop pipe server because there is pipe still running.
-  ```
-
-  - 原因:接收端有正在运行的同步任务
-
-  - 解决方案:在发送端先执行 `STOP PIPE` PipeName 停止任务,后关闭 IoTDB Pipe Server
-
-- 执行 
-
-  ```
-  CREATE PIPE mypipe
-  ```
-
-    提示  
-
-  ```
-  Msg: 411: Create transport for pipe mypipe error, because CREATE request 
connects to receiver 127.0.0.1:6670 error..
-  ```
-
-  - 原因:接收端未启动或接收端无法连接
-  - 解决方案:在接收端执行 `SHOW PIPESERVER` 检查是否启动接收端,若未启动使用 `START PIPESERVER` 
启动;检查接收端`iotdb-datanode.properties`中的白名单是否包含发送端ip。
-
-- 执行 
-
-  ```
-  DROP PIPESINK pipesinkName
-  ```
-
-   提示 
-
-  ```
-  Msg: 411: Can not drop pipeSink demo, because pipe mypipe is using it.
-  ```
-
+- 执行 `CREATE PIPESINK demo as IoTDB` 提示 `PIPESINK [demo] already exists in 
IoTDB.`
+  - 原因:当前 PipeSink 已存在
+  - 解决方案:删除 PipeSink 后重新创建
+- 执行 `DROP PIPESINK pipesinkName` 提示 `Can not drop PIPESINK [demo], because 
PIPE [mypipe] is using it.`
   - 原因:不允许删除有正在运行的PIPE所使用的 PipeSink
   - 解决方案:在发送端执行 `SHOW PIPE`,停止使用该 PipeSink 的 PIPE
-
-- 发送端创建 PIPE 提示 
-
-  ```
-  Msg: 411: Pipe p is RUNNING, please retry after drop it.
-  ```
-
-  - 原因:已有运行中的 PIPE
-  - 解决方案:执行 `DROP PIPE p `后重试
+- 执行 `CREATE PIPE p to demo` 提示 `PIPE [p] is STOP, please retry after drop it.`
+  - 原因:当前 Pipe 已存在
+  - 解决方案:删除 Pipe 后重新创建
+- 执行 `CREATE PIPE p to demo`提示 `Fail to create PIPE [p] because Connection 
refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, 
port:9005)}.`
+  - 原因:存在状态为 Running 的 DataNode 无法连通
+  - 解决方案:执行 `SHOW DATANODES` 语句,检查无法连通的 DataNode 网络,或等待其状态变为 Unknown 后重新执行语句。
+- 执行 `START PIPE p` 提示 `Fail to start PIPE [p] because Connection refused on 
DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, port:9005)}.`
+  - 原因:存在状态为 Running 的 DataNode 无法连通
+  - 解决方案:执行 `SHOW DATANODES` 语句,检查无法连通的 DataNode 网络,或等待其状态变为 Unknown 后重新执行语句。
+- 执行 `STOP PIPE p` 提示 `Fail to stop PIPE [p] because Connection refused on 
DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, port:9005)}.`
+  - 原因:存在状态为 Running 的 DataNode 无法连通
+  - 解决方案:执行 `SHOW DATANODES` 语句,检查无法连通的 DataNode 网络,或等待其状态变为 Unknown 后重新执行语句。
+- 执行 `DROP PIPE p` 提示 `Fail to DROP_PIPE because Fail to drop PIPE [p] because 
Connection refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, 
port:9005)}. Please execute [DROP PIPE p] later to retry.`
+  - 原因:存在状态为 Running 的 DataNode 无法连通,Pipe 已在部分节点上被删除,状态被置为 ***DROP***。
+  - 解决方案:执行 `SHOW DATANODES` 语句,检查无法连通的 DataNode 网络,或等待其状态变为 Unknown 后重新执行语句。
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
index ea5dd0c0c3..87a23b4063 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
@@ -18,12 +18,14 @@
  */
 package org.apache.iotdb.db.it.sync;
 
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 
+import org.apache.commons.lang3.StringUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -43,17 +45,11 @@ public class IoTDBPipeIT {
   private static String ip;
   private static int port;
   private static final String SHOW_PIPE_HEADER =
-      ColumnHeaderConstant.COLUMN_PIPE_CREATE_TIME
-          + ","
-          + ColumnHeaderConstant.COLUMN_PIPE_NAME
-          + ","
-          + ColumnHeaderConstant.COLUMN_PIPE_ROLE
-          + ","
-          + ColumnHeaderConstant.COLUMN_PIPE_REMOTE
-          + ","
-          + ColumnHeaderConstant.COLUMN_PIPE_STATUS
-          + ","
-          + ColumnHeaderConstant.COLUMN_PIPE_MESSAGE
+      StringUtils.join(
+              ColumnHeaderConstant.showPipeColumnHeaders.stream()
+                  .map(ColumnHeader::getColumnName)
+                  .toArray(),
+              ",")
           + ",";
 
   @BeforeClass
@@ -89,8 +85,9 @@ public class IoTDBPipeIT {
 
       statement.execute(
           String.format("CREATE PIPESINK demo AS IoTDB (ip='%s',port='%d');", 
ip, port));
-      statement.execute("CREATE PIPE p1 to demo;");
-      statement.execute("CREATE PIPE p2 to demo;");
+      statement.execute(
+          "CREATE PIPE p1 to demo FROM (select ** from root where 
time>=1648569600000) WITH SyncDelOp=false;");
+      statement.execute("CREATE PIPE p2 to demo WITH SyncDelOp=true;");
       try {
         // check exception2: PIPE already exist
         statement.execute("CREATE PIPE p2 to demo;");
@@ -104,8 +101,11 @@ public class IoTDBPipeIT {
       try (ResultSet resultSet = statement.executeQuery("SHOW PIPE")) {
         String[] expectedRetSet =
             new String[] {
-              String.format("%s,p1,sender,demo,STOP,NORMAL,", createTime1),
-              String.format("%s,p2,sender,demo,STOP,NORMAL,", createTime2)
+              String.format(
+                  
"%s,p1,sender,demo,STOP,syncDelOp=false,dataStartTimestamp=1648569600000,NORMAL,",
+                  createTime1),
+              String.format(
+                  
"%s,p2,sender,demo,STOP,syncDelOp=true,dataStartTimestamp=0,NORMAL,", 
createTime2)
             };
         assertResultSetEqual(resultSet, SHOW_PIPE_HEADER, expectedRetSet);
       }
@@ -114,8 +114,11 @@ public class IoTDBPipeIT {
         String[] expectedRetSet =
             new String[] {
               // there is no data now, so no connection in receiver
-              String.format("%s,p1,sender,demo,RUNNING,NORMAL,", createTime1),
-              String.format("%s,p2,sender,demo,STOP,NORMAL,", createTime2)
+              String.format(
+                  
"%s,p1,sender,demo,RUNNING,syncDelOp=false,dataStartTimestamp=1648569600000,NORMAL,",
+                  createTime1),
+              String.format(
+                  
"%s,p2,sender,demo,STOP,syncDelOp=true,dataStartTimestamp=0,NORMAL,", 
createTime2)
             };
         assertResultSetEqual(resultSet, SHOW_PIPE_HEADER, expectedRetSet);
       }
@@ -127,7 +130,11 @@ public class IoTDBPipeIT {
       }
       try (ResultSet resultSet = statement.executeQuery("SHOW PIPE p1")) {
         String[] expectedRetSet =
-            new String[] {String.format("%s,p1,sender,demo,RUNNING,NORMAL,", 
createTime1)};
+            new String[] {
+              String.format(
+                  
"%s,p1,sender,demo,RUNNING,syncDelOp=false,dataStartTimestamp=1648569600000,NORMAL,",
+                  createTime1)
+            };
         assertResultSetEqual(resultSet, SHOW_PIPE_HEADER, expectedRetSet);
       }
       statement.execute("STOP PIPE p1;");
@@ -135,8 +142,11 @@ public class IoTDBPipeIT {
       try (ResultSet resultSet = statement.executeQuery("SHOW PIPE")) {
         String[] expectedRetSet =
             new String[] {
-              String.format("%s,p1,sender,demo,STOP,NORMAL,", createTime1),
-              String.format("%s,p2,sender,demo,STOP,NORMAL,", createTime2)
+              String.format(
+                  
"%s,p1,sender,demo,STOP,syncDelOp=false,dataStartTimestamp=1648569600000,NORMAL,",
+                  createTime1),
+              String.format(
+                  
"%s,p2,sender,demo,STOP,syncDelOp=true,dataStartTimestamp=0,NORMAL,", 
createTime2)
             };
         assertResultSetEqual(resultSet, SHOW_PIPE_HEADER, expectedRetSet);
       }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
index 8bc28b674e..e350364990 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java
@@ -56,7 +56,7 @@ public class IoTDBPipeSinkIT {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
       String expectedHeader = ColumnHeaderConstant.COLUMN_PIPESINK_TYPE + ",";
-      String[] expectedRetSet = new String[] {"IoTDB,", "ExternalPipe,"};
+      String[] expectedRetSet = new String[] {"IoTDB,"};
       try (ResultSet resultSet = statement.executeQuery("SHOW PIPESINKTYPE")) {
         assertResultSetEqual(resultSet, expectedHeader, expectedRetSet);
       }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
index 288cb8f80d..09015525ec 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
@@ -81,7 +81,13 @@ public class TsFilePipeInfo extends PipeInfo {
   @Override
   public TShowPipeInfo getTShowPipeInfo() {
     return new TShowPipeInfo(
-        createTime, pipeName, "sender", pipeSinkName, status.name(), 
messageType.name());
+        createTime,
+        pipeName,
+        "sender",
+        pipeSinkName,
+        status.name(),
+        String.format("syncDelOp=%s,dataStartTimestamp=%s", syncDelOp, 
dataStartTimestamp),
+        messageType.name());
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index 7924503897..b2e9cc4484 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -113,6 +113,7 @@ public class ColumnHeaderConstant {
   public static final String COLUMN_PIPE_ROLE = "role";
   public static final String COLUMN_PIPE_REMOTE = "remote";
   public static final String COLUMN_PIPE_STATUS = "status";
+  public static final String COLUMN_PIPE_ATTRIBUTES = "attributes";
   public static final String COLUMN_PIPE_MESSAGE = "message";
 
   // column names for select into
@@ -285,6 +286,7 @@ public class ColumnHeaderConstant {
           new ColumnHeader(COLUMN_PIPE_ROLE, TSDataType.TEXT),
           new ColumnHeader(COLUMN_PIPE_REMOTE, TSDataType.TEXT),
           new ColumnHeader(COLUMN_PIPE_STATUS, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_PIPE_ATTRIBUTES, TSDataType.TEXT),
           new ColumnHeader(COLUMN_PIPE_MESSAGE, TSDataType.TEXT));
 
   public static final List<ColumnHeader> selectIntoColumnHeaders =
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index ee4c4129fd..10df3d09c3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
@@ -128,6 +129,7 @@ import 
org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -1230,7 +1232,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         tShowPipeReq.setPipeName(showPipeStatement.getPipeName());
       }
       TShowPipeResp resp = configNodeClient.showPipe(tShowPipeReq);
-      ShowPipeTask.buildTSBlock(resp.getPipeInfoList(), future);
+      List<TShowPipeInfo> tShowPipeInfoList =
+          
SyncService.getInstance().showPipeForReceiver(showPipeStatement.getPipeName());
+      tShowPipeInfoList.addAll(resp.getPipeInfoList());
+      ShowPipeTask.buildTSBlock(tShowPipeInfoList, future);
     } catch (Exception e) {
       future.setException(e);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
index 71da39ac85..7d4b46c2e2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
@@ -70,7 +70,8 @@ public class ShowPipeTask implements IConfigTask {
       builder.getColumnBuilder(2).writeBinary(new Binary(tPipeInfo.getRole()));
       builder.getColumnBuilder(3).writeBinary(new 
Binary(tPipeInfo.getRemote()));
       builder.getColumnBuilder(4).writeBinary(new 
Binary(tPipeInfo.getStatus()));
-      builder.getColumnBuilder(5).writeBinary(new 
Binary(tPipeInfo.getMessage()));
+      builder.getColumnBuilder(5).writeBinary(new 
Binary(tPipeInfo.getAttributes()));
+      builder.getColumnBuilder(6).writeBinary(new 
Binary(tPipeInfo.getMessage()));
       builder.declarePosition();
     }
     DatasetHeader datasetHeader = DatasetHeaderFactory.getShowPipeHeader();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
index c60e062558..c3df6b0d95 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
@@ -245,9 +245,12 @@ public class StatementMemorySourceVisitor
             .collect(Collectors.toList());
     TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
     for (PipeSink.PipeSinkType pipeSinkType : PipeSink.PipeSinkType.values()) {
-      tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
-      tsBlockBuilder.getColumnBuilder(0).writeBinary(new 
Binary(pipeSinkType.name()));
-      tsBlockBuilder.declarePosition();
+      // TODO(sync): only support IoTDB PipeSinkType now.
+      if (pipeSinkType.equals(PipeSink.PipeSinkType.IoTDB)) {
+        tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+        tsBlockBuilder.getColumnBuilder(0).writeBinary(new 
Binary(pipeSinkType.name()));
+        tsBlockBuilder.declarePosition();
+      }
     }
     return new StatementMemorySource(
         tsBlockBuilder.build(), context.getAnalysis().getRespDatasetHeader());
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java 
b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index d9526d465f..7cce9048df 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -385,24 +385,28 @@ public class SyncService implements IService {
     }
   }
 
+  // TODO: Only used for LocalConfigNode. Delete it after delete 
LocalConfigNode.
   public List<TShowPipeInfo> showPipe(String pipeName) {
     boolean showAll = StringUtils.isEmpty(pipeName);
     List<TShowPipeInfo> list = new ArrayList<>();
     // show pipe in sender
     for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
       if (showAll || pipeName.equals(pipe.getPipeName())) {
-        TShowPipeInfo tPipeInfo =
-            new TShowPipeInfo(
-                pipe.getCreateTime(),
-                pipe.getPipeName(),
-                SyncConstant.ROLE_SENDER,
-                pipe.getPipeSinkName(),
-                pipe.getStatus().name(),
-                pipe.getMessageType().name());
-        list.add(tPipeInfo);
+        list.add(pipe.getTShowPipeInfo());
       }
     }
-    // show pipe in receiver
+    list.addAll(showPipeForReceiver(pipeName));
+    return list;
+  }
+
+  /**
+   * // show pipe in receiver
+   *
+   * @param pipeName null means show all pipe
+   */
+  public List<TShowPipeInfo> showPipeForReceiver(String pipeName) {
+    boolean showAll = StringUtils.isEmpty(pipeName);
+    List<TShowPipeInfo> list = new ArrayList<>();
     for (TSyncIdentityInfo identityInfo : 
receiverManager.getAllTSyncIdentityInfos()) {
       if (showAll || pipeName.equals(identityInfo.getPipeName())) {
         TShowPipeInfo tPipeInfo =
@@ -412,6 +416,7 @@ public class SyncService implements IService {
                 SyncConstant.ROLE_RECEIVER,
                 identityInfo.getAddress(),
                 PipeStatus.RUNNING.name(),
+                String.format("storageGroup='%s'", 
identityInfo.getStorageGroup()),
                 // TODO: implement receiver message
                 PipeMessage.PipeMessageType.NORMAL.name());
         list.add(tPipeInfo);
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift 
b/thrift-confignode/src/main/thrift/confignode.thrift
index 7f318b0a39..ff00e4dc32 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -502,7 +502,8 @@ struct TShowPipeInfo {
   3: required string role
   4: required string remote
   5: required string status
-  6: required string message
+  6: required string attributes
+  7: required string message
 }
 
 struct TGetAllPipeInfoResp{

Reply via email to