This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ce843c2c06 [IOTDB-5779] PipeConnector reuse strategy based on 
reference count mechanism (#9629)
ce843c2c06 is described below

commit ce843c2c060db29fcd8c8953b7a562879d233134
Author: Itami Sho <[email protected]>
AuthorDate: Tue Apr 25 09:54:40 2023 +0800

    [IOTDB-5779] PipeConnector reuse strategy based on reference count 
mechanism (#9629)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../commons/pipe/task/meta/PipeStaticMeta.java     |  24 +++--
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  |   6 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      |  34 +++----
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  17 +---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  19 ++--
 .../db/pipe/agent/task/PipeTaskRegionAgent.java    |  22 -----
 .../core/connector/PipeConnectorContainer.java     |  22 -----
 .../pipe/core/connector/PipeConnectorManager.java  |  22 -----
 .../connector/PipeConnectorSubtaskLifeCycle.java   |  98 ++++++++++++++++++
 .../connector/PipeConnectorSubtaskManager.java     | 107 ++++++++++++++++++++
 .../execution/executor/PipeSubtaskExecutor.java    |  12 ++-
 .../pipe/task/callable/PipeConnectorSubtask.java   |  37 -------
 .../pipe/task/callable/PipeProcessorSubtask.java   |  37 -------
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  41 +++++++-
 .../db/pipe/task/stage/PipeTaskConnectorStage.java | 109 ++++++++++++++++++++-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  38 ++++++-
 .../iotdb/db/pipe/task/stage/PipeTaskStage.java    |  34 ++-----
 .../task/{callable => subtask}/DecoratingLock.java |   2 +-
 .../{callable => subtask}/PipeAssignerSubtask.java |   7 +-
 .../subtask/PipeConnectorSubtask.java}             |  44 ++++++---
 .../subtask/PipeProcessorSubtask.java}             |  38 ++++---
 .../task/{callable => subtask}/PipeSubtask.java    |   8 +-
 .../executor/PipeAssignerSubtaskExecutorTest.java  |   2 +-
 .../executor/PipeConnectorSubtaskExecutorTest.java |   6 +-
 .../executor/PipeProcessorSubtaskExecutorTest.java |  12 ++-
 .../executor/PipeSubtaskExecutorTest.java          |   2 +-
 26 files changed, 528 insertions(+), 272 deletions(-)

diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index f0e2c77e5b..3b9c94c94c 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.commons.pipe.task.meta;
 
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -37,6 +38,10 @@ public class PipeStaticMeta {
   private Map<String, String> processorAttributes = new HashMap<>();
   private Map<String, String> connectorAttributes = new HashMap<>();
 
+  private PipeParameters collectorParameters;
+  private PipeParameters processorParameters;
+  private PipeParameters connectorParameters;
+
   private PipeStaticMeta() {}
 
   public PipeStaticMeta(
@@ -50,6 +55,9 @@ public class PipeStaticMeta {
     this.collectorAttributes = collectorAttributes;
     this.processorAttributes = processorAttributes;
     this.connectorAttributes = connectorAttributes;
+    collectorParameters = new PipeParameters(collectorAttributes);
+    processorParameters = new PipeParameters(processorAttributes);
+    connectorParameters = new PipeParameters(connectorAttributes);
   }
 
   public String getPipeName() {
@@ -60,16 +68,16 @@ public class PipeStaticMeta {
     return createTime;
   }
 
-  public Map<String, String> getCollectorAttributes() {
-    return collectorAttributes;
+  public PipeParameters getCollectorParameters() {
+    return collectorParameters;
   }
 
-  public Map<String, String> getProcessorAttributes() {
-    return processorAttributes;
+  public PipeParameters getProcessorParameters() {
+    return processorParameters;
   }
 
-  public Map<String, String> getConnectorAttributes() {
-    return connectorAttributes;
+  public PipeParameters getConnectorParameters() {
+    return connectorParameters;
   }
 
   public ByteBuffer serialize() throws IOException {
@@ -127,6 +135,10 @@ public class PipeStaticMeta {
           ReadWriteIOUtils.readString(byteBuffer), 
ReadWriteIOUtils.readString(byteBuffer));
     }
 
+    pipeStaticMeta.collectorParameters = new 
PipeParameters(pipeStaticMeta.collectorAttributes);
+    pipeStaticMeta.processorParameters = new 
PipeParameters(pipeStaticMeta.processorAttributes);
+    pipeStaticMeta.connectorParameters = new 
PipeParameters(pipeStaticMeta.connectorAttributes);
+
     return pipeStaticMeta;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index a38d86f974..f490a171f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -32,9 +32,9 @@ public class PipeAgent {
 
   /** Private constructor to prevent users from creating a new instance. */
   private PipeAgent() {
-    pipePluginAgent = PipePluginAgent.setupAndGetInstance();
-    pipeTaskAgent = PipeTaskAgent.setupAndGetInstance();
-    pipeRuntimeAgent = PipeRuntimeAgent.setupAndGetInstance();
+    pipePluginAgent = new PipePluginAgent();
+    pipeTaskAgent = new PipeTaskAgent();
+    pipeRuntimeAgent = new PipeRuntimeAgent();
   }
 
   /** The singleton holder of PipeAgent. */
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 1f5f70bd3d..e65495a7c3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -24,7 +24,10 @@ import 
org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
 import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
 import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
+import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.PipePlugin;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 
 import org.slf4j.Logger;
@@ -43,6 +46,10 @@ public class PipePluginAgent {
 
   private final DataNodePipePluginMetaKeeper pipePluginMetaKeeper;
 
+  public PipePluginAgent() {
+    this.pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
+  }
+
   /////////////////////////////// Lock ///////////////////////////////
 
   public void acquireLock() {
@@ -181,7 +188,15 @@ public class PipePluginAgent {
     }
   }
 
-  public PipePlugin reflect(String pluginName) {
+  public PipeProcessor reflectProcessor(PipeParameters processorParameters) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  public PipeConnector reflectConnector(PipeParameters connectorParameters) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  private PipePlugin reflect(String pluginName) {
     PipePluginMeta information = 
pipePluginMetaKeeper.getPipePluginMeta(pluginName);
     if (information == null) {
       String errorMessage =
@@ -207,21 +222,4 @@ public class PipePluginAgent {
       throw new RuntimeException(errorMessage);
     }
   }
-
-  /////////////////////////  Singleton Instance Holder  
/////////////////////////
-
-  private PipePluginAgent() {
-    this.pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
-  }
-
-  private static class PipePluginAgentServiceHolder {
-    private static PipePluginAgent instance = null;
-  }
-
-  public static PipePluginAgent setupAndGetInstance() {
-    if (PipePluginAgentServiceHolder.instance == null) {
-      PipePluginAgentServiceHolder.instance = new PipePluginAgent();
-    }
-    return PipePluginAgentServiceHolder.instance;
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 55e3b0c1ec..8c03766e45 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.agent.runtime;
 
-import org.apache.iotdb.db.pipe.task.callable.PipeSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,19 +35,4 @@ public class PipeRuntimeAgent {
         subtask.getTaskID(),
         subtask.getLastFailedCause());
   }
-
-  /////////////////////////  Singleton Instance Holder  
/////////////////////////
-
-  private PipeRuntimeAgent() {}
-
-  private static class PipeRuntimeAgentHolder {
-    private static PipeRuntimeAgent INSTANCE = null;
-  }
-
-  public static PipeRuntimeAgent setupAndGetInstance() {
-    if (PipeRuntimeAgentHolder.INSTANCE == null) {
-      PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
-    }
-    return PipeRuntimeAgentHolder.INSTANCE;
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index dd8a6984f0..9fffa86706 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -19,20 +19,19 @@
 
 package org.apache.iotdb.db.pipe.agent.task;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 
-  /////////////////////////  Singleton Instance Holder  
/////////////////////////
+public class PipeTaskAgent {
 
-  private PipeTaskAgent() {}
+  private final PipeMetaKeeper pipeMetaKeeper;
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
+  public PipeTaskAgent() {
+    pipeMetaKeeper = new PipeMetaKeeper();
   }
 
-  public static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
+  // TODO: remove this method
+  public PipeMeta getPipeMeta(String pipeName) {
+    return pipeMetaKeeper.getPipeMeta(pipeName);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
deleted file mode 100644
index 7be285ef66..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.task;
-
-public class PipeTaskRegionAgent {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
deleted file mode 100644
index 8651ed3df1..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.connector;
-
-public class PipeConnectorContainer {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
deleted file mode 100644
index 881edd67f8..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.connector;
-
-public class PipeConnectorManager {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
new file mode 100644
index 0000000000..7e71ae2f7a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+import 
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
+
+public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
+
+  private final PipeConnectorSubtaskExecutor executor;
+  private final PipeConnectorSubtask subtask;
+
+  private int runningTaskCount;
+  private int aliveTaskCount;
+
+  public PipeConnectorSubtaskLifeCycle(
+      PipeConnectorSubtaskExecutor executor, PipeConnectorSubtask subtask) {
+    this.executor = executor;
+    this.subtask = subtask;
+
+    runningTaskCount = 0;
+    aliveTaskCount = 0;
+  }
+
+  public PipeConnectorSubtask getSubtask() {
+    return subtask;
+  }
+
+  public synchronized void register() {
+    if (aliveTaskCount < 0) {
+      throw new IllegalStateException("aliveTaskCount < 0");
+    }
+    if (aliveTaskCount == 0) {
+      executor.register(subtask);
+      runningTaskCount = 0;
+    }
+    aliveTaskCount++;
+  }
+
+  /**
+   * @return true if the subtask is out of life cycle, indicating that the 
subtask should never be
+   *     used again
+   */
+  public synchronized boolean deregister() {
+    if (aliveTaskCount <= 0) {
+      throw new IllegalStateException("aliveTaskCount <= 0");
+    }
+    if (aliveTaskCount == 1) {
+      executor.deregister(subtask.getTaskID());
+      // this subtask is out of life cycle, should never be used again
+      return true;
+    }
+    aliveTaskCount--;
+    return false;
+  }
+
+  public synchronized void start() {
+    if (runningTaskCount < 0) {
+      throw new IllegalStateException("runningTaskCount < 0");
+    }
+    if (runningTaskCount == 0) {
+      executor.start(subtask.getTaskID());
+    }
+    runningTaskCount++;
+  }
+
+  public synchronized void stop() {
+    if (runningTaskCount <= 0) {
+      throw new IllegalStateException("runningTaskCount <= 0");
+    }
+    if (runningTaskCount == 1) {
+      executor.stop(subtask.getTaskID());
+    }
+    runningTaskCount--;
+  }
+
+  @Override
+  public synchronized void close() {
+    executor.deregister(subtask.getTaskID());
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
new file mode 100644
index 0000000000..a00cedf7b5
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import 
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class PipeConnectorSubtaskManager {
+
+  private final Map<String, PipeConnectorSubtaskLifeCycle>
+      attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
+
+  public synchronized String register(
+      PipeConnectorSubtaskExecutor executor, PipeParameters 
connectorAttributes) {
+    final String attributeSortedString =
+        new TreeMap<>(connectorAttributes.getAttribute()).toString();
+
+    if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
+      final PipeConnector pipeConnector = 
PipeAgent.plugin().reflectConnector(connectorAttributes);
+      final PipeConnectorSubtask pipeConnectorSubtask =
+          new PipeConnectorSubtask(attributeSortedString, pipeConnector);
+      final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
+          new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask);
+      attributeSortedString2SubtaskLifeCycleMap.put(
+          attributeSortedString, pipeConnectorSubtaskLifeCycle);
+    }
+
+    
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).register();
+
+    return attributeSortedString;
+  }
+
+  public synchronized void deregister(String attributeSortedString) {
+    if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
+      throw new PipeException(
+          "Failed to deregister PipeConnectorSubtask. No such subtask: " + 
attributeSortedString);
+    }
+
+    if 
(attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).deregister())
 {
+      attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
+    }
+  }
+
+  public synchronized void start(String attributeSortedString) {
+    if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
+      throw new PipeException(
+          "Failed to deregister PipeConnectorSubtask. No such subtask: " + 
attributeSortedString);
+    }
+
+    
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).start();
+  }
+
+  public synchronized void stop(String attributeSortedString) {
+    if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
+      throw new PipeException(
+          "Failed to deregister PipeConnectorSubtask. No such subtask: " + 
attributeSortedString);
+    }
+
+    
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop();
+  }
+
+  public PipeConnectorSubtask getPipeConnectorSubtask(String 
attributeSortedString) {
+    if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
+      throw new PipeException(
+          "Failed to get PipeConnectorSubtask. No such subtask: " + 
attributeSortedString);
+    }
+
+    return 
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
+  }
+
+  /////////////////////////  Singleton Instance Holder  
/////////////////////////
+
+  private PipeConnectorSubtaskManager() {}
+
+  private static class PipeSubtaskManagerHolder {
+    private static final PipeConnectorSubtaskManager INSTANCE = new 
PipeConnectorSubtaskManager();
+  }
+
+  public static PipeConnectorSubtaskManager instance() {
+    return PipeSubtaskManagerHolder.INSTANCE;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index 05956f0a90..bf4f8daf2e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.execution.executor;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.pipe.task.callable.PipeSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -99,7 +99,15 @@ public abstract class PipeSubtaskExecutor {
   public final void deregister(String subTaskID) {
     stop(subTaskID);
 
-    registeredIdSubtaskMapper.remove(subTaskID);
+    final PipeSubtask subtask = registeredIdSubtaskMapper.remove(subTaskID);
+
+    if (subtask != null) {
+      try {
+        subtask.close();
+      } catch (Exception e) {
+        LOGGER.error("Failed to close the subtask {}.", subTaskID, e);
+      }
+    }
   }
 
   @TestOnly
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeConnectorSubtask.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeConnectorSubtask.java
deleted file mode 100644
index dd1ec6db5d..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeConnectorSubtask.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.callable;
-
-import 
org.apache.iotdb.db.pipe.core.connector.PipeConnectorPluginRuntimeWrapper;
-
-public class PipeConnectorSubtask extends PipeSubtask {
-
-  private final PipeConnectorPluginRuntimeWrapper pipeConnector;
-
-  public PipeConnectorSubtask(String taskID, PipeConnectorPluginRuntimeWrapper 
pipeConnector) {
-    super(taskID);
-    this.pipeConnector = pipeConnector;
-  }
-
-  @Override
-  protected void executeForAWhile() {
-    pipeConnector.executeForAWhile();
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeProcessorSubtask.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeProcessorSubtask.java
deleted file mode 100644
index b13dbc4c18..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeProcessorSubtask.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.callable;
-
-import 
org.apache.iotdb.db.pipe.core.processor.PipeProcessorPluginRuntimeWrapper;
-
-public class PipeProcessorSubtask extends PipeSubtask {
-
-  private final PipeProcessorPluginRuntimeWrapper pipeProcessor;
-
-  public PipeProcessorSubtask(String taskID, PipeProcessorPluginRuntimeWrapper 
pipeProcessor) {
-    super(taskID);
-    this.pipeProcessor = pipeProcessor;
-  }
-
-  @Override
-  protected void executeForAWhile() {
-    pipeProcessor.executeForAWhile();
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 1962555dae..a496376023 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -20,12 +20,43 @@
 package org.apache.iotdb.db.pipe.task.stage;
 
 import org.apache.iotdb.db.pipe.execution.executor.PipeAssignerSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.callable.PipeAssignerSubtask;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.subtask.PipeAssignerSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public class PipeTaskCollectorStage extends PipeTaskStage {
+public class PipeTaskCollectorStage implements PipeTaskStage {
 
-  protected PipeTaskCollectorStage(
-      PipeAssignerSubtaskExecutor executor, PipeAssignerSubtask subtask) {
-    super(executor, subtask);
+  private final PipeSubtaskExecutor executor;
+  private final PipeSubtask subtask;
+
+  PipeTaskCollectorStage(PipeAssignerSubtaskExecutor executor, 
PipeAssignerSubtask subtask) {
+    this.executor = executor;
+    this.subtask = subtask;
+  }
+
+  @Override
+  public void create() throws PipeException {
+    executor.register(subtask);
+  }
+
+  @Override
+  public void start() throws PipeException {
+    executor.start(subtask.getTaskID());
+  }
+
+  @Override
+  public void stop() throws PipeException {
+    executor.stop(subtask.getTaskID());
+  }
+
+  @Override
+  public void drop() throws PipeException {
+    executor.deregister(subtask.getTaskID());
+  }
+
+  @Override
+  public PipeSubtask getSubtask() {
+    return subtask;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 5b7f48e16b..0194dc5c8a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,13 +19,114 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.db.pipe.core.connector.PipeConnectorSubtaskManager;
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.callable.PipeConnectorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public class PipeTaskConnectorStage extends PipeTaskStage {
+public class PipeTaskConnectorStage implements PipeTaskStage {
+
+  protected final PipeConnectorSubtaskExecutor executor;
+  protected final PipeParameters connectorAttributes;
+
+  protected PipeStatus status = null;
+  protected String connectorSubtaskId = null;
+  protected boolean hasBeenExternallyStopped = false;
 
   protected PipeTaskConnectorStage(
-      PipeConnectorSubtaskExecutor executor, PipeConnectorSubtask subtask) {
-    super(executor, subtask);
+      PipeConnectorSubtaskExecutor executor, PipeParameters 
connectorAttributes) {
+    this.executor = executor;
+    this.connectorAttributes = connectorAttributes;
+  }
+
+  @Override
+  public synchronized void create() throws PipeException {
+    if (status != null) {
+      if (status == PipeStatus.RUNNING) {
+        throw new PipeException(
+            String.format("The PipeConnectorSubtask %s has been started", 
connectorSubtaskId));
+      }
+      if (status == PipeStatus.DROPPED) {
+        throw new PipeException(
+            String.format("The PipeConnectorSubtask %s has been dropped", 
connectorSubtaskId));
+      }
+      // status == PipeStatus.STOPPED
+      if (hasBeenExternallyStopped) {
+        throw new PipeException(
+            String.format(
+                "The PipeConnectorSubtask %s has been externally stopped", 
connectorSubtaskId));
+      }
+      // otherwise, do nothing to allow retry strategy
+      return;
+    }
+
+    // status == null, register the connector
+    connectorSubtaskId =
+        PipeConnectorSubtaskManager.instance().register(executor, 
connectorAttributes);
+    status = PipeStatus.STOPPED;
+  }
+
+  @Override
+  public synchronized void start() throws PipeException {
+    if (status == null) {
+      throw new PipeException(
+          String.format("The PipeConnectorSubtask %s has not been created", 
connectorSubtaskId));
+    }
+    if (status == PipeStatus.RUNNING) {
+      // do nothing to allow retry strategy
+      return;
+    }
+    if (status == PipeStatus.DROPPED) {
+      throw new PipeException(
+          String.format("The PipeConnectorSubtask %s has been dropped", 
connectorSubtaskId));
+    }
+
+    // status == PipeStatus.STOPPED, start the connector
+    PipeConnectorSubtaskManager.instance().start(connectorSubtaskId);
+    status = PipeStatus.RUNNING;
+  }
+
+  @Override
+  public synchronized void stop() throws PipeException {
+    if (status == null) {
+      throw new PipeException(
+          String.format("The PipeConnectorSubtask %s has not been created", 
connectorSubtaskId));
+    }
+    if (status == PipeStatus.STOPPED) {
+      // do nothing to allow retry strategy
+      return;
+    }
+    if (status == PipeStatus.DROPPED) {
+      throw new PipeException(
+          String.format("The PipeConnectorSubtask %s has been dropped", 
connectorSubtaskId));
+    }
+
+    // status == PipeStatus.RUNNING, stop the connector
+    PipeConnectorSubtaskManager.instance().stop(connectorSubtaskId);
+    status = PipeStatus.STOPPED;
+    hasBeenExternallyStopped = true;
+  }
+
+  @Override
+  public synchronized void drop() throws PipeException {
+    if (status == null) {
+      throw new PipeException(
+          String.format("The PipeConnectorSubtask %s has not been created", 
connectorSubtaskId));
+    }
+    if (status == PipeStatus.DROPPED) {
+      // do nothing to allow retry strategy
+      return;
+    }
+
+    // status == PipeStatus.RUNNING or PipeStatus.STOPPED, drop the connector
+    PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
+    status = PipeStatus.DROPPED;
+  }
+
+  @Override
+  public PipeSubtask getSubtask() {
+    return 
PipeConnectorSubtaskManager.instance().getPipeConnectorSubtask(connectorSubtaskId);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 6f2e058193..e7345be1e1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -20,12 +20,44 @@
 package org.apache.iotdb.db.pipe.task.stage;
 
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.callable.PipeProcessorSubtask;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public class PipeTaskProcessorStage extends PipeTaskStage {
+public class PipeTaskProcessorStage implements PipeTaskStage {
+
+  protected final PipeSubtaskExecutor executor;
+  protected final PipeSubtask subtask;
 
   protected PipeTaskProcessorStage(
       PipeProcessorSubtaskExecutor executor, PipeProcessorSubtask subtask) {
-    super(executor, subtask);
+    this.executor = executor;
+    this.subtask = subtask;
+  }
+
+  @Override
+  public void create() throws PipeException {
+    executor.register(subtask);
+  }
+
+  @Override
+  public void start() throws PipeException {
+    executor.start(subtask.getTaskID());
+  }
+
+  @Override
+  public void stop() throws PipeException {
+    executor.stop(subtask.getTaskID());
+  }
+
+  @Override
+  public void drop() throws PipeException {
+    executor.deregister(subtask.getTaskID());
+  }
+
+  @Override
+  public PipeSubtask getSubtask() {
+    return subtask;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index 3cbe0b569b..5f57fbabcb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -19,62 +19,42 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.callable.PipeSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public abstract class PipeTaskStage {
-
-  protected final PipeSubtaskExecutor executor;
-  protected final PipeSubtask subtask;
-
-  protected PipeTaskStage(PipeSubtaskExecutor executor, PipeSubtask subtask) {
-    this.executor = executor;
-    this.subtask = subtask;
-  }
+public interface PipeTaskStage {
 
   /**
    * Create a pipe task stage.
    *
    * @throws PipeException if failed to create a pipe task stage.
    */
-  public final void create() throws PipeException {
-    executor.register(subtask);
-  }
-
+  void create() throws PipeException;
   /**
    * Start a pipe task stage.
    *
    * @throws PipeException if failed to start a pipe task stage.
    */
-  public final void start() throws PipeException {
-    executor.start(subtask.getTaskID());
-  }
+  void start() throws PipeException;
 
   /**
    * Stop a pipe task stage.
    *
    * @throws PipeException if failed to stop a pipe task stage.
    */
-  public final void stop() throws PipeException {
-    executor.stop(subtask.getTaskID());
-  }
+  void stop() throws PipeException;
 
   /**
    * Drop a pipe task stage.
    *
    * @throws PipeException if failed to drop a pipe task stage.
    */
-  public final void drop() throws PipeException {
-    executor.deregister(subtask.getTaskID());
-  }
+  void drop() throws PipeException;
 
   /**
    * Get the pipe subtask.
    *
    * @return the pipe subtask.
    */
-  public final PipeSubtask getSubtask() {
-    return subtask;
-  }
+  PipeSubtask getSubtask();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/DecoratingLock.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/DecoratingLock.java
similarity index 96%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/task/callable/DecoratingLock.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/DecoratingLock.java
index 54f562416b..4e62a566d7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/DecoratingLock.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/DecoratingLock.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.callable;
+package org.apache.iotdb.db.pipe.task.subtask;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeAssignerSubtask.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
similarity index 91%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeAssignerSubtask.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
index 64d421cf8c..39ffaefeeb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeAssignerSubtask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.callable;
+package org.apache.iotdb.db.pipe.task.subtask;
 
 public class PipeAssignerSubtask extends PipeSubtask {
 
@@ -29,4 +29,9 @@ public class PipeAssignerSubtask extends PipeSubtask {
   protected void executeForAWhile() {
     // do nothing
   }
+
+  @Override
+  public void close() {
+    // TODO
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
similarity index 64%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 1981f6b87f..e6764cf699 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.connector;
+package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -29,29 +29,37 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 
-public class PipeConnectorPluginRuntimeWrapper {
+public class PipeConnectorSubtask extends PipeSubtask {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeConnectorPluginRuntimeWrapper.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConnectorSubtask.class);
 
-  private final Queue<Event> inputEventQueue;
+  // input
+  private final ArrayBlockingQueue<Event> pendingQueue;
+  // output
   private final PipeConnector pipeConnector;
 
-  public PipeConnectorPluginRuntimeWrapper(
-      Queue<Event> inputEventQueue, PipeConnector pipeConnector) {
-    this.inputEventQueue = inputEventQueue;
+  /** @param taskID connectorAttributeSortedString */
+  public PipeConnectorSubtask(String taskID, PipeConnector pipeConnector) {
+    super(taskID);
+    // TODO: make the size of the queue size reasonable and configurable
+    this.pendingQueue = new ArrayBlockingQueue<>(1024 * 1024);
     this.pipeConnector = pipeConnector;
   }
 
+  public ArrayBlockingQueue<Event> getInputPendingQueue() {
+    return pendingQueue;
+  }
+
   // TODO: for a while
-  public void executeForAWhile() {
-    if (inputEventQueue.isEmpty()) {
+  @Override
+  protected void executeForAWhile() {
+    if (pendingQueue.isEmpty()) {
       return;
     }
 
-    final Event event = inputEventQueue.poll();
+    final Event event = pendingQueue.poll();
 
     try {
       if (event instanceof TabletInsertionEvent) {
@@ -70,4 +78,16 @@ public class PipeConnectorPluginRuntimeWrapper {
           e);
     }
   }
+
+  @Override
+  public void close() {
+    try {
+      pipeConnector.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOGGER.info(
+          "Error occurred during closing PipeConnector, perhaps need to check 
whether the implementation of PipeConnector is correct according to the 
pipe-api description.",
+          e);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
similarity index 71%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index a5b56bc68e..aa8772f2a1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.processor;
+package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -30,32 +30,34 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 
-public class PipeProcessorPluginRuntimeWrapper {
+public class PipeProcessorSubtask extends PipeSubtask {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeProcessorPluginRuntimeWrapper.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeProcessorSubtask.class);
 
-  private final Queue<Event> inputEventQueue;
+  private final ArrayBlockingQueue<Event> pendingEventQueue;
   private final PipeProcessor pipeProcessor;
   private final EventCollector outputEventCollector;
 
-  public PipeProcessorPluginRuntimeWrapper(
-      Queue<Event> inputEventQueue,
+  public PipeProcessorSubtask(
+      String taskID,
+      ArrayBlockingQueue<Event> pendingEventQueue,
       PipeProcessor pipeProcessor,
       EventCollector outputEventCollector) {
-    this.inputEventQueue = inputEventQueue;
+    super(taskID);
     this.pipeProcessor = pipeProcessor;
+    this.pendingEventQueue = pendingEventQueue;
     this.outputEventCollector = outputEventCollector;
   }
 
-  public void executeForAWhile() {
-    if (inputEventQueue.isEmpty()) {
+  @Override
+  protected void executeForAWhile() {
+    if (pendingEventQueue.isEmpty()) {
       return;
     }
 
-    final Event event = inputEventQueue.poll();
+    final Event event = pendingEventQueue.poll();
 
     try {
       if (event instanceof TabletInsertionEvent) {
@@ -74,4 +76,16 @@ public class PipeProcessorPluginRuntimeWrapper {
           e);
     }
   }
+
+  @Override
+  public void close() {
+    try {
+      pipeProcessor.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOGGER.info(
+          "Error occurred during closing PipeProcessor, perhaps need to check 
whether the implementation of PipeProcessor is correct according to the 
pipe-api description.",
+          e);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeSubtask.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
similarity index 95%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeSubtask.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 983d70b32b..45b0331700 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeSubtask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -17,9 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.callable;
+package org.apache.iotdb.db.pipe.task.subtask;
 
-import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public abstract class PipeSubtask implements FutureCallback<Void>, 
Callable<Void> {
+public abstract class PipeSubtask implements FutureCallback<Void>, 
Callable<Void>, AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeSubtask.class);
 
@@ -95,7 +95,7 @@ public abstract class PipeSubtask implements 
FutureCallback<Void>, Callable<Void
           retryCount,
           throwable);
       lastFailedCause = throwable;
-      PipeRuntimeAgent.setupAndGetInstance().report(this);
+      PipeAgent.runtime().report(this);
     }
   }
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
index 43bb0673f7..c3c5aedb86 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.db.pipe.task.callable.PipeAssignerSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeAssignerSubtask;
 
 import org.junit.Before;
 import org.mockito.Mockito;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index 694ee93b92..6fbc7fe2c6 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import 
org.apache.iotdb.db.pipe.core.connector.PipeConnectorPluginRuntimeWrapper;
-import org.apache.iotdb.db.pipe.task.callable.PipeConnectorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
+import org.apache.iotdb.pipe.api.PipeConnector;
 
 import org.junit.Before;
 import org.mockito.Mockito;
@@ -36,7 +36,7 @@ public class PipeConnectorSubtaskExecutorTest extends 
PipeSubtaskExecutorTest {
     subtask =
         Mockito.spy(
             new PipeConnectorSubtask(
-                "PipeConnectorSubtaskExecutorTest", 
mock(PipeConnectorPluginRuntimeWrapper.class)) {
+                "PipeConnectorSubtaskExecutorTest", mock(PipeConnector.class)) 
{
               @Override
               public void executeForAWhile() {}
             });
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index c4446ae5b4..1abf2ab05e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -19,12 +19,15 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import 
org.apache.iotdb.db.pipe.core.processor.PipeProcessorPluginRuntimeWrapper;
-import org.apache.iotdb.db.pipe.task.callable.PipeProcessorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
 
 import org.junit.Before;
 import org.mockito.Mockito;
 
+import java.util.concurrent.ArrayBlockingQueue;
+
 import static org.mockito.Mockito.mock;
 
 public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
@@ -36,7 +39,10 @@ public class PipeProcessorSubtaskExecutorTest extends 
PipeSubtaskExecutorTest {
     subtask =
         Mockito.spy(
             new PipeProcessorSubtask(
-                "PipeProcessorSubtaskExecutorTest", 
mock(PipeProcessorPluginRuntimeWrapper.class)) {
+                "PipeProcessorSubtaskExecutorTest",
+                mock(ArrayBlockingQueue.class),
+                mock(PipeProcessor.class),
+                mock(EventCollector.class)) {
               @Override
               public void executeForAWhile() {}
             });
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
index 8b4238d4c2..d24efc3fb7 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.db.pipe.task.callable.PipeSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 
 import org.junit.After;
 import org.junit.Assert;

Reply via email to