This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5692 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0c0b21bd79e098c88300c98887bf4dabbc44e66d Author: Steve Yurong Su <[email protected]> AuthorDate: Sun Mar 19 05:16:29 2023 +0800 Pipe: Skeleton Code Framework --- .../pipe/task/meta/PipeTaskMetaAccessor.java | 20 +--- .../org/apache/iotdb/db/pipe/agent/PipeAgent.java | 3 + .../pipe/agent/{ => plugin}/PipePluginAgent.java | 5 +- .../HeartbeatScheduler.java} | 21 +---- .../MetaSyncScheduler.java} | 20 +--- .../pipe/agent/{ => runtime}/PipeRuntimeAgent.java | 4 +- .../db/pipe/agent/{ => task}/PipeTaskAgent.java | 4 +- .../PipeTaskRegionAgent.java} | 20 +--- .../collector/PipeCollectorEventPendingQueue.java} | 20 +--- .../collector/PipeCollectorEventSelector.java} | 20 +--- .../historical/PipeHistoricalCollector.java} | 20 +--- .../collector/realtime/PipeRealtimeCollector.java} | 20 +--- .../realtime/cache/PipeRealtimeEventCache.java} | 20 +--- .../realtime/listener/IoTLogListerner.java} | 20 +--- .../realtime/listener/RatisLogListener.java} | 20 +--- .../realtime/listener/SimpleLogListener.java} | 20 +--- .../listener/TsFileGenerationListener.java} | 20 +--- .../collector/realtime/matcher/Rule.java} | 20 +--- .../realtime/matcher/RulePrefixMatchTree.java} | 20 +--- .../collector/realtime/recorder/TsFileEpoch.java} | 20 +--- .../realtime/recorder/TsFileEpochRecorder.java} | 20 +--- .../connector/PipeConnectorContainer.java} | 20 +--- .../connector/PipeConnectorManager.java} | 20 +--- .../PipeConnectorPluginRuntimeWrapper.java} | 19 ++-- .../event/PipeTabletInsertionEvent.java} | 30 +++--- .../event/PipeTsFileInsertionEvent.java} | 22 ++--- .../iotdb/db/pipe/core/event/access/PipeRow.java | 102 +++++++++++++++++++++ .../db/pipe/core/event/access/PipeRowIterator.java | 60 ++++++++++++ .../event/collector/PipeEventCollector.java} | 27 +++--- .../event/collector/PipeRowCollector.java} | 21 ++--- .../event/indexer/PipeEventIndexer.java} | 20 +--- .../event/indexer/PipeIoTEventIndexer.java} | 20 +--- .../event/indexer/PipeRatisEventIndexer.java} | 20 +--- .../event/indexer/PipeSimpleEventIndexer.java} | 20 +--- .../event/indexer/PipeTsFileEventIndexer.java} | 20 +--- .../PipeProcessorPluginRuntimeWrapper.java} | 19 ++-- .../executor/PipeAssignerSubtaskExecutor.java} | 20 +--- .../executor/PipeConnectorSubtaskExecutor.java} | 20 +--- .../executor/PipeProcessorSubtaskExecutor.java} | 20 +--- .../executor/PipeSubtaskExecutor.java} | 20 +--- .../pipe/execution/executor/PipeTaskExecutor.java | 49 ++++++++++ .../scheduler/PipeAssignerSubtaskScheduler.java} | 24 +++-- .../scheduler/PipeConnectorSubtaskScheduler.java} | 24 +++-- .../scheduler/PipeProcessorSubtaskScheduler.java} | 24 +++-- .../scheduler/PipeSubtaskScheduler.java} | 21 ++--- .../execution/scheduler/PipeTaskScheduler.java | 60 ++++++++++++ .../PipeFileManager.java} | 20 +--- .../PipeRaftlogHolder.java} | 20 +--- .../PipeTsFileHolder.java} | 20 +--- .../PipeWALHolder.java} | 20 +--- .../PipeRuntimeAgent.java => task/PipeTask.java} | 34 ++++--- .../PipeTaskBuilder.java} | 21 +---- .../metrics/PipeTaskRuntimeRecorder.java} | 20 +--- .../runnable/PipeAssignerSubtask.java} | 20 ++-- .../runnable/PipeConnectorSubtask.java} | 20 ++-- .../runnable/PipeProcessorSubtask.java} | 20 ++-- .../runnable/PipeSubtask.java} | 20 ++-- .../stage/PipeTaskCollectorStage.java} | 25 +++-- .../stage/PipeTaskConnectorStage.java} | 25 +++-- .../stage/PipeTaskProcessorStage.java} | 25 +++-- .../stage/PipeTaskStage.java} | 41 ++++++--- .../impl/DataNodeInternalRPCServiceImpl.java | 6 +- 62 files changed, 571 insertions(+), 865 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java index 5034fb50e7..04653122c2 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.commons.pipe.task.meta; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeTaskMetaAccessor {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java index 7e3b3a1e87..1c50fe5b40 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java @@ -20,6 +20,9 @@ package org.apache.iotdb.db.pipe.agent; import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper; +import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent; +import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent; +import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent; /** PipeAgent is the entry point of the pipe module in DatNode. */ public class PipeAgent { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java similarity index 97% rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java rename to server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java index 57cb0fb1b3..6831ac6a34 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.agent.plugin; import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper; import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta; @@ -188,7 +188,8 @@ public class PipePluginAgent { private static PipePluginAgent instance = null; } - static PipePluginAgent setupAndGetInstance(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) { + public static PipePluginAgent setupAndGetInstance( + DataNodePipePluginMetaKeeper pipePluginMetaKeeper) { if (PipePluginAgentServiceHolder.instance == null) { PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java index 5034fb50e7..de3f30f388 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java @@ -17,22 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.agent.runtime; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +/** HeartbeatScheduler is used to schedule the heartbeat of the pipe. */ +public class HeartbeatScheduler {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java index 5034fb50e7..366176cd1c 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.agent.runtime; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class MetaSyncScheduler {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java similarity index 92% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java index e42b1f66f3..cbfe53be8b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.agent.runtime; public class PipeRuntimeAgent { @@ -29,7 +29,7 @@ public class PipeRuntimeAgent { private static PipeRuntimeAgent INSTANCE = null; } - static PipeRuntimeAgent setupAndGetInstance() { + public static PipeRuntimeAgent setupAndGetInstance() { if (PipeRuntimeAgentHolder.INSTANCE == null) { PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java similarity index 92% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java index 5034fb50e7..dd8a6984f0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.agent.task; public class PipeTaskAgent { @@ -29,7 +29,7 @@ public class PipeTaskAgent { private static PipeTaskAgent instance = null; } - static PipeTaskAgent setupAndGetInstance() { + public static PipeTaskAgent setupAndGetInstance() { if (PipeTaskAgentHolder.instance == null) { PipeTaskAgentHolder.instance = new PipeTaskAgent(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java index 5034fb50e7..7be285ef66 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.agent.task; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeTaskRegionAgent {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java index 5034fb50e7..c3bfb68015 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeCollectorEventPendingQueue {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java index 5034fb50e7..e1c50a9601 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeCollectorEventSelector {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java index 5034fb50e7..6e680de847 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector.historical; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeHistoricalCollector {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java index 5034fb50e7..b5fad778b0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector.realtime; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeRealtimeCollector {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java index 5034fb50e7..7525e06b54 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector.realtime.cache; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeRealtimeEventCache {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java index 5034fb50e7..b91334430d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector.realtime.listener; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class IoTLogListerner {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java index 5034fb50e7..3aa42354f5 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector.realtime.listener; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class RatisLogListener {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java index 5034fb50e7..b5e1eaf93b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector.realtime.listener; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class SimpleLogListener {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java index 5034fb50e7..4c28795219 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector.realtime.listener; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class TsFileGenerationListener {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java index 5034fb50e7..948fab2acd 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector.realtime.matcher; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class Rule {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java index 5034fb50e7..a543d7f0ef 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector.realtime.matcher; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class RulePrefixMatchTree {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java index 5034fb50e7..a85d3a0b70 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector.realtime.recorder; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class TsFileEpoch {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java index 5034fb50e7..6948cef200 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.collector.realtime.recorder; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class TsFileEpochRecorder {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java index 5034fb50e7..8651ed3df1 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.connector; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeConnectorContainer {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java index 5034fb50e7..881edd67f8 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.connector; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeConnectorManager {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java similarity index 63% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java index 5034fb50e7..c1c5be1166 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java @@ -17,22 +17,15 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.connector; -public class PipeTaskAgent { +import org.apache.iotdb.pipe.api.PipeConnector; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public class PipeConnectorPluginRuntimeWrapper { - private PipeTaskAgent() {} + private final PipeConnector pipeConnector; - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; + public PipeConnectorPluginRuntimeWrapper(PipeConnector pipeConnector) { + this.pipeConnector = pipeConnector; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java similarity index 51% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java index e42b1f66f3..a58e8db497 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java @@ -17,22 +17,30 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.event; -public class PipeRuntimeAgent { +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.collector.RowCollector; +import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent; +import org.apache.iotdb.tsfile.write.record.Tablet; - ///////////////////////// Singleton Instance Holder ///////////////////////// +import java.util.Iterator; +import java.util.function.BiConsumer; - private PipeRuntimeAgent() {} +public class PipeTabletInsertionEvent implements TabletInsertionEvent { - private static class PipeRuntimeAgentHolder { - private static PipeRuntimeAgent INSTANCE = null; + @Override + public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) { + return null; } - static PipeRuntimeAgent setupAndGetInstance() { - if (PipeRuntimeAgentHolder.INSTANCE == null) { - PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent(); - } - return PipeRuntimeAgentHolder.INSTANCE; + @Override + public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, RowCollector> consumer) { + return null; + } + + @Override + public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) { + return null; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java similarity index 63% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java index 5034fb50e7..d11b4f780d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java @@ -17,22 +17,20 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.event; -public class PipeTaskAgent { +import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public class PipeTsFileInsertionEvent implements TsFileInsertionEvent { - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; + @Override + public Iterable<TabletInsertionEvent> toTabletInsertionEvents() { + return null; } - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; + @Override + public TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) { + return null; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java new file mode 100644 index 0000000000..43b438445f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.core.event.access; + +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import org.apache.iotdb.pipe.api.type.Binary; +import org.apache.iotdb.pipe.api.type.Type; +import org.apache.iotdb.tsfile.read.common.Path; + +import java.io.IOException; +import java.util.List; + +public class PipeRow implements Row { + + @Override + public long getTime() throws IOException { + return 0; + } + + @Override + public int getInt(int columnIndex) throws IOException { + return 0; + } + + @Override + public long getLong(int columnIndex) throws IOException { + return 0; + } + + @Override + public float getFloat(int columnIndex) throws IOException { + return 0; + } + + @Override + public double getDouble(int columnIndex) throws IOException { + return 0; + } + + @Override + public boolean getBoolean(int columnIndex) throws IOException { + return false; + } + + @Override + public Binary getBinary(int columnIndex) throws IOException { + return null; + } + + @Override + public String getString(int columnIndex) throws IOException { + return null; + } + + @Override + public Type getDataType(int columnIndex) { + return null; + } + + @Override + public boolean isNull(int columnIndex) { + return false; + } + + @Override + public int size() { + return 0; + } + + @Override + public int getColumnIndex(Path columnName) throws PipeParameterNotValidException { + return 0; + } + + @Override + public List<Path> getColumnNames() { + return null; + } + + @Override + public List<Type> getColumnTypes() { + return null; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java new file mode 100644 index 0000000000..960214bea6 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.core.event.access; + +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.access.RowIterator; +import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import org.apache.iotdb.pipe.api.type.Type; +import org.apache.iotdb.tsfile.read.common.Path; + +import java.io.IOException; +import java.util.List; + +public class PipeRowIterator implements RowIterator { + + @Override + public boolean hasNextRow() { + return false; + } + + @Override + public Row next() throws IOException { + return null; + } + + @Override + public void reset() {} + + @Override + public int getColumnIndex(Path columnName) throws PipeParameterNotValidException { + return 0; + } + + @Override + public List<Path> getColumnNames() { + return null; + } + + @Override + public List<Type> getColumnTypes() { + return null; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java similarity index 53% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java index e42b1f66f3..2c2bbfd38f 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java @@ -17,22 +17,23 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.event.collector; -public class PipeRuntimeAgent { +import org.apache.iotdb.pipe.api.collector.EventCollector; +import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent; +import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent; - ///////////////////////// Singleton Instance Holder ///////////////////////// +import java.io.IOException; - private PipeRuntimeAgent() {} +public class PipeEventCollector implements EventCollector { - private static class PipeRuntimeAgentHolder { - private static PipeRuntimeAgent INSTANCE = null; - } + @Override + public void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException {} - static PipeRuntimeAgent setupAndGetInstance() { - if (PipeRuntimeAgentHolder.INSTANCE == null) { - PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent(); - } - return PipeRuntimeAgentHolder.INSTANCE; - } + @Override + public void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException {} + + @Override + public void collectDeletionEvent(DeletionEvent event) throws IOException {} } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java index 5034fb50e7..525e79c137 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java @@ -17,22 +17,15 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.event.collector; -public class PipeTaskAgent { +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.collector.RowCollector; - ///////////////////////// Singleton Instance Holder ///////////////////////// +import java.io.IOException; - private PipeTaskAgent() {} +public class PipeRowCollector implements RowCollector { - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } + @Override + public void collectRow(Row row) throws IOException {} } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java index 5034fb50e7..22cbce9e04 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.event.indexer; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public interface PipeEventIndexer {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java index 5034fb50e7..63aff8a519 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.event.indexer; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeIoTEventIndexer {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java index 5034fb50e7..4520855085 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.event.indexer; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeRatisEventIndexer {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java index 5034fb50e7..d5add45547 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.event.indexer; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeSimpleEventIndexer {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java index 5034fb50e7..be53e89751 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.event.indexer; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeTsFileEventIndexer implements PipeEventIndexer {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java similarity index 63% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java index 5034fb50e7..3153d2ae0e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java @@ -17,22 +17,15 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.core.processor; -public class PipeTaskAgent { +import org.apache.iotdb.pipe.api.PipeProcessor; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public class PipeProcessorPluginRuntimeWrapper { - private PipeTaskAgent() {} + private final PipeProcessor pipeProcessor; - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; + public PipeProcessorPluginRuntimeWrapper(PipeProcessor pipeProcessor) { + this.pipeProcessor = pipeProcessor; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java index 5034fb50e7..cc3dd43987 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.execution.executor; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeAssignerSubtaskExecutor implements PipeSubtaskExecutor {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java index 5034fb50e7..98eaf31d1b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.execution.executor; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeConnectorSubtaskExecutor implements PipeSubtaskExecutor {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java index 5034fb50e7..c61871fafe 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.execution.executor; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeProcessorSubtaskExecutor implements PipeSubtaskExecutor {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java index 5034fb50e7..7d97605dff 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.execution.executor; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public interface PipeSubtaskExecutor {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java new file mode 100644 index 0000000000..4437fb119d --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.execution.executor; + +/** + * PipeTaskExecutor is responsible for executing the pipe tasks, and it is scheduled by the + * PipeTaskScheduler. It is a singleton class. + */ +public class PipeTaskExecutor { + + private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor = + new PipeAssignerSubtaskExecutor(); + private final PipeProcessorSubtaskExecutor processorSubtaskExecutor = + new PipeProcessorSubtaskExecutor(); + private final PipeConnectorSubtaskExecutor connectorSubtaskExecutor = + new PipeConnectorSubtaskExecutor(); + + ///////////////////////// Singleton Instance Holder ///////////////////////// + + private PipeTaskExecutor() {} + + private static class PipeTaskExecutorHolder { + private static PipeTaskExecutor instance = null; + } + + public static PipeTaskExecutor setupAndGetInstance() { + if (PipeTaskExecutorHolder.instance == null) { + PipeTaskExecutorHolder.instance = new PipeTaskExecutor(); + } + return PipeTaskExecutorHolder.instance; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java index 5034fb50e7..2cab31d737 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java @@ -17,22 +17,20 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.execution.scheduler; -public class PipeTaskAgent { +import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public class PipeAssignerSubtaskScheduler implements PipeSubtaskScheduler { + @Override + public void createSubtask(String subtaskId, PipeSubtask subtask) {} - private PipeTaskAgent() {} + @Override + public void dropSubtask(String subtaskId) {} - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } + @Override + public void startSubtask(String subtaskId) {} - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } + @Override + public void stopSubtask(String subtaskId) {} } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java index 5034fb50e7..c53c6b040d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java @@ -17,22 +17,20 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.execution.scheduler; -public class PipeTaskAgent { +import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public class PipeConnectorSubtaskScheduler implements PipeSubtaskScheduler { + @Override + public void createSubtask(String subtaskId, PipeSubtask subtask) {} - private PipeTaskAgent() {} + @Override + public void dropSubtask(String subtaskId) {} - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } + @Override + public void startSubtask(String subtaskId) {} - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } + @Override + public void stopSubtask(String subtaskId) {} } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java index 5034fb50e7..9f5df481b8 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java @@ -17,22 +17,20 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.execution.scheduler; -public class PipeTaskAgent { +import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public class PipeProcessorSubtaskScheduler implements PipeSubtaskScheduler { + @Override + public void createSubtask(String subtaskId, PipeSubtask subtask) {} - private PipeTaskAgent() {} + @Override + public void dropSubtask(String subtaskId) {} - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } + @Override + public void startSubtask(String subtaskId) {} - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } + @Override + public void stopSubtask(String subtaskId) {} } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java index 5034fb50e7..c87f949103 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java @@ -17,22 +17,17 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.execution.scheduler; -public class PipeTaskAgent { +import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public interface PipeSubtaskScheduler { - private PipeTaskAgent() {} + void createSubtask(String subtaskId, PipeSubtask subtask); - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } + void dropSubtask(String subtaskId); - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } + void startSubtask(String subtaskId); + + void stopSubtask(String subtaskId); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java new file mode 100644 index 0000000000..cda2cc9466 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.execution.scheduler; + +import org.apache.iotdb.db.pipe.task.PipeTask; + +/** + * PipeTaskScheduler is responsible for scheduling the pipe tasks. It takes the pipe tasks and + * executes them in the PipeTaskExecutor. It is a singleton class. + */ +public class PipeTaskScheduler { + + private final PipeSubtaskScheduler assignerSubtaskScheduler; + private final PipeSubtaskScheduler processorSubtaskScheduler; + private final PipeSubtaskScheduler connectorSubtaskScheduler; + + public void createPipeTask(PipeTask pipeTask) {} + + public void dropPipeTask(String pipeName) {} + + public void startPipeTask(String pipeName) {} + + public void stopPipeTask(String pipeName) {} + + ///////////////////////// Singleton Instance Holder ///////////////////////// + + private PipeTaskScheduler() { + assignerSubtaskScheduler = new PipeAssignerSubtaskScheduler(); + processorSubtaskScheduler = new PipeProcessorSubtaskScheduler(); + connectorSubtaskScheduler = new PipeConnectorSubtaskScheduler(); + } + + private static class PipeTaskSchedulerHolder { + private static PipeTaskScheduler instance = null; + } + + public static PipeTaskScheduler setupAndGetInstance() { + if (PipeTaskSchedulerHolder.instance == null) { + PipeTaskSchedulerHolder.instance = new PipeTaskScheduler(); + } + return PipeTaskSchedulerHolder.instance; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java index 5034fb50e7..25ce3d1142 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.resource; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeFileManager {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java index 5034fb50e7..984c3fbaf2 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.resource; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeRaftlogHolder {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java index 5034fb50e7..04fe515e59 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.resource; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeTsFileHolder {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java index 5034fb50e7..78e0a7c612 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.resource; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeWALHolder {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java similarity index 51% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java index e42b1f66f3..e3a2819deb 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java @@ -17,22 +17,32 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.task; -public class PipeRuntimeAgent { +import org.apache.iotdb.db.pipe.task.metrics.PipeTaskRuntimeRecorder; +import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public class PipeTask { - private PipeRuntimeAgent() {} + private final String pipeName; - private static class PipeRuntimeAgentHolder { - private static PipeRuntimeAgent INSTANCE = null; - } + private final PipeTaskStage collectorStage; + private final PipeTaskStage processorStage; + private final PipeTaskStage connectorStage; + + private final PipeTaskRuntimeRecorder runtimeRecorder; + + public PipeTask( + String pipeName, + PipeTaskStage collectorStage, + PipeTaskStage processorStage, + PipeTaskStage connectorStage) { + this.pipeName = pipeName; + + this.collectorStage = collectorStage; + this.processorStage = processorStage; + this.connectorStage = connectorStage; - static PipeRuntimeAgent setupAndGetInstance() { - if (PipeRuntimeAgentHolder.INSTANCE == null) { - PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent(); - } - return PipeRuntimeAgentHolder.INSTANCE; + runtimeRecorder = new PipeTaskRuntimeRecorder(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java index 5034fb50e7..19e5f460ea 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java @@ -17,22 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.task; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +/** PipeTaskBuilder is used to build a PipeTask. */ +public class PipeTaskBuilder {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java index 5034fb50e7..6f09614958 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java @@ -17,22 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.task.metrics; -public class PipeTaskAgent { - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } - - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } -} +public class PipeTaskRuntimeRecorder {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java similarity index 63% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java index 5034fb50e7..5890801ea6 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java @@ -17,22 +17,14 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.task.runnable; -public class PipeTaskAgent { +public class PipeAssignerSubtask extends PipeSubtask { - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; + public PipeAssignerSubtask(String taskID) { + super(taskID); } - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } + @Override + public void runMayThrow() throws Throwable {} } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java similarity index 63% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java index 5034fb50e7..b199607241 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java @@ -17,22 +17,14 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.task.runnable; -public class PipeTaskAgent { +public class PipeConnectorSubtask extends PipeSubtask { - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; + public PipeConnectorSubtask(String taskID) { + super(taskID); } - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } + @Override + public void runMayThrow() throws Throwable {} } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java similarity index 63% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java index 5034fb50e7..cfdf0123e1 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java @@ -17,22 +17,14 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.task.runnable; -public class PipeTaskAgent { +public class PipeProcessorSubtask extends PipeSubtask { - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskAgent() {} - - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; + public PipeProcessorSubtask(String taskID) { + super(taskID); } - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } + @Override + public void runMayThrow() throws Throwable {} } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java similarity index 63% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java index 5034fb50e7..daebd15e47 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java @@ -17,22 +17,20 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.task.runnable; -public class PipeTaskAgent { +import org.apache.iotdb.commons.concurrent.WrappedRunnable; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public abstract class PipeSubtask extends WrappedRunnable { - private PipeTaskAgent() {} + private final String taskID; - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; + public PipeSubtask(String taskID) { + super(); + this.taskID = taskID; } - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; + public String getTaskID() { + return taskID; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java index 5034fb50e7..930a06cc94 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java @@ -17,22 +17,21 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.task.stage; -public class PipeTaskAgent { +import org.apache.iotdb.pipe.api.exception.PipeException; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public class PipeTaskCollectorStage implements PipeTaskStage { - private PipeTaskAgent() {} + @Override + public void create() throws PipeException {} - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } + @Override + public void start() throws PipeException {} - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } + @Override + public void stop() throws PipeException {} + + @Override + public void drop() throws PipeException {} } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java copy to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java index 5034fb50e7..fddb99fb5e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java @@ -17,22 +17,21 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.task.stage; -public class PipeTaskAgent { +import org.apache.iotdb.pipe.api.exception.PipeException; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public class PipeTaskConnectorStage implements PipeTaskStage { - private PipeTaskAgent() {} + @Override + public void create() throws PipeException {} - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } + @Override + public void start() throws PipeException {} - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } + @Override + public void stop() throws PipeException {} + + @Override + public void drop() throws PipeException {} } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java similarity index 62% rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java rename to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java index 5034fb50e7..5a22721a8c 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java @@ -17,22 +17,21 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.task.stage; -public class PipeTaskAgent { +import org.apache.iotdb.pipe.api.exception.PipeException; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public class PipeTaskProcessorStage implements PipeTaskStage { - private PipeTaskAgent() {} + @Override + public void create() throws PipeException {} - private static class PipeTaskAgentHolder { - private static PipeTaskAgent instance = null; - } + @Override + public void start() throws PipeException {} - static PipeTaskAgent setupAndGetInstance() { - if (PipeTaskAgentHolder.instance == null) { - PipeTaskAgentHolder.instance = new PipeTaskAgent(); - } - return PipeTaskAgentHolder.instance; - } + @Override + public void stop() throws PipeException {} + + @Override + public void drop() throws PipeException {} } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java similarity index 52% rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java rename to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java index e42b1f66f3..09ae67ef76 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java @@ -17,22 +17,37 @@ * under the License. */ -package org.apache.iotdb.db.pipe.agent; +package org.apache.iotdb.db.pipe.task.stage; -public class PipeRuntimeAgent { +import org.apache.iotdb.pipe.api.exception.PipeException; - ///////////////////////// Singleton Instance Holder ///////////////////////// +public interface PipeTaskStage { - private PipeRuntimeAgent() {} + /** + * Create a pipe task stage. + * + * @throws PipeException if failed to create a pipe task stage. + */ + void create() throws PipeException; - private static class PipeRuntimeAgentHolder { - private static PipeRuntimeAgent INSTANCE = null; - } + /** + * Start a pipe task stage. + * + * @throws PipeException if failed to start a pipe task stage. + */ + void start() throws PipeException; - static PipeRuntimeAgent setupAndGetInstance() { - if (PipeRuntimeAgentHolder.INSTANCE == null) { - PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent(); - } - return PipeRuntimeAgentHolder.INSTANCE; - } + /** + * Stop a pipe task stage. + * + * @throws PipeException if failed to stop a pipe task stage. + */ + void stop() throws PipeException; + + /** + * Drop a pipe task stage. + * + * @throws PipeException if failed to drop a pipe task stage. + */ + void drop() throws PipeException; } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java index 308afe736d..39b4e59b9f 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.service.thrift.impl; -import com.google.common.collect.ImmutableList; import org.apache.iotdb.common.rpc.thrift.*; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.conf.CommonConfig; @@ -113,6 +112,8 @@ import org.apache.iotdb.tsfile.exception.NotImplementedException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.write.record.Tablet; + +import com.google.common.collect.ImmutableList; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1068,7 +1069,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface if (configNodeLocations != null) { ConfigNodeInfo.getInstance() .updateConfigNodeList( - configNodeLocations.parallelStream() + configNodeLocations + .parallelStream() .map(TConfigNodeLocation::getInternalEndPoint) .collect(Collectors.toList())); }
