This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ce843c2c06 [IOTDB-5779] PipeConnector reuse strategy based on
reference count mechanism (#9629)
ce843c2c06 is described below
commit ce843c2c060db29fcd8c8953b7a562879d233134
Author: Itami Sho <[email protected]>
AuthorDate: Tue Apr 25 09:54:40 2023 +0800
[IOTDB-5779] PipeConnector reuse strategy based on reference count
mechanism (#9629)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../commons/pipe/task/meta/PipeStaticMeta.java | 24 +++--
.../org/apache/iotdb/db/pipe/agent/PipeAgent.java | 6 +-
.../db/pipe/agent/plugin/PipePluginAgent.java | 34 +++----
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 17 +---
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 19 ++--
.../db/pipe/agent/task/PipeTaskRegionAgent.java | 22 -----
.../core/connector/PipeConnectorContainer.java | 22 -----
.../pipe/core/connector/PipeConnectorManager.java | 22 -----
.../connector/PipeConnectorSubtaskLifeCycle.java | 98 ++++++++++++++++++
.../connector/PipeConnectorSubtaskManager.java | 107 ++++++++++++++++++++
.../execution/executor/PipeSubtaskExecutor.java | 12 ++-
.../pipe/task/callable/PipeConnectorSubtask.java | 37 -------
.../pipe/task/callable/PipeProcessorSubtask.java | 37 -------
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 41 +++++++-
.../db/pipe/task/stage/PipeTaskConnectorStage.java | 109 ++++++++++++++++++++-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 38 ++++++-
.../iotdb/db/pipe/task/stage/PipeTaskStage.java | 34 ++-----
.../task/{callable => subtask}/DecoratingLock.java | 2 +-
.../{callable => subtask}/PipeAssignerSubtask.java | 7 +-
.../subtask/PipeConnectorSubtask.java} | 44 ++++++---
.../subtask/PipeProcessorSubtask.java} | 38 ++++---
.../task/{callable => subtask}/PipeSubtask.java | 8 +-
.../executor/PipeAssignerSubtaskExecutorTest.java | 2 +-
.../executor/PipeConnectorSubtaskExecutorTest.java | 6 +-
.../executor/PipeProcessorSubtaskExecutorTest.java | 12 ++-
.../executor/PipeSubtaskExecutorTest.java | 2 +-
26 files changed, 528 insertions(+), 272 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index f0e2c77e5b..3b9c94c94c 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.commons.pipe.task.meta;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -37,6 +38,10 @@ public class PipeStaticMeta {
private Map<String, String> processorAttributes = new HashMap<>();
private Map<String, String> connectorAttributes = new HashMap<>();
+ private PipeParameters collectorParameters;
+ private PipeParameters processorParameters;
+ private PipeParameters connectorParameters;
+
private PipeStaticMeta() {}
public PipeStaticMeta(
@@ -50,6 +55,9 @@ public class PipeStaticMeta {
this.collectorAttributes = collectorAttributes;
this.processorAttributes = processorAttributes;
this.connectorAttributes = connectorAttributes;
+ collectorParameters = new PipeParameters(collectorAttributes);
+ processorParameters = new PipeParameters(processorAttributes);
+ connectorParameters = new PipeParameters(connectorAttributes);
}
public String getPipeName() {
@@ -60,16 +68,16 @@ public class PipeStaticMeta {
return createTime;
}
- public Map<String, String> getCollectorAttributes() {
- return collectorAttributes;
+ public PipeParameters getCollectorParameters() {
+ return collectorParameters;
}
- public Map<String, String> getProcessorAttributes() {
- return processorAttributes;
+ public PipeParameters getProcessorParameters() {
+ return processorParameters;
}
- public Map<String, String> getConnectorAttributes() {
- return connectorAttributes;
+ public PipeParameters getConnectorParameters() {
+ return connectorParameters;
}
public ByteBuffer serialize() throws IOException {
@@ -127,6 +135,10 @@ public class PipeStaticMeta {
ReadWriteIOUtils.readString(byteBuffer),
ReadWriteIOUtils.readString(byteBuffer));
}
+ pipeStaticMeta.collectorParameters = new
PipeParameters(pipeStaticMeta.collectorAttributes);
+ pipeStaticMeta.processorParameters = new
PipeParameters(pipeStaticMeta.processorAttributes);
+ pipeStaticMeta.connectorParameters = new
PipeParameters(pipeStaticMeta.connectorAttributes);
+
return pipeStaticMeta;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index a38d86f974..f490a171f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -32,9 +32,9 @@ public class PipeAgent {
/** Private constructor to prevent users from creating a new instance. */
private PipeAgent() {
- pipePluginAgent = PipePluginAgent.setupAndGetInstance();
- pipeTaskAgent = PipeTaskAgent.setupAndGetInstance();
- pipeRuntimeAgent = PipeRuntimeAgent.setupAndGetInstance();
+ pipePluginAgent = new PipePluginAgent();
+ pipeTaskAgent = new PipeTaskAgent();
+ pipeRuntimeAgent = new PipeRuntimeAgent();
}
/** The singleton holder of PipeAgent. */
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 1f5f70bd3d..e65495a7c3 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -24,7 +24,10 @@ import
org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
+import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipePlugin;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeManagementException;
import org.slf4j.Logger;
@@ -43,6 +46,10 @@ public class PipePluginAgent {
private final DataNodePipePluginMetaKeeper pipePluginMetaKeeper;
+ public PipePluginAgent() {
+ this.pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
+ }
+
/////////////////////////////// Lock ///////////////////////////////
public void acquireLock() {
@@ -181,7 +188,15 @@ public class PipePluginAgent {
}
}
- public PipePlugin reflect(String pluginName) {
+ public PipeProcessor reflectProcessor(PipeParameters processorParameters) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public PipeConnector reflectConnector(PipeParameters connectorParameters) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ private PipePlugin reflect(String pluginName) {
PipePluginMeta information =
pipePluginMetaKeeper.getPipePluginMeta(pluginName);
if (information == null) {
String errorMessage =
@@ -207,21 +222,4 @@ public class PipePluginAgent {
throw new RuntimeException(errorMessage);
}
}
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipePluginAgent() {
- this.pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
- }
-
- private static class PipePluginAgentServiceHolder {
- private static PipePluginAgent instance = null;
- }
-
- public static PipePluginAgent setupAndGetInstance() {
- if (PipePluginAgentServiceHolder.instance == null) {
- PipePluginAgentServiceHolder.instance = new PipePluginAgent();
- }
- return PipePluginAgentServiceHolder.instance;
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 55e3b0c1ec..8c03766e45 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.agent.runtime;
-import org.apache.iotdb.db.pipe.task.callable.PipeSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,19 +35,4 @@ public class PipeRuntimeAgent {
subtask.getTaskID(),
subtask.getLastFailedCause());
}
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeRuntimeAgent() {}
-
- private static class PipeRuntimeAgentHolder {
- private static PipeRuntimeAgent INSTANCE = null;
- }
-
- public static PipeRuntimeAgent setupAndGetInstance() {
- if (PipeRuntimeAgentHolder.INSTANCE == null) {
- PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
- }
- return PipeRuntimeAgentHolder.INSTANCE;
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index dd8a6984f0..9fffa86706 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -19,20 +19,19 @@
package org.apache.iotdb.db.pipe.agent.task;
-public class PipeTaskAgent {
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public class PipeTaskAgent {
- private PipeTaskAgent() {}
+ private final PipeMetaKeeper pipeMetaKeeper;
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
+ public PipeTaskAgent() {
+ pipeMetaKeeper = new PipeMetaKeeper();
}
- public static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
+ // TODO: remove this method
+ public PipeMeta getPipeMeta(String pipeName) {
+ return pipeMetaKeeper.getPipeMeta(pipeName);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
deleted file mode 100644
index 7be285ef66..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.task;
-
-public class PipeTaskRegionAgent {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
deleted file mode 100644
index 8651ed3df1..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.connector;
-
-public class PipeConnectorContainer {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
deleted file mode 100644
index 881edd67f8..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.connector;
-
-public class PipeConnectorManager {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
new file mode 100644
index 0000000000..7e71ae2f7a
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+import
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
+
+public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
+
+ private final PipeConnectorSubtaskExecutor executor;
+ private final PipeConnectorSubtask subtask;
+
+ private int runningTaskCount;
+ private int aliveTaskCount;
+
+ public PipeConnectorSubtaskLifeCycle(
+ PipeConnectorSubtaskExecutor executor, PipeConnectorSubtask subtask) {
+ this.executor = executor;
+ this.subtask = subtask;
+
+ runningTaskCount = 0;
+ aliveTaskCount = 0;
+ }
+
+ public PipeConnectorSubtask getSubtask() {
+ return subtask;
+ }
+
+ public synchronized void register() {
+ if (aliveTaskCount < 0) {
+ throw new IllegalStateException("aliveTaskCount < 0");
+ }
+ if (aliveTaskCount == 0) {
+ executor.register(subtask);
+ runningTaskCount = 0;
+ }
+ aliveTaskCount++;
+ }
+
+ /**
+ * @return true if the subtask is out of life cycle, indicating that the subtask should never be
+ * used again
+ */
+ public synchronized boolean deregister() {
+ if (aliveTaskCount <= 0) {
+ throw new IllegalStateException("aliveTaskCount <= 0");
+ }
+ if (aliveTaskCount == 1) {
+ executor.deregister(subtask.getTaskID());
+ // this subtask is out of life cycle, should never be used again
+ return true;
+ }
+ aliveTaskCount--;
+ return false;
+ }
+
+ public synchronized void start() {
+ if (runningTaskCount < 0) {
+ throw new IllegalStateException("runningTaskCount < 0");
+ }
+ if (runningTaskCount == 0) {
+ executor.start(subtask.getTaskID());
+ }
+ runningTaskCount++;
+ }
+
+ public synchronized void stop() {
+ if (runningTaskCount <= 0) {
+ throw new IllegalStateException("runningTaskCount <= 0");
+ }
+ if (runningTaskCount == 1) {
+ executor.stop(subtask.getTaskID());
+ }
+ runningTaskCount--;
+ }
+
+ @Override
+ public synchronized void close() {
+ executor.deregister(subtask.getTaskID());
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
new file mode 100644
index 0000000000..a00cedf7b5
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class PipeConnectorSubtaskManager {
+
+ private final Map<String, PipeConnectorSubtaskLifeCycle>
+ attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
+
+ public synchronized String register(
+ PipeConnectorSubtaskExecutor executor, PipeParameters
connectorAttributes) {
+ final String attributeSortedString =
+ new TreeMap<>(connectorAttributes.getAttribute()).toString();
+
+ if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
+ final PipeConnector pipeConnector =
PipeAgent.plugin().reflectConnector(connectorAttributes);
+ final PipeConnectorSubtask pipeConnectorSubtask =
+ new PipeConnectorSubtask(attributeSortedString, pipeConnector);
+ final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
+ new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask);
+ attributeSortedString2SubtaskLifeCycleMap.put(
+ attributeSortedString, pipeConnectorSubtaskLifeCycle);
+ }
+
+
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).register();
+
+ return attributeSortedString;
+ }
+
+ public synchronized void deregister(String attributeSortedString) {
+ if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
+ throw new PipeException(
+ "Failed to deregister PipeConnectorSubtask. No such subtask: " +
attributeSortedString);
+ }
+
+ if
(attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).deregister())
{
+ attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
+ }
+ }
+
+ public synchronized void start(String attributeSortedString) {
+ if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
+ throw new PipeException(
+ "Failed to deregister PipeConnectorSubtask. No such subtask: " +
attributeSortedString);
+ }
+
+
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).start();
+ }
+
+ public synchronized void stop(String attributeSortedString) {
+ if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
+ throw new PipeException(
+ "Failed to deregister PipeConnectorSubtask. No such subtask: " +
attributeSortedString);
+ }
+
+
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop();
+ }
+
+ public PipeConnectorSubtask getPipeConnectorSubtask(String
attributeSortedString) {
+ if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
+ throw new PipeException(
+ "Failed to get PipeConnectorSubtask. No such subtask: " +
attributeSortedString);
+ }
+
+ return
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
+ }
+
+ ///////////////////////// Singleton Instance Holder /////////////////////////
+
+ private PipeConnectorSubtaskManager() {}
+
+ private static class PipeSubtaskManagerHolder {
+ private static final PipeConnectorSubtaskManager INSTANCE = new
PipeConnectorSubtaskManager();
+ }
+
+ public static PipeConnectorSubtaskManager instance() {
+ return PipeSubtaskManagerHolder.INSTANCE;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index 05956f0a90..bf4f8daf2e 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.execution.executor;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.pipe.task.callable.PipeSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -99,7 +99,15 @@ public abstract class PipeSubtaskExecutor {
public final void deregister(String subTaskID) {
stop(subTaskID);
- registeredIdSubtaskMapper.remove(subTaskID);
+ final PipeSubtask subtask = registeredIdSubtaskMapper.remove(subTaskID);
+
+ if (subtask != null) {
+ try {
+ subtask.close();
+ } catch (Exception e) {
+ LOGGER.error("Failed to close the subtask {}.", subTaskID, e);
+ }
+ }
}
@TestOnly
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeConnectorSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeConnectorSubtask.java
deleted file mode 100644
index dd1ec6db5d..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeConnectorSubtask.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.callable;
-
-import
org.apache.iotdb.db.pipe.core.connector.PipeConnectorPluginRuntimeWrapper;
-
-public class PipeConnectorSubtask extends PipeSubtask {
-
- private final PipeConnectorPluginRuntimeWrapper pipeConnector;
-
- public PipeConnectorSubtask(String taskID, PipeConnectorPluginRuntimeWrapper
pipeConnector) {
- super(taskID);
- this.pipeConnector = pipeConnector;
- }
-
- @Override
- protected void executeForAWhile() {
- pipeConnector.executeForAWhile();
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeProcessorSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeProcessorSubtask.java
deleted file mode 100644
index b13dbc4c18..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeProcessorSubtask.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.callable;
-
-import
org.apache.iotdb.db.pipe.core.processor.PipeProcessorPluginRuntimeWrapper;
-
-public class PipeProcessorSubtask extends PipeSubtask {
-
- private final PipeProcessorPluginRuntimeWrapper pipeProcessor;
-
- public PipeProcessorSubtask(String taskID, PipeProcessorPluginRuntimeWrapper
pipeProcessor) {
- super(taskID);
- this.pipeProcessor = pipeProcessor;
- }
-
- @Override
- protected void executeForAWhile() {
- pipeProcessor.executeForAWhile();
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 1962555dae..a496376023 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -20,12 +20,43 @@
package org.apache.iotdb.db.pipe.task.stage;
import org.apache.iotdb.db.pipe.execution.executor.PipeAssignerSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.callable.PipeAssignerSubtask;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.subtask.PipeAssignerSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.exception.PipeException;
-public class PipeTaskCollectorStage extends PipeTaskStage {
+public class PipeTaskCollectorStage implements PipeTaskStage {
- protected PipeTaskCollectorStage(
- PipeAssignerSubtaskExecutor executor, PipeAssignerSubtask subtask) {
- super(executor, subtask);
+ private final PipeSubtaskExecutor executor;
+ private final PipeSubtask subtask;
+
+ PipeTaskCollectorStage(PipeAssignerSubtaskExecutor executor,
PipeAssignerSubtask subtask) {
+ this.executor = executor;
+ this.subtask = subtask;
+ }
+
+ @Override
+ public void create() throws PipeException {
+ executor.register(subtask);
+ }
+
+ @Override
+ public void start() throws PipeException {
+ executor.start(subtask.getTaskID());
+ }
+
+ @Override
+ public void stop() throws PipeException {
+ executor.stop(subtask.getTaskID());
+ }
+
+ @Override
+ public void drop() throws PipeException {
+ executor.deregister(subtask.getTaskID());
+ }
+
+ @Override
+ public PipeSubtask getSubtask() {
+ return subtask;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 5b7f48e16b..0194dc5c8a 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,13 +19,114 @@
package org.apache.iotdb.db.pipe.task.stage;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.db.pipe.core.connector.PipeConnectorSubtaskManager;
import
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.callable.PipeConnectorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipeException;
-public class PipeTaskConnectorStage extends PipeTaskStage {
+public class PipeTaskConnectorStage implements PipeTaskStage {
+
+ protected final PipeConnectorSubtaskExecutor executor;
+ protected final PipeParameters connectorAttributes;
+
+ protected PipeStatus status = null;
+ protected String connectorSubtaskId = null;
+ protected boolean hasBeenExternallyStopped = false;
protected PipeTaskConnectorStage(
- PipeConnectorSubtaskExecutor executor, PipeConnectorSubtask subtask) {
- super(executor, subtask);
+ PipeConnectorSubtaskExecutor executor, PipeParameters
connectorAttributes) {
+ this.executor = executor;
+ this.connectorAttributes = connectorAttributes;
+ }
+
+ @Override
+ public synchronized void create() throws PipeException {
+ if (status != null) {
+ if (status == PipeStatus.RUNNING) {
+ throw new PipeException(
+ String.format("The PipeConnectorSubtask %s has been started",
connectorSubtaskId));
+ }
+ if (status == PipeStatus.DROPPED) {
+ throw new PipeException(
+ String.format("The PipeConnectorSubtask %s has been dropped",
connectorSubtaskId));
+ }
+ // status == PipeStatus.STOPPED
+ if (hasBeenExternallyStopped) {
+ throw new PipeException(
+ String.format(
+ "The PipeConnectorSubtask %s has been externally stopped",
connectorSubtaskId));
+ }
+ // otherwise, do nothing to allow retry strategy
+ return;
+ }
+
+ // status == null, register the connector
+ connectorSubtaskId =
+ PipeConnectorSubtaskManager.instance().register(executor,
connectorAttributes);
+ status = PipeStatus.STOPPED;
+ }
+
+ @Override
+ public synchronized void start() throws PipeException {
+ if (status == null) {
+ throw new PipeException(
+ String.format("The PipeConnectorSubtask %s has not been created",
connectorSubtaskId));
+ }
+ if (status == PipeStatus.RUNNING) {
+ // do nothing to allow retry strategy
+ return;
+ }
+ if (status == PipeStatus.DROPPED) {
+ throw new PipeException(
+ String.format("The PipeConnectorSubtask %s has been dropped",
connectorSubtaskId));
+ }
+
+ // status == PipeStatus.STOPPED, start the connector
+ PipeConnectorSubtaskManager.instance().start(connectorSubtaskId);
+ status = PipeStatus.RUNNING;
+ }
+
+ @Override
+ public synchronized void stop() throws PipeException {
+ if (status == null) {
+ throw new PipeException(
+ String.format("The PipeConnectorSubtask %s has not been created",
connectorSubtaskId));
+ }
+ if (status == PipeStatus.STOPPED) {
+ // do nothing to allow retry strategy
+ return;
+ }
+ if (status == PipeStatus.DROPPED) {
+ throw new PipeException(
+ String.format("The PipeConnectorSubtask %s has been dropped",
connectorSubtaskId));
+ }
+
+ // status == PipeStatus.RUNNING, stop the connector
+ PipeConnectorSubtaskManager.instance().stop(connectorSubtaskId);
+ status = PipeStatus.STOPPED;
+ hasBeenExternallyStopped = true;
+ }
+
+ @Override
+ public synchronized void drop() throws PipeException {
+ if (status == null) {
+ throw new PipeException(
+ String.format("The PipeConnectorSubtask %s has not been created",
connectorSubtaskId));
+ }
+ if (status == PipeStatus.DROPPED) {
+ // do nothing to allow retry strategy
+ return;
+ }
+
+ // status == PipeStatus.RUNNING or PipeStatus.STOPPED, drop the connector
+ PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
+ status = PipeStatus.DROPPED;
+ }
+
+ @Override
+ public PipeSubtask getSubtask() {
+ return
PipeConnectorSubtaskManager.instance().getPipeConnectorSubtask(connectorSubtaskId);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 6f2e058193..e7345be1e1 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -20,12 +20,44 @@
package org.apache.iotdb.db.pipe.task.stage;
import
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.callable.PipeProcessorSubtask;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.exception.PipeException;
-public class PipeTaskProcessorStage extends PipeTaskStage {
+/**
+ * Processor stage of a pipe task. It keeps the subtask and the executor handed to the
+ * constructor and forwards every life-cycle call to that executor.
+ */
+public class PipeTaskProcessorStage implements PipeTaskStage {
+
+ // executor the subtask is registered with, started, stopped and deregistered through
+ protected final PipeSubtaskExecutor executor;
+ // subtask of this stage; its task id is what start/stop/deregister are keyed by
+ protected final PipeSubtask subtask;
protected PipeTaskProcessorStage(
PipeProcessorSubtaskExecutor executor, PipeProcessorSubtask subtask) {
- super(executor, subtask);
+ this.executor = executor;
+ this.subtask = subtask;
+ }
+
+ // create: hand the subtask over to the executor
+ @Override
+ public void create() throws PipeException {
+ executor.register(subtask);
+ }
+
+ // start: ask the executor to start the subtask, addressed by its task id
+ @Override
+ public void start() throws PipeException {
+ executor.start(subtask.getTaskID());
+ }
+
+ // stop: ask the executor to stop the subtask, addressed by its task id
+ @Override
+ public void stop() throws PipeException {
+ executor.stop(subtask.getTaskID());
+ }
+
+ // drop: remove the subtask from the executor, addressed by its task id
+ @Override
+ public void drop() throws PipeException {
+ executor.deregister(subtask.getTaskID());
+ }
+
+ // returns the very subtask instance that was passed to the constructor
+ @Override
+ public PipeSubtask getSubtask() {
+ return subtask;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index 3cbe0b569b..5f57fbabcb 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -19,62 +19,42 @@
package org.apache.iotdb.db.pipe.task.stage;
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.callable.PipeSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
import org.apache.iotdb.pipe.api.exception.PipeException;
-public abstract class PipeTaskStage {
-
- protected final PipeSubtaskExecutor executor;
- protected final PipeSubtask subtask;
-
- protected PipeTaskStage(PipeSubtaskExecutor executor, PipeSubtask subtask) {
- this.executor = executor;
- this.subtask = subtask;
- }
+/**
+ * Life-cycle contract of one stage of a pipe task: a stage can be created, started, stopped and
+ * dropped, and it exposes its pipe subtask. Each operation reports failure by throwing a
+ * {@link PipeException}; how the operations are carried out is left to the implementation.
+ */
+public interface PipeTaskStage {
/**
* Create a pipe task stage.
*
* @throws PipeException if failed to create a pipe task stage.
*/
- public final void create() throws PipeException {
- executor.register(subtask);
- }
-
+ void create() throws PipeException;
/**
* Start a pipe task stage.
*
* @throws PipeException if failed to start a pipe task stage.
*/
- public final void start() throws PipeException {
- executor.start(subtask.getTaskID());
- }
+ void start() throws PipeException;
/**
* Stop a pipe task stage.
*
* @throws PipeException if failed to stop a pipe task stage.
*/
- public final void stop() throws PipeException {
- executor.stop(subtask.getTaskID());
- }
+ void stop() throws PipeException;
/**
* Drop a pipe task stage.
*
* @throws PipeException if failed to drop a pipe task stage.
*/
- public final void drop() throws PipeException {
- executor.deregister(subtask.getTaskID());
- }
+ void drop() throws PipeException;
/**
* Get the pipe subtask.
*
* @return the pipe subtask.
*/
- public final PipeSubtask getSubtask() {
- return subtask;
- }
+ PipeSubtask getSubtask();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/DecoratingLock.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/DecoratingLock.java
similarity index 96%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/task/callable/DecoratingLock.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/DecoratingLock.java
index 54f562416b..4e62a566d7 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/DecoratingLock.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/DecoratingLock.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.task.callable;
+package org.apache.iotdb.db.pipe.task.subtask;
import java.util.concurrent.atomic.AtomicBoolean;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeAssignerSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
similarity index 91%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeAssignerSubtask.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
index 64d421cf8c..39ffaefeeb 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeAssignerSubtask.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.task.callable;
+package org.apache.iotdb.db.pipe.task.subtask;
public class PipeAssignerSubtask extends PipeSubtask {
@@ -29,4 +29,9 @@ public class PipeAssignerSubtask extends PipeSubtask {
protected void executeForAWhile() {
// do nothing
}
+
+ @Override
+ public void close() {
+ // TODO: not implemented yet; closing an assigner subtask is currently a no-op
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
similarity index 64%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 1981f6b87f..e6764cf699 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.connector;
+package org.apache.iotdb.db.pipe.task.subtask;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
@@ -29,29 +29,37 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
-public class PipeConnectorPluginRuntimeWrapper {
+public class PipeConnectorSubtask extends PipeSubtask {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(PipeConnectorPluginRuntimeWrapper.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConnectorSubtask.class);
- private final Queue<Event> inputEventQueue;
+ // input
+ private final ArrayBlockingQueue<Event> pendingQueue;
+ // output
private final PipeConnector pipeConnector;
- public PipeConnectorPluginRuntimeWrapper(
- Queue<Event> inputEventQueue, PipeConnector pipeConnector) {
- this.inputEventQueue = inputEventQueue;
+ /**
+ * Creates a connector subtask that owns a freshly allocated, bounded input queue.
+ *
+ * @param taskID connectorAttributeSortedString
+ * @param pipeConnector the connector kept by this subtask; it is closed by {@code close()}
+ */
+ public PipeConnectorSubtask(String taskID, PipeConnector pipeConnector) {
+ super(taskID);
+ // TODO: make the queue capacity (currently 1024 * 1024 events) reasonable and configurable
+ this.pendingQueue = new ArrayBlockingQueue<>(1024 * 1024);
this.pipeConnector = pipeConnector;
}
+ /**
+ * Exposes the input queue of this subtask.
+ *
+ * @return the queue that {@code executeForAWhile()} polls its events from
+ */
+ public ArrayBlockingQueue<Event> getInputPendingQueue() {
+ return this.pendingQueue;
+ }
+
// TODO: for a while
- public void executeForAWhile() {
- if (inputEventQueue.isEmpty()) {
+ @Override
+ protected void executeForAWhile() {
+ if (pendingQueue.isEmpty()) {
return;
}
- final Event event = inputEventQueue.poll();
+ final Event event = pendingQueue.poll();
try {
if (event instanceof TabletInsertionEvent) {
@@ -70,4 +78,16 @@ public class PipeConnectorPluginRuntimeWrapper {
e);
}
}
+
+ /**
+ * Closes the wrapped {@link PipeConnector}.
+ *
+ * <p>Closing is best-effort: an exception thrown by the connector is logged together with its
+ * stack trace and is not propagated to the caller.
+ */
+ @Override
+ public void close() {
+ try {
+ pipeConnector.close();
+ } catch (Exception e) {
+ // report through the logger only (no printStackTrace on stderr) and at WARN level, so a
+ // failed close is not hidden among INFO output
+ LOGGER.warn(
+ "Error occurred during closing PipeConnector, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
+ e);
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
similarity index 71%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index a5b56bc68e..aa8772f2a1 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.processor;
+package org.apache.iotdb.db.pipe.task.subtask;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -30,32 +30,34 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
-public class PipeProcessorPluginRuntimeWrapper {
+public class PipeProcessorSubtask extends PipeSubtask {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(PipeProcessorPluginRuntimeWrapper.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeProcessorSubtask.class);
- private final Queue<Event> inputEventQueue;
+ private final ArrayBlockingQueue<Event> pendingEventQueue;
private final PipeProcessor pipeProcessor;
private final EventCollector outputEventCollector;
- public PipeProcessorPluginRuntimeWrapper(
- Queue<Event> inputEventQueue,
+ /**
+ * Creates a processor subtask.
+ *
+ * @param taskID id passed on to the {@code PipeSubtask} super constructor
+ * @param pendingEventQueue queue that {@code executeForAWhile()} polls its input events from
+ * @param pipeProcessor the processor kept by this subtask; it is closed by {@code close()}
+ * @param outputEventCollector collector stored for the processor's output (presumably handed
+ * to the processor when an event is processed; confirm against executeForAWhile)
+ */
+ public PipeProcessorSubtask(
+ String taskID,
+ ArrayBlockingQueue<Event> pendingEventQueue,
PipeProcessor pipeProcessor,
EventCollector outputEventCollector) {
- this.inputEventQueue = inputEventQueue;
+ super(taskID);
this.pipeProcessor = pipeProcessor;
+ this.pendingEventQueue = pendingEventQueue;
this.outputEventCollector = outputEventCollector;
}
- public void executeForAWhile() {
- if (inputEventQueue.isEmpty()) {
+ @Override
+ protected void executeForAWhile() {
+ if (pendingEventQueue.isEmpty()) {
return;
}
- final Event event = inputEventQueue.poll();
+ final Event event = pendingEventQueue.poll();
try {
if (event instanceof TabletInsertionEvent) {
@@ -74,4 +76,16 @@ public class PipeProcessorPluginRuntimeWrapper {
e);
}
}
+
+ /**
+ * Closes the wrapped {@link PipeProcessor}.
+ *
+ * <p>Closing is best-effort: an exception thrown by the processor is logged together with its
+ * stack trace and is not propagated to the caller.
+ */
+ @Override
+ public void close() {
+ try {
+ pipeProcessor.close();
+ } catch (Exception e) {
+ // report through the logger only (no printStackTrace on stderr) and at WARN level, so a
+ // failed close is not hidden among INFO output
+ LOGGER.warn(
+ "Error occurred during closing PipeProcessor, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.",
+ e);
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
similarity index 95%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeSubtask.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 983d70b32b..45b0331700 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeSubtask.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -17,9 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.task.callable;
+package org.apache.iotdb.db.pipe.task.subtask;
-import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-public abstract class PipeSubtask implements FutureCallback<Void>,
Callable<Void> {
+public abstract class PipeSubtask implements FutureCallback<Void>,
Callable<Void>, AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeSubtask.class);
@@ -95,7 +95,7 @@ public abstract class PipeSubtask implements
FutureCallback<Void>, Callable<Void
retryCount,
throwable);
lastFailedCause = throwable;
- PipeRuntimeAgent.setupAndGetInstance().report(this);
+ PipeAgent.runtime().report(this);
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
index 43bb0673f7..c3c5aedb86 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.execution.executor;
-import org.apache.iotdb.db.pipe.task.callable.PipeAssignerSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeAssignerSubtask;
import org.junit.Before;
import org.mockito.Mockito;
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index 694ee93b92..6fbc7fe2c6 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.pipe.execution.executor;
-import
org.apache.iotdb.db.pipe.core.connector.PipeConnectorPluginRuntimeWrapper;
-import org.apache.iotdb.db.pipe.task.callable.PipeConnectorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
+import org.apache.iotdb.pipe.api.PipeConnector;
import org.junit.Before;
import org.mockito.Mockito;
@@ -36,7 +36,7 @@ public class PipeConnectorSubtaskExecutorTest extends
PipeSubtaskExecutorTest {
subtask =
Mockito.spy(
new PipeConnectorSubtask(
- "PipeConnectorSubtaskExecutorTest",
mock(PipeConnectorPluginRuntimeWrapper.class)) {
+ "PipeConnectorSubtaskExecutorTest", mock(PipeConnector.class))
{
@Override
public void executeForAWhile() {}
});
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index c4446ae5b4..1abf2ab05e 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -19,12 +19,15 @@
package org.apache.iotdb.db.pipe.execution.executor;
-import
org.apache.iotdb.db.pipe.core.processor.PipeProcessorPluginRuntimeWrapper;
-import org.apache.iotdb.db.pipe.task.callable.PipeProcessorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.junit.Before;
import org.mockito.Mockito;
+import java.util.concurrent.ArrayBlockingQueue;
+
import static org.mockito.Mockito.mock;
public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
@@ -36,7 +39,10 @@ public class PipeProcessorSubtaskExecutorTest extends
PipeSubtaskExecutorTest {
subtask =
Mockito.spy(
new PipeProcessorSubtask(
- "PipeProcessorSubtaskExecutorTest",
mock(PipeProcessorPluginRuntimeWrapper.class)) {
+ "PipeProcessorSubtaskExecutorTest",
+ mock(ArrayBlockingQueue.class),
+ mock(PipeProcessor.class),
+ mock(EventCollector.class)) {
@Override
public void executeForAWhile() {}
});
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
index 8b4238d4c2..d24efc3fb7 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.execution.executor;
-import org.apache.iotdb.db.pipe.task.callable.PipeSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
import org.junit.After;
import org.junit.Assert;