This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 904a2a52695 [IOTDB-5940] Pipe: support 1.2 -> 1.1 sync connector (#10107)
904a2a52695 is described below

commit 904a2a52695dc0f62071cb1b723a4c2488f8bc48
Author: Caideyipi <[email protected]>
AuthorDate: Sun Jun 11 01:23:58 2023 +0800

    [IOTDB-5940] Pipe: support 1.2 -> 1.1 sync connector (#10107)
    
    Co-authored-by: yschengzi <[email protected]>
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../persistence/pipe/PipePluginInfo.java           |   8 +-
 .../org/apache/iotdb/pipe/api/PipeCollector.java   |   7 +-
 .../org/apache/iotdb/pipe/api/PipeConnector.java   |   6 +-
 .../org/apache/iotdb/pipe/api/PipeProcessor.java   |   6 +-
 .../iotdb/pipe/api/customizer/PipeStrategy.java    |  28 ---
 .../PipeCollectorRuntimeConfiguration.java}        |  10 +-
 .../PipeConnectorRuntimeConfiguration.java}        |   6 +-
 .../PipeProcessorRuntimeConfiguration.java}        |   6 +-
 .../PipeRuntimeConfiguration.java}                 |   7 +-
 .../PipeRuntimeEnvironment.java}                   |   9 +-
 .../PipeConnectorRuntimeConfiguration.java         |  84 -------
 .../retry/EqualRetryIntervalStrategy.java          |  63 -----
 .../retry/ExponentialRetryIntervalStrategy.java    |  70 ------
 .../customizer/connector/retry/RetryStrategy.java  |  34 ---
 .../{ => parameter}/PipeParameterValidator.java    |   2 +-
 .../customizer/{ => parameter}/PipeParameters.java |   6 +-
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |   4 +-
 .../plugin/builtin/collector/IoTDBCollector.java   |   6 +-
 .../builtin/connector/DoNothingConnector.java      |   6 +-
 ...tConnector.java => IoTDBSyncConnectorV1_1.java} |  16 +-
 .../builtin/connector/IoTDBThriftConnector.java    |   6 +-
 .../builtin/processor/DoNothingProcessor.java      |   6 +-
 .../commons/pipe/task/meta/PipeStaticMeta.java     |   2 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      |   8 +-
 .../pipe/collector/IoTDBDataRegionCollector.java   |  68 ++----
 .../PipeHistoricalDataRegionTsFileCollector.java   |  61 +++--
 .../realtime/PipeRealtimeDataRegionCollector.java  |  26 +-
 .../PipeRealtimeDataRegionFakeCollector.java       |  12 +-
 .../PipeRealtimeDataRegionHybridCollector.java     |   8 +-
 .../PipeRealtimeDataRegionLogCollector.java        |   7 +-
 .../PipeRealtimeDataRegionTsFileCollector.java     |   7 +-
 .../{ => constant}/PipeCollectorConstant.java      |   4 +-
 .../{ => constant}/PipeConnectorConstant.java      |   7 +-
 .../{ => constant}/PipeProcessorConstant.java      |   2 +-
 .../configuraion/PipeTaskRuntimeConfiguration.java |  42 ++++
 .../env/PipeTaskCollectorRuntimeEnvironment.java   |  33 ++-
 .../plugin/env/PipeTaskRuntimeEnvironment.java     |  25 +-
 .../lagacy/IoTDBSyncConnectorImplV1_1.java         | 268 +++++++++++++++++++++
 .../pipe/connector/v1/IoTDBThriftConnectorV1.java  |  33 ++-
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  |   2 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  12 +
 .../common/tablet/PipeRawTabletInsertionEvent.java |   2 +-
 .../db/pipe/processor/PipeDoNothingProcessor.java  |   8 +-
 .../pipe/resource/wal/PipeWALResourceManager.java  |   5 +-
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |  16 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  62 ++---
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  10 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  29 +--
 .../task/subtask/PipeConnectorSubtaskManager.java  |  43 ++--
 .../collector/CachedSchemaPatternMatcherTest.java  |  31 +--
 .../db/pipe/collector/PipeRealtimeCollectTest.java |  38 +--
 51 files changed, 652 insertions(+), 615 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 22f0cb97943..784bf7df744 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -34,10 +34,10 @@ import 
org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTabl
 import org.apache.iotdb.confignode.consensus.response.udf.JarResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
-import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
-import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
index 724c1595571..8771febead4 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.pipe.api;
 
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
 /**
@@ -48,7 +48,6 @@ import org.apache.iotdb.pipe.api.event.Event;
  *       cancelled (the `DROP PIPE` command is executed).
  * </ul>
  */
-// TODO: support event lifecycle management
 public interface PipeCollector extends PipePlugin {
 
   /**
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 6d74847e763..083590c1aef 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.pipe.api;
 
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index e94384d5f23..3c35e8b9e13 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.pipe.api;
 
 import org.apache.iotdb.pipe.api.collector.EventCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java
deleted file mode 100644
index 433ccc83512..00000000000
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.customizer;
-
-import org.apache.iotdb.pipe.api.exception.PipeStrategyNotValidException;
-
-public interface PipeStrategy {
-
-  /** @throws PipeStrategyNotValidException if invalid strategy is set */
-  void check() throws PipeStrategyNotValidException;
-}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
similarity index 75%
rename from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java
rename to 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
index c75f85bb7a7..071bfeb60c6 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
@@ -17,12 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.customizer;
+package org.apache.iotdb.pipe.api.customizer.configuration;
 
-import org.apache.iotdb.pipe.api.exception.PipeException;
-
-public interface PipeRuntimeConfiguration {
-
-  /** @throws PipeException if invalid runtime configuration is set */
-  void check() throws PipeException;
-}
+public interface PipeCollectorRuntimeConfiguration extends PipeRuntimeConfiguration {}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeConnectorRuntimeConfiguration.java
similarity index 82%
copy from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
copy to 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeConnectorRuntimeConfiguration.java
index a103b1db97a..4fc6813bef8 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeConnectorRuntimeConfiguration.java
@@ -17,8 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.customizer.connector.reuse;
+package org.apache.iotdb.pipe.api.customizer.configuration;
 
-import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
-
-public interface ReuseStrategy extends PipeStrategy {}
+public interface PipeConnectorRuntimeConfiguration extends PipeRuntimeConfiguration {}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeProcessorRuntimeConfiguration.java
similarity index 82%
copy from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
copy to 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeProcessorRuntimeConfiguration.java
index a103b1db97a..04c6f89ddbc 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeProcessorRuntimeConfiguration.java
@@ -17,8 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.customizer.connector.reuse;
+package org.apache.iotdb.pipe.api.customizer.configuration;
 
-import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
-
-public interface ReuseStrategy extends PipeStrategy {}
+public interface PipeProcessorRuntimeConfiguration extends PipeRuntimeConfiguration {}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeConfiguration.java
similarity index 82%
rename from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
rename to 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeConfiguration.java
index a103b1db97a..827d9f93ae7 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeConfiguration.java
@@ -17,8 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.customizer.connector.reuse;
+package org.apache.iotdb.pipe.api.customizer.configuration;
 
-import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
+public interface PipeRuntimeConfiguration {
 
-public interface ReuseStrategy extends PipeStrategy {}
+  PipeRuntimeEnvironment getRuntimeEnvironment();
+}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
similarity index 81%
rename from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java
rename to 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
index 3df93c6c5cd..455d293dccc 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
@@ -17,8 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.customizer.connector.parallel;
+package org.apache.iotdb.pipe.api.customizer.configuration;
 
-import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
+public interface PipeRuntimeEnvironment {
 
-public interface ParallelStrategy extends PipeStrategy {}
+  String getPipeName();
+
+  long getCreationTime();
+}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java
deleted file mode 100644
index 08417800108..00000000000
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.customizer.connector;
-
-import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration;
-import 
org.apache.iotdb.pipe.api.customizer.connector.parallel.ParallelStrategy;
-import org.apache.iotdb.pipe.api.customizer.connector.retry.RetryStrategy;
-import org.apache.iotdb.pipe.api.customizer.connector.reuse.ReuseStrategy;
-import org.apache.iotdb.pipe.api.exception.PipeException;
-
-/**
- * Used in {@link PipeConnector#customize(PipeParameters, 
PipeConnectorRuntimeConfiguration)} to customize
- * the runtime behavior of the PipeConnector.
- * <p>
- * Supports calling methods in a chain.
- * <p>
- * Sample code:
- * <pre>{@code
- * @Override
- * public void beforeStart(PipeParameters params, 
PipeConnectorRuntimeConfiguration configs) {
- *   configs
- *       .reuseStrategy(X)
- *       .parallelStrategy(Y)
- *       .retryStrategy(Z);
- * }</pre>
- */
-public class PipeConnectorRuntimeConfiguration implements 
PipeRuntimeConfiguration {
-
-  private ReuseStrategy reuseStrategy;
-  private ParallelStrategy parallelStrategy;
-  private RetryStrategy retryStrategy;
-
-  public PipeConnectorRuntimeConfiguration reuseStrategy(ReuseStrategy 
reuseStrategy) {
-    this.reuseStrategy = reuseStrategy;
-    return this;
-  }
-
-  public PipeConnectorRuntimeConfiguration parallelStrategy(ParallelStrategy 
parallelStrategy) {
-    this.parallelStrategy = parallelStrategy;
-    return this;
-  }
-
-  public PipeConnectorRuntimeConfiguration retryStrategy(RetryStrategy 
retryStrategy) {
-    this.retryStrategy = retryStrategy;
-    return this;
-  }
-
-  @Override
-  public void check() throws PipeException {
-    if (reuseStrategy == null) {
-      throw new PipeException("ReuseStrategy is not set!");
-    }
-    reuseStrategy.check();
-
-    if (parallelStrategy == null) {
-      throw new PipeException("ParallelStrategy is not set!");
-    }
-    parallelStrategy.check();
-
-    if (retryStrategy == null) {
-      throw new PipeException("RetryStrategy is not set!");
-    }
-    retryStrategy.check();
-  }
-}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java
deleted file mode 100644
index fe693cc6fac..00000000000
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.customizer.connector.retry;
-
-import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.exception.PipeStrategyNotValidException;
-
-/**
- * Used in {@link PipeConnector#customize(PipeParameters, 
PipeConnectorRuntimeConfiguration)}. When
- * the PipeConnector fails to connect to the sink, it will try to reconnect by 
the specified
- * strategy.
- *
- * <p>When PipeConnector is set to {@link EqualRetryIntervalStrategy}, the 
interval of waiting time
- * for retrying will be the same.
- *
- * @see PipeConnector
- * @see PipeConnectorRuntimeConfiguration
- */
-public class EqualRetryIntervalStrategy implements RetryStrategy {
-
-  private final int maxRetryTimes;
-  private final long retryInterval;
-
-  /**
-   * @param maxRetryTimes maxRetryTimes > 0
-   * @param retryInterval retryInterval > 0
-   */
-  public EqualRetryIntervalStrategy(int maxRetryTimes, long retryInterval) {
-    this.maxRetryTimes = maxRetryTimes;
-    this.retryInterval = retryInterval;
-  }
-
-  @Override
-  public void check() {
-    if (maxRetryTimes <= 0) {
-      throw new PipeStrategyNotValidException(
-          String.format("Parameter maxRetryTimes(%d) should be greater than 
zero.", maxRetryTimes));
-    }
-    if (retryInterval <= 0) {
-      throw new PipeStrategyNotValidException(
-          String.format("Parameter retryInterval(%d) should be greater than 
zero.", retryInterval));
-    }
-  }
-}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java
deleted file mode 100644
index 797372158cd..00000000000
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.customizer.connector.retry;
-
-import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-
-/**
- * Used in {@link PipeConnector#customize(PipeParameters, 
PipeConnectorRuntimeConfiguration)}. When
- * the PipeConnector fails to connect to the sink, it will try to reconnect by 
the specified
- * strategy.
- *
- * <p>When PipeConnector is set to {@link ExponentialRetryIntervalStrategy}, 
the interval of waiting
- * time for retrying will be increased exponentially each time.
- *
- * @see PipeConnector
- * @see PipeConnectorRuntimeConfiguration
- */
-public class ExponentialRetryIntervalStrategy implements RetryStrategy {
-
-  private final int maxRetryTimes;
-  private final long initInterval;
-  private final double backOffFactor;
-
-  /**
-   * @param maxRetryTimes maxRetryTimes > 0
-   * @param initInterval retryInterval > 0
-   * @param backOffFactor backOffFactor > 0
-   */
-  public ExponentialRetryIntervalStrategy(
-      int maxRetryTimes, long initInterval, double backOffFactor) {
-    this.maxRetryTimes = maxRetryTimes;
-    this.initInterval = initInterval;
-    this.backOffFactor = backOffFactor;
-  }
-
-  @Override
-  public void check() {
-    if (maxRetryTimes <= 0) {
-      throw new RuntimeException(
-          String.format("Parameter maxRetryTimes(%d) should be greater than 
zero.", maxRetryTimes));
-    }
-    if (initInterval <= 0) {
-      throw new RuntimeException(
-          String.format("Parameter retryInterval(%d) should be greater than 
zero.", initInterval));
-    }
-    if (backOffFactor <= 0) {
-      throw new RuntimeException(
-          String.format("Parameter backOffFactor(%f) should be greater than 
zero.", backOffFactor));
-    }
-  }
-}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java
deleted file mode 100644
index 27e9c08d1ff..00000000000
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.customizer.connector.retry;
-
-import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
-import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-
-/**
- * Used to customize the strategy for reconnecting to sinks in {@link
- * PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}.
- *
- * <p>When the PipeConnector fails to connect to the sink, it will try to 
reconnect by the specified
- * strategy.
- */
-public interface RetryStrategy extends PipeStrategy {}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
similarity index 98%
rename from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
rename to 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
index 2ba9fffa612..a70151a97fe 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.customizer;
+package org.apache.iotdb.pipe.api.customizer.parameter;
 
 import org.apache.iotdb.pipe.api.exception.PipeAttributeNotProvidedException;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
similarity index 94%
rename from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java
rename to 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 1dd852d9b36..cfea33aa3b9 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.customizer;
+package org.apache.iotdb.pipe.api.customizer.parameter;
 
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.PipeProcessor;
-import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import 
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
 
 import java.util.Map;
 
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index d1a95bb45ba..8bf376138e3 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.plugin.builtin;
 
 import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBCollector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnectorV1_1;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
 
@@ -34,7 +35,8 @@ public enum BuiltinPipePlugin {
 
   // connectors
   DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class),
-  IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class);
+  IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class),
+  IOTDB_SYNC_CONNECTOR_V_1_1("iotdb_sync_connector_v1.1", IoTDBSyncConnectorV1_1.class),
   ;
 
   private final String pipePluginName;
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
index 9902cce4ca0..8d3f0276126 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.commons.pipe.plugin.builtin.collector;
 
 import org.apache.iotdb.pipe.api.PipeCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
 /**
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
index 2522fdc66f6..0e18d1ffea9 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
 
 import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java
similarity index 81%
copy from 
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
copy to 
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java
index 82ddd05ba3c..d994b096353 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java
@@ -20,20 +20,20 @@
 package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
 
 import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 /**
- * This class is a placeholder and should not be initialized. It represents 
the IoTDB Thrift
- * connector. There is a real implementation in the server module but cannot 
be imported here. The
- * pipe agent in the server module will replace this class with the real 
implementation when
- * initializing the IoTDB Thrift connector.
+ * This class is a placeholder and should not be initialized. It represents 
the IoTDB Sync connector
+ * (for IoTDB v1.1). There is a real implementation in the server module but 
cannot be imported
+ * here. The pipe agent in the server module will replace this class with the 
real implementation
+ * when initializing the IoTDB Sync connector.
  */
-public class IoTDBThriftConnector implements PipeConnector {
+public class IoTDBSyncConnectorV1_1 implements PipeConnector {
 
   @Override
   public void validate(PipeParameterValidator validator) {
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index 82ddd05ba3c..315e1347a6d 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
 
 import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index d083783a716..511414dde73 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -21,9 +21,9 @@ package 
org.apache.iotdb.commons.pipe.plugin.builtin.processor;
 
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index df8be6538a2..09a43c4ef76 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.commons.pipe.task.meta;
 
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 454cf5c1788..9266bdf9a22 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -25,14 +25,14 @@ import 
org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
 import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
 import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
-import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
-import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
index bb247981bf6..7916e1d92fb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.collector;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.StorageEngine;
 import 
org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionCollector;
 import 
org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionTsFileCollector;
@@ -29,12 +28,11 @@ import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionFakeCol
 import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionLogCollector;
 import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionTsFileCollector;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
-import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
 import org.apache.iotdb.pipe.api.PipeCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
@@ -44,12 +42,12 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
-import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_ENABLE;
-import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE;
-import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_FILE;
-import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_HYBRID;
-import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_LOG;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_ENABLE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_FILE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_HYBRID;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_LOG;
 
 public class IoTDBDataRegionCollector implements PipeCollector {
 
@@ -57,31 +55,17 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
 
   private final AtomicBoolean hasBeenStarted;
 
-  private final PipeTaskMeta pipeTaskMeta;
-  private final long creationTime;
-  private final UnboundedBlockingPendingQueue<Event> collectorPendingQueue;
-
-  // TODO: support pattern in historical collector
   private PipeHistoricalDataRegionCollector historicalCollector;
   private PipeRealtimeDataRegionCollector realtimeCollector;
 
   private int dataRegionId;
 
-  public IoTDBDataRegionCollector(
-      PipeTaskMeta pipeTaskMeta,
-      long creationTime,
-      UnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
+  public IoTDBDataRegionCollector() {
     this.hasBeenStarted = new AtomicBoolean(false);
-
-    this.pipeTaskMeta = pipeTaskMeta;
-    this.creationTime = creationTime;
-    this.collectorPendingQueue = collectorPendingQueue;
   }
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
-
     // validate collector.history.enable and collector.realtime.enable
     validator
         .validateAttributeValueRange(
@@ -115,48 +99,34 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
 
   private void constructHistoricalCollector(PipeParameters parameters) {
     // enable historical collector by default
-    historicalCollector =
-        parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true)
-            ? new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta, 
Long.MIN_VALUE)
-            // We define the realtime data as the data generated after the 
creation time
-            // of the pipe from user's perspective. But we still need to use
-            // PipeHistoricalDataRegionCollector to collect the realtime data 
generated between the
-            // creation time of the pipe and the time when the pipe starts, 
because those data
-            // can not be listened by PipeRealtimeDataRegionCollector, and 
should be collected by
-            // PipeHistoricalDataRegionCollector from implementation 
perspective.
-            : new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta, 
creationTime);
+    historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
   }
 
   private void constructRealtimeCollector(PipeParameters parameters) {
     // enable realtime collector by default
     if (!parameters.getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) {
-      realtimeCollector = new 
PipeRealtimeDataRegionFakeCollector(pipeTaskMeta);
+      realtimeCollector = new PipeRealtimeDataRegionFakeCollector();
       return;
     }
 
     // use hybrid mode by default
     if (!parameters.hasAttribute(COLLECTOR_REALTIME_MODE)) {
-      realtimeCollector =
-          new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, 
collectorPendingQueue);
+      realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
       return;
     }
 
     switch (parameters.getString(COLLECTOR_REALTIME_MODE)) {
       case COLLECTOR_REALTIME_MODE_FILE:
-        realtimeCollector =
-            new PipeRealtimeDataRegionTsFileCollector(pipeTaskMeta, 
collectorPendingQueue);
+        realtimeCollector = new PipeRealtimeDataRegionTsFileCollector();
         break;
       case COLLECTOR_REALTIME_MODE_LOG:
-        realtimeCollector =
-            new PipeRealtimeDataRegionLogCollector(pipeTaskMeta, 
collectorPendingQueue);
+        realtimeCollector = new PipeRealtimeDataRegionLogCollector();
         break;
       case COLLECTOR_REALTIME_MODE_HYBRID:
-        realtimeCollector =
-            new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, 
collectorPendingQueue);
+        realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
         break;
       default:
-        realtimeCollector =
-            new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, 
collectorPendingQueue);
+        realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
         LOGGER.warn(
             String.format(
                 "Unsupported collector realtime mode: %s, create a hybrid 
collector.",
@@ -167,7 +137,7 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
   @Override
   public void customize(PipeParameters parameters, 
PipeCollectorRuntimeConfiguration configuration)
       throws Exception {
-    dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
+    dataRegionId = ((PipeTaskCollectorRuntimeEnvironment) 
configuration.getRuntimeEnvironment()).getRegionId();
 
     historicalCollector.customize(parameters, configuration);
     realtimeCollector.customize(parameters, configuration);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index f39923fd67a..48b3a1c11ae 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -27,12 +27,12 @@ import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.utils.DateTimeUtils;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import org.slf4j.Logger;
@@ -44,51 +44,48 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
-import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
-import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME;
-import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME;
-import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.DATA_REGION_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_KEY;
 
 public class PipeHistoricalDataRegionTsFileCollector extends 
PipeHistoricalDataRegionCollector {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileCollector.class);
 
-  private final PipeTaskMeta pipeTaskMeta;
-  private final ProgressIndex startIndex;
+  private PipeTaskMeta pipeTaskMeta;
+  private ProgressIndex startIndex;
 
   private int dataRegionId;
 
   private String pattern;
 
-  private final long historicalDataCollectionTimeLowerBound; // arrival time
   private long historicalDataCollectionStartTime; // event time
   private long historicalDataCollectionEndTime; // event time
 
-  private Queue<PipeTsFileInsertionEvent> pendingQueue;
+  private long historicalDataCollectionTimeLowerBound; // arrival time
 
-  public PipeHistoricalDataRegionTsFileCollector(
-      PipeTaskMeta pipeTaskMeta, long historicalDataCollectionTimeLowerBound) {
-    this.pipeTaskMeta = pipeTaskMeta;
-    this.startIndex = pipeTaskMeta.getProgressIndex();
+  private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
-    this.historicalDataCollectionTimeLowerBound = 
historicalDataCollectionTimeLowerBound;
-  }
+  public PipeHistoricalDataRegionTsFileCollector() {}
 
   @Override
-  public void validate(PipeParameterValidator validator) throws Exception {
-    validator.validateRequiredAttribute(DATA_REGION_KEY);
-  }
+  public void validate(PipeParameterValidator validator) {}
 
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration) {
-    dataRegionId = parameters.getInt(DATA_REGION_KEY);
+    final PipeTaskCollectorRuntimeEnvironment environment =
+        (PipeTaskCollectorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
 
-    pattern =
-        parameters.getStringOrDefault(
-            PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
-            PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
+    pipeTaskMeta = environment.getPipeTaskMeta();
+    startIndex = environment.getPipeTaskMeta().getProgressIndex();
+
+    dataRegionId = environment.getRegionId();
+
+    pattern = parameters.getStringOrDefault(COLLECTOR_PATTERN_KEY, 
COLLECTOR_PATTERN_DEFAULT_VALUE);
 
     // user may set the COLLECTOR_HISTORY_START_TIME and 
COLLECTOR_HISTORY_END_TIME without
     // enabling the historical data collection, which may affect the realtime 
data collection.
@@ -105,6 +102,18 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
                 parameters.getString(COLLECTOR_HISTORY_END_TIME), 
ZoneId.systemDefault())
             : Long.MAX_VALUE;
 
+    // enable historical collector by default
+    historicalDataCollectionTimeLowerBound =
+        parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true)
+            ? Long.MIN_VALUE
+            // We define the realtime data as the data generated after the 
creation time
+            // of the pipe from user's perspective. But we still need to use
+            // PipeHistoricalDataRegionCollector to collect the realtime data 
generated between the
+            // creation time of the pipe and the time when the pipe starts, 
because those data
+            // can not be listened by PipeRealtimeDataRegionCollector, and 
should be collected by
+            // PipeHistoricalDataRegionCollector from implementation 
perspective.
+            : environment.getCreationTime();
+
     // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the 
realtime only mode.
     // realtime only mode -> (historicalDataCollectionTimeLowerBound != 
Long.MIN_VALUE)
     //
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
index 658da872795..ab9be42b062 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -21,28 +21,24 @@ package org.apache.iotdb.db.pipe.collector.realtime;
 
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.pipe.api.PipeCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
 public abstract class PipeRealtimeDataRegionCollector implements PipeCollector 
{
 
-  protected final PipeTaskMeta pipeTaskMeta;
-
   protected String pattern;
   protected String dataRegionId;
+  protected PipeTaskMeta pipeTaskMeta;
 
-  public PipeRealtimeDataRegionCollector(PipeTaskMeta pipeTaskMeta) {
-    this.pipeTaskMeta = pipeTaskMeta;
-  }
+  public PipeRealtimeDataRegionCollector() {}
 
   @Override
-  public void validate(PipeParameterValidator validator) throws Exception {
-    validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
-  }
+  public void validate(PipeParameterValidator validator) throws Exception {}
 
   @Override
   public void customize(PipeParameters parameters, 
PipeCollectorRuntimeConfiguration configuration)
@@ -51,7 +47,11 @@ public abstract class PipeRealtimeDataRegionCollector 
implements PipeCollector {
         parameters.getStringOrDefault(
             PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
             PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
-    dataRegionId = parameters.getString(PipeCollectorConstant.DATA_REGION_KEY);
+
+    final PipeTaskCollectorRuntimeEnvironment environment =
+        (PipeTaskCollectorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
+    dataRegionId = String.valueOf(environment.getRegionId());
+    pipeTaskMeta = environment.getPipeTaskMeta();
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
index bd0913132b5..fbc28c4dbca 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
@@ -19,19 +19,13 @@
 
 package org.apache.iotdb.db.pipe.collector.realtime;
 
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
 public class PipeRealtimeDataRegionFakeCollector extends 
PipeRealtimeDataRegionCollector {
-
-  public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) {
-    super(pipeTaskMeta);
-  }
-
   @Override
   public void validate(PipeParameterValidator validator) {}
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 50349097d50..1551fc2a582 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.collector.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
@@ -33,7 +32,6 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-// TODO: make this collector as a builtin pipe plugin. register it in 
BuiltinPipePlugin.
 public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegionCollector {
 
   private static final Logger LOGGER =
@@ -43,10 +41,8 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
-  public PipeRealtimeDataRegionHybridCollector(
-      PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> 
pendingQueue) {
-    super(pipeTaskMeta);
-    this.pendingQueue = pendingQueue;
+  public PipeRealtimeDataRegionHybridCollector() {
+    this.pendingQueue = new UnboundedBlockingPendingQueue<>();
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index 845637b5f85..42da6a5d6fc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.collector.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
@@ -40,10 +39,8 @@ public class PipeRealtimeDataRegionLogCollector extends 
PipeRealtimeDataRegionCo
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
-  public PipeRealtimeDataRegionLogCollector(
-      PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> 
pendingQueue) {
-    super(pipeTaskMeta);
-    this.pendingQueue = pendingQueue;
+  public PipeRealtimeDataRegionLogCollector() {
+    this.pendingQueue = new UnboundedBlockingPendingQueue<>();
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index da0c1fb90c3..cd56caae708 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.collector.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
@@ -40,10 +39,8 @@ public class PipeRealtimeDataRegionTsFileCollector extends 
PipeRealtimeDataRegio
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
-  public PipeRealtimeDataRegionTsFileCollector(
-      PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> 
pendingQueue) {
-    super(pipeTaskMeta);
-    this.pendingQueue = pendingQueue;
+  public PipeRealtimeDataRegionTsFileCollector() {
+    this.pendingQueue = new UnboundedBlockingPendingQueue<>();
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
similarity index 94%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
index 105d45f8484..a98bac82bdd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.config.constant;
 
 public class PipeCollectorConstant {
 
@@ -26,8 +26,6 @@ public class PipeCollectorConstant {
   public static final String COLLECTOR_PATTERN_KEY = "collector.pattern";
   public static final String COLLECTOR_PATTERN_DEFAULT_VALUE = "root";
 
-  public static final String DATA_REGION_KEY = "collector.data-region";
-
   public static final String COLLECTOR_HISTORY_ENABLE_KEY = 
"collector.history.enable";
   public static final String COLLECTOR_HISTORY_START_TIME = 
"collector.history.start-time";
   public static final String COLLECTOR_HISTORY_END_TIME = 
"collector.history.end-time";
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
similarity index 76%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 7b88c56ef20..7c92206de81 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.config.constant;
 
 public class PipeConnectorConstant {
 
@@ -26,6 +26,11 @@ public class PipeConnectorConstant {
   public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip";
   public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
 
+  public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
+  public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
+  public static final String CONNECTOR_IOTDB_PASSWORD_KEY = 
"connector.password";
+  public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
+
   private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
similarity index 95%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
index 1af34f3ef31..8caa87c03ec 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.config.constant;
 
 public class PipeProcessorConstant {
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java
new file mode 100644
index 00000000000..29810c46960
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.config.plugin.configuraion;
+
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
+
+public class PipeTaskRuntimeConfiguration
+    implements PipeCollectorRuntimeConfiguration,
+        PipeProcessorRuntimeConfiguration,
+        PipeConnectorRuntimeConfiguration {
+
+  private final PipeRuntimeEnvironment environment;
+
+  public PipeTaskRuntimeConfiguration(PipeRuntimeEnvironment environment) {
+    this.environment = environment;
+  }
+
+  @Override
+  public PipeRuntimeEnvironment getRuntimeEnvironment() {
+    return environment;
+  }
+}
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java
similarity index 56%
rename from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java
index a0bbd230338..530c977b9eb 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java
@@ -17,19 +17,28 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.customizer.processor;
+package org.apache.iotdb.db.pipe.config.plugin.env;
 
-import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 
-/**
- * Used in {@link PipeProcessor#customize(PipeParameters, 
PipeProcessorRuntimeConfiguration)} to
- * customize the runtime behavior of the PipeProcessor.
- */
-public class PipeProcessorRuntimeConfiguration implements 
PipeRuntimeConfiguration {
+public class PipeTaskCollectorRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
+
+  private final int regionId;
+
+  private final PipeTaskMeta pipeTaskMeta;
+
+  public PipeTaskCollectorRuntimeEnvironment(
+      String pipeName, long creationTime, int regionId, PipeTaskMeta 
pipeTaskMeta) {
+    super(pipeName, creationTime);
+    this.regionId = regionId;
+    this.pipeTaskMeta = pipeTaskMeta;
+  }
+
+  public int getRegionId() {
+    return regionId;
+  }
 
-  @Override
-  public void check() throws PipeException {}
+  public PipeTaskMeta getPipeTaskMeta() {
+    return pipeTaskMeta;
+  }
 }
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
similarity index 59%
rename from 
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
index e99d21be71f..a935dcf2dac 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
@@ -17,14 +17,27 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.customizer.collector;
+package org.apache.iotdb.db.pipe.config.plugin.env;
 
-import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.exception.PipeException;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
 
-// TODO: complete this class
-public class PipeCollectorRuntimeConfiguration implements 
PipeRuntimeConfiguration {
+public class PipeTaskRuntimeEnvironment implements PipeRuntimeEnvironment {
+
+  private final String pipeName;
+  private final long creationTime;
+
+  public PipeTaskRuntimeEnvironment(String pipeName, long creationTime) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+  }
+
+  @Override
+  public String getPipeName() {
+    return pipeName;
+  }
 
   @Override
-  public void check() throws PipeException {}
+  public long getCreationTime() {
+    return creationTime;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
new file mode 100644
index 00000000000..3099cf008cc
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.lagacy;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
+
+public class IoTDBSyncConnectorImplV1_1 implements PipeConnector {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncConnectorImplV1_1.class);
+
+  private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
+  public static final String IOTDB_SYNC_CONNECTOR_VERSION = "1.1";
+
+  private String ipAddress;
+  private int port;
+
+  private String user;
+  private String password;
+
+  private String pipeName;
+  private Long creationTime;
+
+  private IoTDBThriftConnectorClient client;
+
+  private static SessionPool sessionPool;
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    validator
+        .validateRequiredAttribute(CONNECTOR_IOTDB_IP_KEY)
+        .validateRequiredAttribute(CONNECTOR_IOTDB_PORT_KEY);
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
+      throws Exception {
+    this.ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY);
+    this.port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY);
+
+    this.user =
+        parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, 
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+    this.password =
+        parameters.getStringOrDefault(
+            CONNECTOR_IOTDB_PASSWORD_KEY, 
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+
+    pipeName = configuration.getRuntimeEnvironment().getPipeName();
+    creationTime = configuration.getRuntimeEnvironment().getCreationTime();
+  }
+
+  @Override
+  public void handshake() throws Exception {
+    close();
+
+    client =
+        new IoTDBThriftConnectorClient(
+            new ThriftClientProperty.Builder()
+                
.setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
+                
.setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
+                .build(),
+            ipAddress,
+            port);
+
+    try {
+      final TSyncIdentityInfo identityInfo =
+          new TSyncIdentityInfo(
+              pipeName, creationTime, IOTDB_SYNC_CONNECTOR_VERSION, 
IoTDBConstant.PATH_ROOT);
+      final TSStatus status = client.handshake(identityInfo);
+      if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        String errorMsg =
+            String.format(
+                "The receiver %s:%s rejected the pipe task because %s",
+                ipAddress, port, status.message);
+        LOGGER.warn(errorMsg);
+        throw new PipeRuntimeCriticalException(errorMsg);
+      }
+    } catch (TException e) {
+      LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress, 
port), e);
+      throw new PipeConnectionException(e.getMessage(), e);
+    }
+
+    sessionPool =
+        new SessionPool.Builder()
+            .host(ipAddress)
+            .port(port)
+            .user(user)
+            .password(password)
+            .maxSize(1)
+            .build();
+  }
+
+  @Override
+  public void heartbeat() throws Exception {}
+
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
+    try {
+      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+        doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+      } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+        doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
+      } else {
+        throw new NotImplementedException(
+            "IoTDBSyncConnectorV1_1 only support PipeInsertNodeInsertionEvent 
and PipeTabletInsertionEvent.");
+      }
+    } catch (TException e) {
+      LOGGER.error(
+          "Network error when transfer tablet insertion event: {}.", 
tabletInsertionEvent, e);
+      // the connection may be broken, try to reconnect by catching 
PipeConnectionException
+      throw new PipeConnectionException(
+          String.format(
+              "Network error when transfer tablet insertion event, because 
%s.", e.getMessage()),
+          e);
+    }
+  }
+
+  private void doTransfer(PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeInsertionEvent)
+      throws IoTDBConnectionException, StatementExecutionException {
+    sessionPool.insertTablet(pipeInsertNodeInsertionEvent.convertToTablet());
+  }
+
+  private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
+      throws PipeException, TException, IoTDBConnectionException, 
StatementExecutionException {
+    sessionPool.insertTablet(pipeTabletInsertionEvent.convertToTablet());
+  }
+
+  @Override
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
+    if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
+      throw new NotImplementedException(
+          "IoTDBSyncConnectorV1_1 only support PipeTsFileInsertionEvent.");
+    }
+
+    try {
+      doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
+    } catch (TException e) {
+      LOGGER.error(
+          "Network error when transfer tsFile insertion event: {}.", 
tsFileInsertionEvent, e);
+      // The connection may be broken, try to reconnect by catching 
PipeConnectionException
+      throw new PipeConnectionException("Network error when transfer tsFile 
insertion event.", e);
+    }
+  }
+
+  private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
+      throws PipeException, TException, InterruptedException, IOException {
+    pipeTsFileInsertionEvent.waitForTsFileClose();
+
+    final File tsFile = pipeTsFileInsertionEvent.getTsFile();
+    transportSingleFilePieceByPiece(tsFile);
+    client.sendPipeData(ByteBuffer.wrap(new TsFilePipeData("", 
tsFile.getName(), -1).serialize()));
+  }
+
+  private void transportSingleFilePieceByPiece(File file) throws IOException {
+    // Cut the file into pieces to send
+    long position = 0;
+
+    // Try small piece to rebase the file position.
+    final byte[] buffer = new 
byte[PipeConfig.getInstance().getPipeConnectorReadFileBufferSize()];
+    try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
+      while (true) {
+        final int dataLength = randomAccessFile.read(buffer);
+        if (dataLength == -1) {
+          break;
+        }
+
+        final ByteBuffer buffToSend = ByteBuffer.wrap(buffer, 0, dataLength);
+        final TSyncTransportMetaInfo metaInfo =
+            new TSyncTransportMetaInfo(file.getName(), position);
+
+        final TSStatus status = client.sendFile(metaInfo, buffToSend);
+
+        if ((status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode())) {
+          // Success
+          position += dataLength;
+        } else if (status.code == 
TSStatusCode.SYNC_FILE_REDIRECTION_ERROR.getStatusCode()) {
+          position = Long.parseLong(status.message);
+          randomAccessFile.seek(position);
+          LOGGER.info(
+              String.format("Redirect to position %s in transferring tsFile 
%s.", position, file));
+        } else if (status.code == 
TSStatusCode.SYNC_FILE_ERROR.getStatusCode()) {
+          String errorMsg =
+              String.format("Network failed to receive tsFile %s, status: %s", 
file, status);
+          LOGGER.warn(errorMsg);
+          throw new PipeConnectionException(errorMsg);
+        }
+      }
+    } catch (TException e) {
+      LOGGER.error(String.format("Cannot send pipe data to receiver %s:%s.", 
ipAddress, port), e);
+      throw new PipeConnectionException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void transfer(Event event) throws Exception {
+    LOGGER.warn("IoTDBSyncConnectorV1_1 does not support transfer generic 
event: {}.", event);
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+    if (sessionPool != null) {
+      sessionPool.close();
+      sessionPool = null;
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
index 04719ad2f44..3e1e0db312c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
 import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient;
 import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
@@ -38,9 +37,9 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -59,6 +58,9 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.Arrays;
 
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+
 public class IoTDBThriftConnectorV1 implements PipeConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftConnectorV1.class);
@@ -76,15 +78,15 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     validator
-        
.validateRequiredAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY)
-        
.validateRequiredAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY);
+        .validateRequiredAttribute(CONNECTOR_IOTDB_IP_KEY)
+        .validateRequiredAttribute(CONNECTOR_IOTDB_PORT_KEY);
   }
 
   @Override
   public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
-    this.ipAddress = 
parameters.getString(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY);
-    this.port = 
parameters.getInt(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY);
+    this.ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY);
+    this.port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY);
   }
 
   @Override
@@ -102,11 +104,16 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
             ipAddress,
             port);
 
-    final TPipeTransferResp resp =
-        client.pipeTransfer(
-            
PipeTransferHandshakeReq.toTPipeTransferReq(IOTDB_CONFIG.getTimestampPrecision()));
-    if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(String.format("Handshake error, result status 
%s.", resp.status));
+    try {
+      final TPipeTransferResp resp =
+          client.pipeTransfer(
+              
PipeTransferHandshakeReq.toTPipeTransferReq(IOTDB_CONFIG.getTimestampPrecision()));
+      if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new PipeException(String.format("Handshake error, result status 
%s.", resp.status));
+      }
+    } catch (TException e) {
+      LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress, 
port), e);
+      throw new PipeConnectionException(e.getMessage(), e);
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 4b181f83549..27975236127 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.concurrent.atomic.AtomicInteger;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index a86842875d4..8a3f9ca8a77 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -139,6 +139,18 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     }
   }
 
+  public Tablet convertToTablet() {
+    try {
+      if (dataContainer == null) {
+        dataContainer = new TabletInsertionDataContainer(getInsertNode(), 
getPattern());
+      }
+      return dataContainer.convertToTablet();
+    } catch (Exception e) {
+      LOGGER.error("Convert to tablet error.", e);
+      throw new PipeException("Convert to tablet error.", e);
+    }
+  }
+
   /////////////////////////// Object ///////////////////////////
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 8629eb52ba4..c8bc2fd55ab 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.event.common.tablet;
 
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
index c01751ed824..70d21e85b3a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
@@ -19,14 +19,14 @@
 
 package org.apache.iotdb.db.pipe.processor;
 
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 80eb6c37ffa..cadb4746984 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -5,9 +5,9 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -26,7 +26,8 @@ public class PipeWALResourceManager implements AutoCloseable {
   private final ScheduledFuture<?> ttlCheckerFuture;
 
   public PipeWALResourceManager() {
-    memtableIdToPipeWALResourceMap = new HashMap<>();
+    // memtableIdToPipeWALResourceMap can be concurrently accessed by multiple threads
+    memtableIdToPipeWALResourceMap = new ConcurrentHashMap<>();
 
     memtableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT];
     for (int i = 0; i < SEGMENT_LOCK_COUNT; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 7ad1bffa2a4..5a9e40ccfb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -45,20 +45,26 @@ public class PipeTaskBuilder {
     // we first build the collector and connector, then build the processor.
     final PipeTaskCollectorStage collectorStage =
         new PipeTaskCollectorStage(
-            dataRegionId,
-            pipeTaskMeta,
+            pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
-            pipeStaticMeta.getCollectorParameters());
+            pipeStaticMeta.getCollectorParameters(),
+            dataRegionId,
+            pipeTaskMeta);
+
     final PipeTaskConnectorStage connectorStage =
-        new PipeTaskConnectorStage(pipeStaticMeta.getConnectorParameters());
+        new PipeTaskConnectorStage(
+            pipeStaticMeta.getPipeName(),
+            pipeStaticMeta.getCreationTime(),
+            pipeStaticMeta.getConnectorParameters());
 
     // the processor connects the collector and connector.
     final PipeTaskProcessorStage processorStage =
         new PipeTaskProcessorStage(
             pipeStaticMeta.getPipeName(),
+            pipeStaticMeta.getCreationTime(),
+            pipeStaticMeta.getProcessorParameters(),
             dataRegionId,
             collectorStage.getEventSupplier(),
-            pipeStaticMeta.getProcessorParameters(),
             connectorStage.getPipeConnectorPendingQueue());
 
     return new PipeTask(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index ca1ec8fda9b..524b94eb604 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -24,64 +24,48 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.collector.IoTDBDataRegionCollector;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
-import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-import java.util.HashMap;
-
 public class PipeTaskCollectorStage extends PipeTaskStage {
 
   private final PipeCollector pipeCollector;
 
   public PipeTaskCollectorStage(
-      TConsensusGroupId dataRegionId,
-      PipeTaskMeta pipeTaskMeta,
+      String pipeName,
       long creationTime,
-      PipeParameters collectorParameters) {
-    PipeParameters localizedCollectorParameters;
-
+      PipeParameters collectorParameters,
+      TConsensusGroupId dataRegionId,
+      PipeTaskMeta pipeTaskMeta) {
     // TODO: avoid if-else, use reflection to create collector all the time
-    if (collectorParameters
-        .getStringOrDefault(
-            PipeCollectorConstant.COLLECTOR_KEY,
-            BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())
-        .equals(BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())) {
-      // we want to pass data region id to collector, so we need to create a 
new collector
-      // parameters and put data region id into it. we can't put data region 
id into collector
-      // parameters directly, because the given collector parameters may be 
used by other pipe task.
-      localizedCollectorParameters =
-          new PipeParameters(new 
HashMap<>(collectorParameters.getAttribute()));
-      // set data region id to collector parameters, so that collector can get 
data region id inside
-      // collector
-      localizedCollectorParameters
-          .getAttribute()
-          .put(PipeCollectorConstant.DATA_REGION_KEY, 
String.valueOf(dataRegionId.getId()));
-
-      this.pipeCollector =
-          new IoTDBDataRegionCollector(
-              pipeTaskMeta, creationTime, new 
UnboundedBlockingPendingQueue<>());
-    } else {
-      localizedCollectorParameters = collectorParameters;
-
-      this.pipeCollector = 
PipeAgent.plugin().reflectCollector(localizedCollectorParameters);
-    }
+    this.pipeCollector =
+        collectorParameters
+                .getStringOrDefault(
+                    PipeCollectorConstant.COLLECTOR_KEY,
+                    BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())
+                .equals(BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())
+            ? new IoTDBDataRegionCollector()
+            : PipeAgent.plugin().reflectCollector(collectorParameters);
 
     // validate and customize should be called before createSubtask. this 
allows collector exposing
     // exceptions in advance.
     try {
       // 1. validate collector parameters
-      pipeCollector.validate(new 
PipeParameterValidator(localizedCollectorParameters));
+      pipeCollector.validate(new PipeParameterValidator(collectorParameters));
 
       // 2. customize collector
       final PipeCollectorRuntimeConfiguration runtimeConfiguration =
-          new PipeCollectorRuntimeConfiguration();
-      pipeCollector.customize(localizedCollectorParameters, 
runtimeConfiguration);
+          new PipeTaskRuntimeConfiguration(
+              new PipeTaskCollectorRuntimeEnvironment(
+                  pipeName, creationTime, dataRegionId.getId(), pipeTaskMeta));
+      pipeCollector.customize(collectorParameters, runtimeConfiguration);
     } catch (Exception e) {
       throw new PipeException(e.getMessage(), e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 6b96097fd6f..ff4e954e614 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,10 +19,11 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtaskManager;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
@@ -32,13 +33,16 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
 
   protected String connectorSubtaskId;
 
-  public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters) {
+  public PipeTaskConnectorStage(
+      String pipeName, long creationTime, PipeParameters pipeConnectorParameters) {
     this.pipeConnectorParameters = pipeConnectorParameters;
+
     connectorSubtaskId =
         PipeConnectorSubtaskManager.instance()
             .register(
                 
PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
-                pipeConnectorParameters);
+                pipeConnectorParameters,
+                new PipeTaskRuntimeEnvironment(pipeName, creationTime));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 02c6576ce94..60fbc3798df 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.pipe.task.stage;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
+import 
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.processor.PipeDoNothingProcessor;
@@ -31,37 +33,35 @@ import 
org.apache.iotdb.db.pipe.task.connection.EventSupplier;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 public class PipeTaskProcessorStage extends PipeTaskStage {
 
-  protected final PipeProcessorSubtaskExecutor executor =
+  private final PipeProcessorSubtaskExecutor executor =
       PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
 
-  protected final PipeParameters pipeProcessorParameters;
-  protected final PipeProcessor pipeProcessor;
-  protected final PipeProcessorSubtask pipeProcessorSubtask;
+  private final PipeProcessorSubtask pipeProcessorSubtask;
 
   /**
    * @param pipeName pipe name
+   * @param creationTime pipe creation time
+   * @param pipeProcessorParameters used to create pipe processor
    * @param dataRegionId data region id
    * @param pipeCollectorInputEventSupplier used to input events from pipe 
collector
-   * @param pipeProcessorParameters used to create pipe processor
    * @param pipeConnectorOutputPendingQueue used to output events to pipe 
connector
    */
   public PipeTaskProcessorStage(
       String pipeName,
+      long creationTime,
+      PipeParameters pipeProcessorParameters,
       TConsensusGroupId dataRegionId,
       EventSupplier pipeCollectorInputEventSupplier,
-      PipeParameters pipeProcessorParameters,
       BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
-    this.pipeProcessorParameters = pipeProcessorParameters;
-
-    pipeProcessor =
+    final PipeProcessor pipeProcessor =
         pipeProcessorParameters
                 .getStringOrDefault(
                     PipeProcessorConstant.PROCESSOR_KEY,
@@ -69,6 +69,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
                 
.equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
             ? new PipeDoNothingProcessor()
             : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
+
     // validate and customize should be called before createSubtask. this 
allows collector exposing
     // exceptions in advance.
     try {
@@ -77,7 +78,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
 
       // 2. customize processor
       final PipeProcessorRuntimeConfiguration runtimeConfiguration =
-          new PipeProcessorRuntimeConfiguration();
+          new PipeTaskRuntimeConfiguration(new PipeTaskRuntimeEnvironment(pipeName, creationTime));
       pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration);
     } catch (Exception e) {
       throw new PipeException(e.getMessage(), e);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
index a6e338aae2e..fadb6a0184f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
@@ -22,14 +22,16 @@ package org.apache.iotdb.db.pipe.task.subtask;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
+import 
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import org.apache.iotdb.db.pipe.connector.lagacy.IoTDBSyncConnectorImplV1_1;
 import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorV1;
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
@@ -43,7 +45,9 @@ public class PipeConnectorSubtaskManager {
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
   public synchronized String register(
-      PipeConnectorSubtaskExecutor executor, PipeParameters 
pipeConnectorParameters) {
+      PipeConnectorSubtaskExecutor executor,
+      PipeParameters pipeConnectorParameters,
+      PipeRuntimeEnvironment pipeRuntimeEnvironment) {
     final String attributeSortedString =
         new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
 
@@ -51,20 +55,25 @@ public class PipeConnectorSubtaskManager {
       // TODO: construct all PipeConnector with the same reflection method, 
avoid using if-else
       // 1. construct, validate and customize PipeConnector, and then 
handshake (create connection)
       // with the target
-      final PipeConnector pipeConnector =
-          pipeConnectorParameters
-                  .getStringOrDefault(
-                      PipeConnectorConstant.CONNECTOR_KEY,
-                      
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
-                  
.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
-              ? new IoTDBThriftConnectorV1()
-              : PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
+      final String connectorKey =
+          pipeConnectorParameters.getStringOrDefault(
+              PipeConnectorConstant.CONNECTOR_KEY,
+              BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
+
+      PipeConnector pipeConnector;
+      if (connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())) {
+        pipeConnector = new IoTDBThriftConnectorV1();
+      } else if (connectorKey.equals(
+          BuiltinPipePlugin.IOTDB_SYNC_CONNECTOR_V_1_1.getPipePluginName())) {
+        pipeConnector = new IoTDBSyncConnectorImplV1_1();
+      } else {
+        pipeConnector = PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
+      }
+
       try {
         pipeConnector.validate(new 
PipeParameterValidator(pipeConnectorParameters));
-        final PipeConnectorRuntimeConfiguration runtimeConfiguration =
-            new PipeConnectorRuntimeConfiguration();
-        pipeConnector.customize(pipeConnectorParameters, runtimeConfiguration);
-        // TODO: use runtimeConfiguration to configure PipeConnector
+        pipeConnector.customize(
+            pipeConnectorParameters, new PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
         pipeConnector.handshake();
       } catch (Exception e) {
         throw new PipeException(
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
index ef8e2e67e3a..e0977828d75 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
@@ -19,12 +19,13 @@
 
 package org.apache.iotdb.db.pipe.collector;
 
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
 import 
org.apache.iotdb.db.pipe.collector.realtime.matcher.CachedSchemaPatternMatcher;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import 
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 
@@ -64,38 +65,34 @@ public class CachedSchemaPatternMatcherTest {
 
   @Test
   public void testCachedMatcher() throws Exception {
-    PipeRealtimeDataRegionCollector databaseCollector =
-        new PipeRealtimeDataRegionFakeCollector(null);
+    PipeRealtimeDataRegionCollector databaseCollector = new 
PipeRealtimeDataRegionFakeCollector();
     databaseCollector.customize(
         new PipeParameters(
             new HashMap<String, String>() {
               {
                 put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root");
-                put(PipeCollectorConstant.DATA_REGION_KEY, "1");
               }
             }),
-        null);
+        new PipeTaskRuntimeConfiguration(new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null)));
     collectorList.add(databaseCollector);
 
     int deviceCollectorNum = 10;
     int seriesCollectorNum = 10;
     for (int i = 0; i < deviceCollectorNum; i++) {
-      PipeRealtimeDataRegionCollector deviceCollector =
-          new PipeRealtimeDataRegionFakeCollector(null);
+      PipeRealtimeDataRegionCollector deviceCollector = new 
PipeRealtimeDataRegionFakeCollector();
       int finalI1 = i;
       deviceCollector.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root." + 
finalI1);
-                  put(PipeCollectorConstant.DATA_REGION_KEY, "1");
                 }
               }),
-          null);
+          new PipeTaskRuntimeConfiguration(
+              new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null)));
       collectorList.add(deviceCollector);
       for (int j = 0; j < seriesCollectorNum; j++) {
-        PipeRealtimeDataRegionCollector seriesCollector =
-            new PipeRealtimeDataRegionFakeCollector(null);
+        PipeRealtimeDataRegionCollector seriesCollector = new 
PipeRealtimeDataRegionFakeCollector();
         int finalI = i;
         int finalJ = j;
         seriesCollector.customize(
@@ -105,10 +102,10 @@ public class CachedSchemaPatternMatcherTest {
                     put(
                         PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
                         "root." + finalI + "." + finalJ);
-                    put(PipeCollectorConstant.DATA_REGION_KEY, "1");
                   }
                 }),
-            null);
+            new PipeTaskRuntimeConfiguration(
+                new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null)));
         collectorList.add(seriesCollector);
       }
     }
@@ -152,9 +149,7 @@ public class CachedSchemaPatternMatcherTest {
 
   public static class PipeRealtimeDataRegionFakeCollector extends 
PipeRealtimeDataRegionCollector {
 
-    public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) {
-      super(pipeTaskMeta);
-    }
+    public PipeRealtimeDataRegionFakeCollector() {}
 
     @Override
     public Event supply() {
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
index 202c1527bd2..1b12e5822f9 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
@@ -28,10 +28,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
 import 
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import 
org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
-import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import 
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -98,55 +99,58 @@ public class PipeRealtimeCollectTest {
   }
 
   @Test
-  public void testRealtimeCollectProcess() throws ExecutionException, 
InterruptedException {
+  public void testRealtimeCollectProcess() {
     // set up realtime collector
 
     try (PipeRealtimeDataRegionHybridCollector collector1 =
-            new PipeRealtimeDataRegionHybridCollector(null, new 
UnboundedBlockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector();
         PipeRealtimeDataRegionHybridCollector collector2 =
-            new PipeRealtimeDataRegionHybridCollector(null, new 
UnboundedBlockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector();
         PipeRealtimeDataRegionHybridCollector collector3 =
-            new PipeRealtimeDataRegionHybridCollector(null, new 
UnboundedBlockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector();
         PipeRealtimeDataRegionHybridCollector collector4 =
-            new PipeRealtimeDataRegionHybridCollector(
-                null, new UnboundedBlockingPendingQueue<>())) {
+            new PipeRealtimeDataRegionHybridCollector()) {
 
       collector1.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1);
-                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
                 }
               }),
-          null);
+          new PipeTaskRuntimeConfiguration(
+              new PipeTaskCollectorRuntimeEnvironment(
+                  "1", 1, Integer.parseInt(dataRegion1), null)));
       collector2.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2);
-                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
                 }
               }),
-          null);
+          new PipeTaskRuntimeConfiguration(
+              new PipeTaskCollectorRuntimeEnvironment(
+                  "1", 1, Integer.parseInt(dataRegion1), null)));
       collector3.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1);
-                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
                 }
               }),
-          null);
+          new PipeTaskRuntimeConfiguration(
+              new PipeTaskCollectorRuntimeEnvironment(
+                  "1", 1, Integer.parseInt(dataRegion2), null)));
       collector4.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2);
-                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
                 }
               }),
-          null);
+          new PipeTaskRuntimeConfiguration(
+              new PipeTaskCollectorRuntimeEnvironment(
+                  "1", 1, Integer.parseInt(dataRegion2), null)));
 
       PipeRealtimeDataRegionCollector[] collectors =
           new PipeRealtimeDataRegionCollector[] {collector1, collector2, 
collector3, collector4};

Reply via email to