This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git


The following commit(s) were added to refs/heads/master by this push:
     new a812c4c  Complete the migration of the collector node project (#43)
a812c4c is described below

commit a812c4c819a32a1f248515684da3717ce7b86b30
Author: 0xB <[email protected]>
AuthorDate: Fri Feb 21 15:15:13 2025 +0800

    Complete the migration of the collector node project (#43)
    
    * Complete the migration of the collector node project
    
    * fix compile-check fail
---
 iotdb-collector/collector-core/pom.xml             | 119 ++++++++++
 .../org/apache/iotdb/collector/Application.java    |  93 ++++++++
 .../iotdb/collector/agent/CollectorAgent.java      |  50 ++++
 .../agent/collect/CollectorEventCollector.java     |  48 ++++
 .../executor/CollectorProcessorTaskExecutor.java   |  68 ++++++
 .../agent/executor/CollectorSinkTaskExecutor.java  |  67 ++++++
 .../executor/CollectorSourceTaskExecutor.java      |  67 ++++++
 .../agent/executor/CollectorTaskExecutor.java      |  64 ++++++
 .../agent/executor/CollectorTaskExecutorAgent.java |  53 +++++
 .../agent/plugin/CollectorPluginAgent.java         |  39 ++++
 .../agent/plugin/CollectorPluginConstructor.java   |  77 +++++++
 .../agent/task/CollectorProcessorTask.java         |  88 +++++++
 .../collector/agent/task/CollectorSinkTask.java    |  87 +++++++
 .../collector/agent/task/CollectorSourceTask.java  |  69 ++++++
 .../iotdb/collector/agent/task/CollectorTask.java  |  37 +++
 .../collector/agent/task/CollectorTaskAgent.java   | 117 ++++++++++
 .../collector/api/filter/ApiOriginFilter.java      |  52 +++++
 .../collector/api/impl/PingApiServiceImpl.java     |  38 ++++
 .../api/v1/handler/RequestValidationHandler.java   |  37 +++
 .../collector/api/v1/impl/AdminApiServiceImpl.java |  98 ++++++++
 .../iotdb/collector/config/ApiServiceOptions.java  |  31 +++
 .../iotdb/collector/config/Configuration.java      |  88 +++++++
 .../org/apache/iotdb/collector/config/Options.java | 103 +++++++++
 .../iotdb/collector/config/TrimProperties.java     |  51 +++++
 .../collector/plugin/BuiltinCollectorPlugin.java   |  74 ++++++
 .../builtin/processor/DoNothingProcessor.java      |  64 ++++++
 .../collector/plugin/builtin/sink/SessionSink.java |  68 ++++++
 .../plugin/builtin/source/HttpSource.java          |  79 +++++++
 .../plugin/builtin/source/event/SourceEvent.java   |  45 ++++
 .../apache/iotdb/collector/service/ApiService.java |  96 ++++++++
 .../apache/iotdb/collector/service/IService.java   |  36 +++
 .../src/main/resources/application.properties      |  20 ++
 iotdb-collector/collector-openapi/pom.xml          | 179 +++++++++++++++
 .../src/main/openapi3/iotdb_collector_rest_v1.yaml | 253 +++++++++++++++++++++
 .../collector-openapi/src/main/openapi3/ping.yaml  |  63 +++++
 iotdb-collector/pom.xml                            |  36 +++
 pom.xml                                            | 103 ++++++++-
 37 files changed, 2754 insertions(+), 3 deletions(-)

diff --git a/iotdb-collector/collector-core/pom.xml 
b/iotdb-collector/collector-core/pom.xml
new file mode 100644
index 0000000..ba372d0
--- /dev/null
+++ b/iotdb-collector/collector-core/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-collector</artifactId>
+        <version>1.3.2</version>
+    </parent>
+    <artifactId>collector-core</artifactId>
+    <name>IoTDB: Collector: Core</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>collector-openapi</artifactId>
+            <version>1.3.2</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-http</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.glassfish.jersey.inject</groupId>
+                    <artifactId>jersey-hk2</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>node-commons</artifactId>
+            <version>1.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>service-rpc</artifactId>
+            <version>1.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>pipe-api</artifactId>
+            <version>1.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.servlet</groupId>
+            <artifactId>jakarta.servlet-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.ws.rs</groupId>
+            <artifactId>jakarta.ws.rs-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-http</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-util</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId>
+            <artifactId>jersey-container-servlet-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.inject</groupId>
+            <artifactId>jersey-hk2</artifactId>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <usedDependencies>
+                        <!-- For some reason the plugin complains if this 
artifact is included -->
+                        
<usedDependency>org.eclipse.jetty:jetty-http</usedDependency>
+                        
<usedDependency>org.eclipse.jetty:jetty-util</usedDependency>
+                        
<usedDependency>org.glassfish.jersey.inject:jersey-hk2</usedDependency>
+                    </usedDependencies>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
new file mode 100644
index 0000000..6cf6456
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector;
+
+import org.apache.iotdb.collector.config.Configuration;
+import org.apache.iotdb.collector.service.ApiService;
+import org.apache.iotdb.collector.service.IService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+
+/**
+ * Command-line entry point of the collector node.
+ *
+ * <p>On start-up it logs the loaded configuration, installs a JVM shutdown hook and then starts
+ * every registered {@link IService} (currently only {@link ApiService}).
+ */
+public class Application {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
+
+  private final Configuration configuration = new Configuration();
+  private final LinkedList<IService> services = new LinkedList<>();
+
+  private Application() {
+    services.add(new ApiService());
+  }
+
+  /**
+   * Boots the application and logs how long the start-up took.
+   *
+   * @param args command-line arguments; not read by this method
+   */
+  public static void main(String[] args) {
+    LOGGER.info("[Application] Starting ...");
+    final long bootBeginMillis = System.currentTimeMillis();
+
+    final Application app = new Application();
+    app.logAllOptions();
+    app.registerShutdownHook();
+    app.startServices();
+
+    final long elapsedMillis = System.currentTimeMillis() - bootBeginMillis;
+    LOGGER.info("[Application] Successfully started in {}ms", elapsedMillis);
+  }
+
+  /** Delegates to {@link Configuration#logAllOptions()}. */
+  private void logAllOptions() {
+    configuration.logAllOptions();
+  }
+
+  /** Installs a JVM shutdown hook that runs {@link #onShutdown()}. */
+  private void registerShutdownHook() {
+    final Thread hook = new Thread(this::onShutdown);
+    Runtime.getRuntime().addShutdownHook(hook);
+  }
+
+  /** Shutdown sequence: stop the services, then log a short JVM memory report. */
+  private void onShutdown() {
+    LOGGER.warn("[Application] Exiting ...");
+
+    stopServices();
+
+    final Runtime runtime = Runtime.getRuntime();
+    LOGGER.warn(
+        "[Application] JVM report: total memory {}, free memory {}, used memory {}",
+        runtime.totalMemory(),
+        runtime.freeMemory(),
+        runtime.totalMemory() - runtime.freeMemory());
+    LOGGER.warn("[Application] Exited.");
+  }
+
+  /** Stops the services in registration order; a failing service is logged and skipped. */
+  private void stopServices() {
+    for (final IService service : services) {
+      try {
+        service.stop();
+      } catch (final Exception e) {
+        // Keep going so that one failing service does not prevent the others from stopping.
+        LOGGER.warn(
+            "[{}] Unexpected exception occurred when stopping: {}",
+            service.name(),
+            e.getMessage(),
+            e);
+      }
+    }
+  }
+
+  /** Starts the services in registration order. */
+  private void startServices() {
+    services.forEach(IService::start);
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java
new file mode 100644
index 0000000..bca4106
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent;
+
+import org.apache.iotdb.collector.agent.executor.CollectorTaskExecutorAgent;
+import org.apache.iotdb.collector.agent.plugin.CollectorPluginAgent;
+import org.apache.iotdb.collector.agent.task.CollectorTaskAgent;
+
+/**
+ * Static facade over the task, task-executor and plugin agents.
+ *
+ * <p>The three agents are held by one {@code CollectorAgent} instance that is created when the
+ * nested holder class is first initialized, i.e. on the first call to any accessor below.
+ */
+public class CollectorAgent {
+
+  private final CollectorTaskAgent taskAgent = CollectorTaskAgent.instance();
+  private final CollectorTaskExecutorAgent executorAgent = CollectorTaskExecutorAgent.instance();
+  private final CollectorPluginAgent pluginAgent = CollectorPluginAgent.instance();
+
+  private CollectorAgent() {}
+
+  /** Holds the single {@code CollectorAgent} instance. */
+  private static class CollectorAgentHolder {
+    private static final CollectorAgent INSTANCE = new CollectorAgent();
+  }
+
+  /** Returns the shared agent instance kept by the holder. */
+  private static CollectorAgent shared() {
+    return CollectorAgentHolder.INSTANCE;
+  }
+
+  /** Returns the task agent. */
+  public static CollectorTaskAgent task() {
+    return shared().taskAgent;
+  }
+
+  /** Returns the task executor agent. */
+  public static CollectorTaskExecutorAgent executor() {
+    return shared().executorAgent;
+  }
+
+  /** Returns the plugin agent. */
+  public static CollectorPluginAgent plugin() {
+    return shared().pluginAgent;
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java
new file mode 100644
index 0000000..056685e
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.collect;
+
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/** {@link EventCollector} that hands every collected event to a caller-supplied blocking queue. */
+public class CollectorEventCollector implements EventCollector {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectorEventCollector.class);
+
+  private final BlockingQueue<Event> pendingQueue;
+
+  /**
+   * @param pendingQueue queue that receives the collected events
+   */
+  public CollectorEventCollector(final BlockingQueue<Event> pendingQueue) {
+    this.pendingQueue = pendingQueue;
+  }
+
+  /**
+   * Puts the event into the pending queue, blocking while the queue has no free capacity.
+   *
+   * <p>If the calling thread is interrupted while waiting, the event is not enqueued, the failure
+   * is logged and the thread's interrupt status is restored so that callers can react to it.
+   *
+   * @param event the event to enqueue
+   */
+  @Override
+  public void collect(final Event event) {
+    try {
+      pendingQueue.put(event);
+    } catch (final InterruptedException e) {
+      // Catching InterruptedException clears the interrupt flag; set it again so the owning
+      // thread (e.g. an executor being shut down) still observes the interruption.
+      Thread.currentThread().interrupt();
+      LOGGER.warn("collect event failed because {}", e.getMessage(), e);
+    }
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java
new file mode 100644
index 0000000..385fd60
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * {@link CollectorTaskExecutor} for processor tasks.
+ *
+ * <p>Registered tasks and their executors are tracked in two static maps keyed by task id.
+ */
+public class CollectorProcessorTaskExecutor extends CollectorTaskExecutor {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(CollectorProcessorTaskExecutor.class);
+
+  private static final Map<String, ExecutorService> PROCESSOR_EXECUTOR = new ConcurrentHashMap<>();
+  private static final Map<String, CollectorTask> PROCESSOR_TASK_MAP = new ConcurrentHashMap<>();
+
+  /**
+   * @param taskId id of the task to look up
+   * @return {@code true} only if neither the executor map nor the task map contains the id
+   */
+  @Override
+  public boolean validateIfAbsent(final String taskId) {
+    return !PROCESSOR_EXECUTOR.containsKey(taskId) && !PROCESSOR_TASK_MAP.containsKey(taskId);
+  }
+
+  /**
+   * Creates a new executor for the task; every call requests a fresh one from {@link
+   * IoTDBThreadPoolFactory#newSingleThreadExecutor}.
+   *
+   * @param taskId id used as the suffix of the executor's thread name
+   * @return the newly created executor, never empty
+   */
+  @Override
+  public Optional<ExecutorService> getExecutor(final String taskId) {
+    return Optional.of(
+        IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-processor-executor-" + taskId));
+  }
+
+  /**
+   * Remembers the task and the executor running it, unless the id is already recorded.
+   *
+   * @param collectorTask the task that was submitted
+   * @param executorService the executor the task was submitted to
+   */
+  @Override
+  public void recordExecution(
+      final CollectorTask collectorTask, final ExecutorService executorService) {
+    final String taskId = collectorTask.getTaskId();
+    PROCESSOR_EXECUTOR.putIfAbsent(taskId, executorService);
+    PROCESSOR_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+    LOGGER.info("register collector processor task {}", taskId);
+  }
+
+  /**
+   * Stops the task and shuts its executor down, removing both from the bookkeeping maps.
+   *
+   * <p>Either entry may be missing: {@code deregister} calls this method as soon as one of the two
+   * maps contains the id, and a concurrent caller may already have removed the other entry.
+   * Missing entries are therefore skipped instead of dereferenced.
+   *
+   * @param taskId id of the task to remove
+   */
+  @Override
+  public void eraseExecution(final String taskId) {
+    final CollectorTask task = PROCESSOR_TASK_MAP.remove(taskId);
+    if (task != null) {
+      task.stop();
+    }
+
+    final ExecutorService executorService = PROCESSOR_EXECUTOR.remove(taskId);
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+
+    LOGGER.info("deregister collector processor task {}", taskId);
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java
new file mode 100644
index 0000000..a490e61
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * {@link CollectorTaskExecutor} for sink tasks.
+ *
+ * <p>Registered tasks and their executors are tracked in two static maps keyed by task id.
+ */
+public class CollectorSinkTaskExecutor extends CollectorTaskExecutor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectorSinkTaskExecutor.class);
+
+  private static final Map<String, ExecutorService> SINK_EXECUTOR = new ConcurrentHashMap<>();
+  private static final Map<String, CollectorTask> SINK_TASK_MAP = new ConcurrentHashMap<>();
+
+  /**
+   * @param taskId id of the task to look up
+   * @return {@code true} only if neither the executor map nor the task map contains the id
+   */
+  @Override
+  public boolean validateIfAbsent(final String taskId) {
+    return !SINK_EXECUTOR.containsKey(taskId) && !SINK_TASK_MAP.containsKey(taskId);
+  }
+
+  /**
+   * Creates a new executor for the task; every call requests a fresh one from {@link
+   * IoTDBThreadPoolFactory#newSingleThreadExecutor}.
+   *
+   * @param taskId id used as the suffix of the executor's thread name
+   * @return the newly created executor, never empty
+   */
+  @Override
+  public Optional<ExecutorService> getExecutor(final String taskId) {
+    return Optional.of(
+        IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-sink-executor-" + taskId));
+  }
+
+  /**
+   * Remembers the task and the executor running it, unless the id is already recorded.
+   *
+   * @param collectorTask the task that was submitted
+   * @param executorService the executor the task was submitted to
+   */
+  @Override
+  public void recordExecution(
+      final CollectorTask collectorTask, final ExecutorService executorService) {
+    final String taskId = collectorTask.getTaskId();
+    SINK_EXECUTOR.putIfAbsent(taskId, executorService);
+    SINK_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+    LOGGER.info("register collector sink task {}", taskId);
+  }
+
+  /**
+   * Stops the task and shuts its executor down, removing both from the bookkeeping maps.
+   *
+   * <p>Either entry may be missing: {@code deregister} calls this method as soon as one of the two
+   * maps contains the id, and a concurrent caller may already have removed the other entry.
+   * Missing entries are therefore skipped instead of dereferenced.
+   *
+   * @param taskId id of the task to remove
+   */
+  @Override
+  public void eraseExecution(final String taskId) {
+    final CollectorTask task = SINK_TASK_MAP.remove(taskId);
+    if (task != null) {
+      task.stop();
+    }
+
+    final ExecutorService executorService = SINK_EXECUTOR.remove(taskId);
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+
+    LOGGER.info("deregister collector sink task {}", taskId);
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java
new file mode 100644
index 0000000..fde8236
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * {@link CollectorTaskExecutor} for source tasks.
+ *
+ * <p>Registered tasks and their executors are tracked in two static maps keyed by task id.
+ */
+public class CollectorSourceTaskExecutor extends CollectorTaskExecutor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectorSourceTaskExecutor.class);
+
+  private static final Map<String, ExecutorService> SOURCE_EXECUTOR = new ConcurrentHashMap<>();
+  private static final Map<String, CollectorTask> SOURCE_TASK_MAP = new ConcurrentHashMap<>();
+
+  /**
+   * @param taskId id of the task to look up
+   * @return {@code true} only if neither the executor map nor the task map contains the id
+   */
+  @Override
+  public boolean validateIfAbsent(final String taskId) {
+    return !SOURCE_EXECUTOR.containsKey(taskId) && !SOURCE_TASK_MAP.containsKey(taskId);
+  }
+
+  /**
+   * Creates a new executor for the task; every call requests a fresh one from {@link
+   * IoTDBThreadPoolFactory#newSingleThreadExecutor}.
+   *
+   * @param taskId id used as the suffix of the executor's thread name
+   * @return the newly created executor, never empty
+   */
+  @Override
+  public Optional<ExecutorService> getExecutor(final String taskId) {
+    return Optional.of(
+        IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-source-executor-" + taskId));
+  }
+
+  /**
+   * Remembers the task and the executor running it, unless the id is already recorded.
+   *
+   * <p>Both maps use {@code putIfAbsent} so that the recorded executor always belongs to the
+   * recorded task; overwriting only the executor could pair a task with a foreign executor.
+   *
+   * @param collectorTask the task that was submitted
+   * @param executorService the executor the task was submitted to
+   */
+  @Override
+  public void recordExecution(
+      final CollectorTask collectorTask, final ExecutorService executorService) {
+    final String taskId = collectorTask.getTaskId();
+    SOURCE_EXECUTOR.putIfAbsent(taskId, executorService);
+    SOURCE_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+    LOGGER.info("register collector source task {}", taskId);
+  }
+
+  /**
+   * Stops the task and shuts its executor down, removing both from the bookkeeping maps.
+   *
+   * <p>Either entry may be missing: {@code deregister} calls this method as soon as one of the two
+   * maps contains the id, and a concurrent caller may already have removed the other entry.
+   * Missing entries are therefore skipped instead of dereferenced.
+   *
+   * @param taskId id of the task to remove
+   */
+  @Override
+  public void eraseExecution(final String taskId) {
+    final CollectorTask task = SOURCE_TASK_MAP.remove(taskId);
+    if (task != null) {
+      task.stop();
+    }
+
+    final ExecutorService executorService = SOURCE_EXECUTOR.remove(taskId);
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+
+    LOGGER.info("deregister collector source task {}", taskId);
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java
new file mode 100644
index 0000000..8100766
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Template for registering and deregistering collector tasks.
+ *
+ * <p>Subclasses decide how a task id is checked for presence, which executor runs a task and how
+ * the task/executor pair is recorded and erased.
+ */
+public abstract class CollectorTaskExecutor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectorTaskExecutor.class);
+
+  /**
+   * @param taskId id of the task to look up
+   * @return {@code true} if the task id is not registered with this executor
+   */
+  public abstract boolean validateIfAbsent(final String taskId);
+
+  /**
+   * @param taskId id of the task that needs an executor
+   * @return the executor to run the task on, or empty if none is available
+   */
+  public abstract Optional<ExecutorService> getExecutor(final String taskId);
+
+  /**
+   * Records that the task has been submitted to the given executor.
+   *
+   * @param collectorTask the submitted task
+   * @param executorService the executor it was submitted to
+   */
+  public abstract void recordExecution(
+      final CollectorTask collectorTask, final ExecutorService executorService);
+
+  /**
+   * Removes whatever {@link #recordExecution} stored for the task id.
+   *
+   * @param taskId id of the task to erase
+   */
+  public abstract void eraseExecution(final String taskId);
+
+  /**
+   * Submits the task to an executor and records it, unless its id is already registered.
+   *
+   * @param collectorTask the task to register
+   */
+  public void register(final CollectorTask collectorTask) {
+    if (!validateIfAbsent(collectorTask.getTaskId())) {
+      LOGGER.warn("task {} has existed", collectorTask.getTaskId());
+      return;
+    }
+
+    final Optional<ExecutorService> candidate = getExecutor(collectorTask.getTaskId());
+    if (!candidate.isPresent()) {
+      // No executor available: nothing is submitted or recorded.
+      return;
+    }
+
+    final ExecutorService executorService = candidate.get();
+    executorService.submit(collectorTask);
+    LOGGER.info("register success {}", collectorTask.getTaskId());
+    recordExecution(collectorTask, executorService);
+  }
+
+  /**
+   * Erases the task with the given id if it is registered; otherwise only logs a warning.
+   *
+   * @param taskId id of the task to deregister
+   */
+  public void deregister(final String taskId) {
+    if (validateIfAbsent(taskId)) {
+      LOGGER.warn("task {} has not existed", taskId);
+      return;
+    }
+
+    eraseExecution(taskId);
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java
new file mode 100644
index 0000000..5adcace
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+/**
+ * Singleton that owns one executor each for source, processor and sink tasks.
+ *
+ * <p>The private constructor is only invoked by the nested holder class, so the instance returned
+ * by {@link #instance()} is the only one that exists.
+ */
+public class CollectorTaskExecutorAgent {
+
+  private final CollectorSourceTaskExecutor sourceTaskExecutor = new CollectorSourceTaskExecutor();
+  private final CollectorProcessorTaskExecutor processorTaskExecutor =
+      new CollectorProcessorTaskExecutor();
+  private final CollectorSinkTaskExecutor sinkTaskExecutor = new CollectorSinkTaskExecutor();
+
+  private CollectorTaskExecutorAgent() {}
+
+  /** Holds the single {@code CollectorTaskExecutorAgent} instance. */
+  private static class CollectorTaskExecutorAgentHolder {
+    private static final CollectorTaskExecutorAgent INSTANCE = new CollectorTaskExecutorAgent();
+  }
+
+  /** Returns the shared agent instance. */
+  public static CollectorTaskExecutorAgent instance() {
+    return CollectorTaskExecutorAgentHolder.INSTANCE;
+  }
+
+  /** Returns the executor used for source tasks. */
+  public CollectorSourceTaskExecutor getSourceTaskExecutor() {
+    return sourceTaskExecutor;
+  }
+
+  /** Returns the executor used for processor tasks. */
+  public CollectorProcessorTaskExecutor getProcessorTaskExecutor() {
+    return processorTaskExecutor;
+  }
+
+  /** Returns the executor used for sink tasks. */
+  public CollectorSinkTaskExecutor getSinkTaskExecutor() {
+    return sinkTaskExecutor;
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java
new file mode 100644
index 0000000..33ee5ca
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.plugin;
+
+/**
+ * Singleton entry point to the collector's plugin facilities.
+ *
+ * <p>The only instance lives in {@code CollectorPluginAgentHolder}; {@link #instance()} always
+ * returns that same object.
+ */
+public class CollectorPluginAgent {
+  private final CollectorPluginConstructor collectorPluginConstructor =
+      CollectorPluginConstructor.instance();
+
+  private CollectorPluginAgent() {}
+
+  /**
+   * Returns the plugin constructor used to instantiate source, processor and sink plugins.
+   *
+   * @return the plugin constructor held by the singleton agent
+   */
+  public CollectorPluginConstructor constructor() {
+    return CollectorPluginAgentHolder.INSTANCE.collectorPluginConstructor;
+  }
+
+  /**
+   * Returns the shared agent.
+   *
+   * @return the singleton instance kept by the holder class (never a fresh object)
+   */
+  public static CollectorPluginAgent instance() {
+    // Hand out the holder's instance; allocating a new agent per call would defeat the singleton.
+    return CollectorPluginAgentHolder.INSTANCE;
+  }
+
+  private static class CollectorPluginAgentHolder {
+    private static final CollectorPluginAgent INSTANCE = new CollectorPluginAgent();
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java
new file mode 100644
index 0000000..cce94f6
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.plugin;
+
+import org.apache.iotdb.collector.plugin.BuiltinCollectorPlugin;
+import org.apache.iotdb.collector.plugin.builtin.processor.DoNothingProcessor;
+import org.apache.iotdb.collector.plugin.builtin.sink.SessionSink;
+import org.apache.iotdb.collector.plugin.builtin.source.HttpSource;
+import org.apache.iotdb.pipe.api.PipePlugin;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.PipeSink;
+import org.apache.iotdb.pipe.api.PipeSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * Registry of plugin factories keyed by plugin name.
+ *
+ * <p>The built-in source, processor and sink plugins are registered once when the singleton is
+ * created. Each lookup invokes the registered factory, so every call yields a new plugin object.
+ */
+public class CollectorPluginConstructor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectorPluginConstructor.class);
+
+  protected final Map<String, Supplier<PipePlugin>> pluginConstructors = new HashMap<>();
+
+  private CollectorPluginConstructor() {
+    initConstructors();
+  }
+
+  /** Registers the factories of the built-in plugins. */
+  private void initConstructors() {
+    pluginConstructors.put(
+        BuiltinCollectorPlugin.HTTP_SOURCE.getCollectorPluginName(), HttpSource::new);
+    pluginConstructors.put(
+        BuiltinCollectorPlugin.DO_NOTHING_PROCESSOR.getCollectorPluginName(),
+        DoNothingProcessor::new);
+    pluginConstructors.put(
+        BuiltinCollectorPlugin.IOTDB_SESSION_SINK.getCollectorPluginName(), SessionSink::new);
+    LOGGER.info("builtin plugin has been initialized");
+  }
+
+  /**
+   * Creates a new source plugin.
+   *
+   * @param pluginName name the plugin was registered under
+   * @return a newly constructed source
+   * @throws IllegalArgumentException if no plugin is registered under {@code pluginName} or the
+   *     registered plugin is not a {@link PipeSource}
+   */
+  public PipeSource getSource(final String pluginName) {
+    return construct(pluginName, PipeSource.class);
+  }
+
+  /**
+   * Creates a new processor plugin.
+   *
+   * @param pluginName name the plugin was registered under
+   * @return a newly constructed processor
+   * @throws IllegalArgumentException if no plugin is registered under {@code pluginName} or the
+   *     registered plugin is not a {@link PipeProcessor}
+   */
+  public PipeProcessor getProcessor(final String pluginName) {
+    return construct(pluginName, PipeProcessor.class);
+  }
+
+  /**
+   * Creates a new sink plugin.
+   *
+   * @param pluginName name the plugin was registered under
+   * @return a newly constructed sink
+   * @throws IllegalArgumentException if no plugin is registered under {@code pluginName} or the
+   *     registered plugin is not a {@link PipeSink}
+   */
+  public PipeSink getSink(final String pluginName) {
+    return construct(pluginName, PipeSink.class);
+  }
+
+  /**
+   * Looks up the factory for {@code pluginName}, invokes it and checks the result's kind, so a
+   * bad name fails with a descriptive message instead of a bare NullPointerException or
+   * ClassCastException.
+   */
+  private <T> T construct(final String pluginName, final Class<T> expectedType) {
+    final Supplier<PipePlugin> supplier = pluginConstructors.get(pluginName);
+    if (supplier == null) {
+      throw new IllegalArgumentException(
+          "unknown collector plugin '"
+              + pluginName
+              + "', registered plugins: "
+              + pluginConstructors.keySet());
+    }
+    final PipePlugin plugin = supplier.get();
+    if (!expectedType.isInstance(plugin)) {
+      throw new IllegalArgumentException(
+          "collector plugin '" + pluginName + "' is not a " + expectedType.getSimpleName());
+    }
+    return expectedType.cast(plugin);
+  }
+
+  /** Returns the shared plugin constructor. */
+  public static CollectorPluginConstructor instance() {
+    return CollectorPluginConstructorHolder.INSTANCE;
+  }
+
+  private static class CollectorPluginConstructorHolder {
+    private static final CollectorPluginConstructor INSTANCE = new CollectorPluginConstructor();
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java
new file mode 100644
index 0000000..8d1acfc
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.collector.agent.collect.CollectorEventCollector;
+import org.apache.iotdb.commons.pipe.task.EventSupplier;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Processor stage of a collector task.
+ *
+ * <p>While running, it repeatedly takes whatever the event supplier returns and passes it to the
+ * processor together with a {@link CollectorEventCollector} constructed over the pending queue.
+ */
+public class CollectorProcessorTask extends CollectorTask {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectorProcessorTask.class);
+
+  private final Map<String, String> processorAttribute;
+  private final PipeProcessor pipeProcessor;
+  private final EventSupplier eventSupplier;
+  private final BlockingQueue<Event> pendingQueue;
+  private final CollectorEventCollector collectorEventCollector;
+
+  // Written by stop() (possibly from another thread) and read by the run loop, hence volatile.
+  private volatile boolean isStarted = true;
+
+  /**
+   * @param taskId identifier of the collector task this stage belongs to
+   * @param processorAttribute attributes supplied for the processor
+   * @param pipeProcessor processor that handles each supplied event
+   * @param eventSupplier source of events to process
+   * @param pendingQueue queue handed to the event collector used during processing
+   */
+  public CollectorProcessorTask(
+      final String taskId,
+      final Map<String, String> processorAttribute,
+      final PipeProcessor pipeProcessor,
+      final EventSupplier eventSupplier,
+      final BlockingQueue<Event> pendingQueue) {
+    super(taskId);
+    this.processorAttribute = processorAttribute;
+    this.pipeProcessor = pipeProcessor;
+    this.eventSupplier = eventSupplier;
+    this.pendingQueue = pendingQueue;
+    this.collectorEventCollector = new CollectorEventCollector(pendingQueue);
+  }
+
+  /** Processes supplied events until {@link #stop()} is called; failures are logged and skipped. */
+  @Override
+  public void runMayThrow() {
+    while (isStarted) {
+      try {
+        pipeProcessor.process(eventSupplier.supply(), collectorEventCollector);
+      } catch (final Exception e) {
+        // Pass the throwable as the last argument so the stack trace is not lost.
+        LOGGER.warn("error occur while processing event because {}", e.getMessage(), e);
+      }
+    }
+  }
+
+  public Map<String, String> getProcessorAttribute() {
+    return processorAttribute;
+  }
+
+  public PipeProcessor getPipeProcessor() {
+    return pipeProcessor;
+  }
+
+  public EventSupplier getEventSupplier() {
+    return eventSupplier;
+  }
+
+  public BlockingQueue<Event> getPendingQueue() {
+    return pendingQueue;
+  }
+
+  /** Asks the run loop to exit after the iteration currently in progress. */
+  @Override
+  public void stop() {
+    isStarted = false;
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java
new file mode 100644
index 0000000..77e65b4
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.pipe.api.PipeSink;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public class CollectorSinkTask extends CollectorTask {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CollectorSinkTask.class);
+
+  private final Map<String, String> sinkAttribute;
+  private final PipeSink pipeSink;
+  private final BlockingQueue<Event> pendingQueue;
+  private boolean isStarted = true;
+
+  public CollectorSinkTask(
+      final String taskId,
+      final Map<String, String> sinkAttribute,
+      final PipeSink pipeSink,
+      final BlockingQueue<Event> pendingQueue) {
+    super(taskId);
+    this.sinkAttribute = sinkAttribute;
+    this.pipeSink = pipeSink;
+    this.pendingQueue = pendingQueue;
+  }
+
+  @Override
+  public void runMayThrow() {
+    try {
+      pipeSink.handshake();
+    } catch (final Exception e) {
+      LOGGER.warn("handshake fail because {}", e.getMessage());
+    }
+    isStarted = true;
+    while (isStarted) {
+      try {
+        final Event event = pendingQueue.take();
+        pipeSink.transfer(event);
+        LOGGER.info("transfer event {} success, remain number is {}", event, 
pendingQueue.size());
+      } catch (final InterruptedException e) {
+        LOGGER.warn("interrupted while waiting for take a event");
+      } catch (final Exception e) {
+        LOGGER.warn("error occur while transfer event to endpoint");
+      }
+    }
+  }
+
+  public Map<String, String> getSinkAttribute() {
+    return sinkAttribute;
+  }
+
+  public PipeSink getPipeSink() {
+    return pipeSink;
+  }
+
+  public void stop() {
+    isStarted = false;
+  }
+
+  public BlockingQueue<Event> getPendingQueue() {
+    return pendingQueue;
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java
new file mode 100644
index 0000000..26269c3
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.commons.pipe.task.EventSupplier;
+import org.apache.iotdb.pipe.api.PipeSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** Source stage of a collector task: starts the source when run and closes it when stopped. */
+public class CollectorSourceTask extends CollectorTask {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectorSourceTask.class);
+
+  private final Map<String, String> sourceAttribute;
+  private final PipeSource pipeSource;
+
+  /**
+   * @param taskId identifier of the collector task this stage belongs to
+   * @param sourceAttribute attributes supplied for the source
+   * @param pipeSource source that is started by this task
+   */
+  public CollectorSourceTask(
+      final String taskId, final Map<String, String> sourceAttribute, final PipeSource pipeSource) {
+    super(taskId);
+    this.pipeSource = pipeSource;
+    this.sourceAttribute = sourceAttribute;
+  }
+
+  /** Starts the underlying source; anything it throws is propagated. */
+  @Override
+  public void runMayThrow() throws Throwable {
+    this.pipeSource.start();
+  }
+
+  /** Closes the underlying source; a failure to close is logged, not rethrown. */
+  @Override
+  public void stop() {
+    try {
+      this.pipeSource.close();
+    } catch (final Exception closeFailure) {
+      LOGGER.warn("failed to close pipe source {}", pipeSource, closeFailure);
+    }
+  }
+
+  /** Returns a supplier that delegates to the source's {@code supply} method. */
+  public EventSupplier getEventSupplier() {
+    return pipeSource::supply;
+  }
+
+  public PipeSource getPipeSource() {
+    return this.pipeSource;
+  }
+
+  public Map<String, String> getSourceAttribute() {
+    return this.sourceAttribute;
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java
new file mode 100644
index 0000000..4f13ff9
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+
+/** Base class of the runnable stages of a collector task; carries the owning task's id. */
+public abstract class CollectorTask extends WrappedRunnable {
+
+  protected final String taskId;
+
+  /**
+   * @param taskId identifier of the collector task this stage belongs to
+   */
+  protected CollectorTask(final String taskId) {
+    this.taskId = taskId;
+  }
+
+  /** Stops this stage; each subclass defines what stopping means for it. */
+  public abstract void stop();
+
+  /** Returns the identifier passed at construction. */
+  public String getTaskId() {
+    return this.taskId;
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java
new file mode 100644
index 0000000..9cd787d
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import 
org.apache.iotdb.collector.agent.executor.CollectorProcessorTaskExecutor;
+import org.apache.iotdb.collector.agent.executor.CollectorSinkTaskExecutor;
+import org.apache.iotdb.collector.agent.executor.CollectorSourceTaskExecutor;
+import org.apache.iotdb.collector.agent.executor.CollectorTaskExecutorAgent;
+import org.apache.iotdb.collector.agent.plugin.CollectorPluginAgent;
+import org.apache.iotdb.collector.agent.plugin.CollectorPluginConstructor;
+import org.apache.iotdb.collector.plugin.BuiltinCollectorPlugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class CollectorTaskAgent {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CollectorTaskAgent.class);
+
+  private static final CollectorPluginConstructor CONSTRUCTOR =
+      CollectorPluginAgent.instance().constructor();
+  private static final CollectorSourceTaskExecutor SOURCE_TASK_EXECUTOR =
+      CollectorTaskExecutorAgent.instance().getSourceTaskExecutor();
+  private static final CollectorProcessorTaskExecutor PROCESSOR_TASK_EXECUTOR =
+      CollectorTaskExecutorAgent.instance().getProcessorTaskExecutor();
+  private static final CollectorSinkTaskExecutor SINK_TASK_EXECUTOR =
+      CollectorTaskExecutorAgent.instance().getSinkTaskExecutor();
+
+  private CollectorTaskAgent() {}
+
+  public boolean createCollectorTask(
+      final Map<String, String> sourceAttribute,
+      final Map<String, String> processorAttribute,
+      final Map<String, String> sinkAttribute,
+      final String taskId) {
+    try {
+      final CollectorSourceTask collectorSourceTask =
+          new CollectorSourceTask(
+              taskId,
+              sourceAttribute,
+              CONSTRUCTOR.getSource(
+                  sourceAttribute.getOrDefault(
+                      "source-plugin",
+                      
BuiltinCollectorPlugin.HTTP_SOURCE.getCollectorPluginName())));
+      SOURCE_TASK_EXECUTOR.register(collectorSourceTask);
+
+      final CollectorProcessorTask collectorProcessorTask =
+          new CollectorProcessorTask(
+              taskId,
+              processorAttribute,
+              CONSTRUCTOR.getProcessor(
+                  processorAttribute.getOrDefault(
+                      "processor-plugin",
+                      
BuiltinCollectorPlugin.DO_NOTHING_PROCESSOR.getCollectorPluginName())),
+              collectorSourceTask.getEventSupplier(),
+              new LinkedBlockingQueue<>());
+      PROCESSOR_TASK_EXECUTOR.register(collectorProcessorTask);
+
+      final CollectorSinkTask collectorSinkTask =
+          new CollectorSinkTask(
+              taskId,
+              sinkAttribute,
+              CONSTRUCTOR.getSink(
+                  sinkAttribute.getOrDefault(
+                      "sink-plugin",
+                      
BuiltinCollectorPlugin.IOTDB_SESSION_SINK.getCollectorPluginName())),
+              collectorProcessorTask.getPendingQueue());
+      SINK_TASK_EXECUTOR.register(collectorSinkTask);
+    } catch (final Exception e) {
+      LOGGER.warn("create collector task error", e);
+      return false;
+    }
+
+    return true;
+  }
+
+  public boolean stopCollectorTask(final String taskId) {
+    try {
+      SOURCE_TASK_EXECUTOR.deregister(taskId);
+      PROCESSOR_TASK_EXECUTOR.deregister(taskId);
+      SINK_TASK_EXECUTOR.deregister(taskId);
+    } catch (final Exception e) {
+      LOGGER.warn("stop collector task error", e);
+      return false;
+    }
+
+    return true;
+  }
+
+  public static CollectorTaskAgent instance() {
+    return CollectorTaskAgentHolder.INSTANCE;
+  }
+
+  private static class CollectorTaskAgentHolder {
+    private static final CollectorTaskAgent INSTANCE = new 
CollectorTaskAgent();
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/filter/ApiOriginFilter.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/filter/ApiOriginFilter.java
new file mode 100644
index 0000000..008e309
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/filter/ApiOriginFilter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.filter;
+
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.IOException;
+
+/** Servlet filter that adds permissive CORS headers to every response before continuing. */
+public class ApiOriginFilter implements javax.servlet.Filter {
+
+  @Override
+  public void init(final FilterConfig filterConfig) throws ServletException {
+    // do nothing
+  }
+
+  /**
+   * Adds the {@code Access-Control-Allow-*} headers to the response, then passes the request
+   * and response on to the rest of the chain.
+   */
+  @Override
+  public void doFilter(
+      final ServletRequest request, final ServletResponse response, final FilterChain chain)
+      throws IOException, ServletException {
+    final HttpServletResponse httpResponse = (HttpServletResponse) response;
+    httpResponse.addHeader("Access-Control-Allow-Origin", "*");
+    httpResponse.addHeader("Access-Control-Allow-Methods", "GET, POST");
+    httpResponse.addHeader("Access-Control-Allow-Headers", "*");
+    chain.doFilter(request, response);
+  }
+
+  @Override
+  public void destroy() {
+    // do nothing
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java
new file mode 100644
index 0000000..7d73e38
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.collector.api.impl;
+
+import org.apache.iotdb.collector.api.PingApiService;
+import org.apache.iotdb.collector.api.v1.model.ExecutionStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+/** Ping endpoint implementation: always answers 200 with a success status entity. */
+public class PingApiServiceImpl extends PingApiService {
+
+  /** Builds an OK response whose entity carries the success status code and its name. */
+  @Override
+  public Response tryPing(final SecurityContext securityContext) {
+    final TSStatusCode success = TSStatusCode.SUCCESS_STATUS;
+    return Response.ok()
+        .entity(new ExecutionStatus().code(success.getStatusCode()).message(success.name()))
+        .build();
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java
new file mode 100644
index 0000000..9456c2f
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.handler;
+
+import org.apache.iotdb.collector.api.v1.model.CreatePipeRequest;
+import org.apache.iotdb.collector.api.v1.model.StopPipeRequest;
+
+import java.util.Objects;
+
+public class RequestValidationHandler {
+  private RequestValidationHandler() {}
+
+  public static void validateCreateRequest(final CreatePipeRequest 
createPipeRequest) {
+    Objects.requireNonNull(createPipeRequest.getTaskId(), "taskId cannot be 
null");
+  }
+
+  public static void validateStopRequest(final StopPipeRequest 
stopPipeRequest) {
+    Objects.requireNonNull(stopPipeRequest.getTaskId(), "taskId cannot be 
null");
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java
new file mode 100644
index 0000000..07d1f3e
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.impl;
+
+import org.apache.iotdb.collector.agent.CollectorAgent;
+import org.apache.iotdb.collector.api.v1.AdminApiService;
+import org.apache.iotdb.collector.api.v1.NotFoundException;
+import org.apache.iotdb.collector.api.v1.handler.RequestValidationHandler;
+import org.apache.iotdb.collector.api.v1.model.AlterPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.CreatePipeRequest;
+import org.apache.iotdb.collector.api.v1.model.DropPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.StartPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.StopPipeRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+/**
+ * Admin REST endpoints of the collector. Create and stop are backed by the task agent; alter,
+ * drop and start currently answer 200 with a fixed body.
+ */
+public class AdminApiServiceImpl extends AdminApiService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AdminApiServiceImpl.class);
+
+  /** Answers 200 with the body {@code alterPipe}. */
+  @Override
+  public Response alterPipe(
+      final AlterPipeRequest alterPipeRequest, final SecurityContext securityContext)
+      throws NotFoundException {
+    return Response.ok("alterPipe").build();
+  }
+
+  /**
+   * Validates the request and asks the task agent to create the task; answers 200 on success
+   * and 400 when the agent reports failure.
+   */
+  @Override
+  public Response createPipe(
+      final CreatePipeRequest createPipeRequest, final SecurityContext securityContext)
+      throws NotFoundException {
+    RequestValidationHandler.validateCreateRequest(createPipeRequest);
+
+    final boolean created =
+        CollectorAgent.task()
+            .createCollectorTask(
+                createPipeRequest.getSourceAttribute(),
+                createPipeRequest.getProcessorAttribute(),
+                createPipeRequest.getSinkAttribute(),
+                createPipeRequest.getTaskId());
+    if (!created) {
+      LOGGER.warn("Create task failed");
+      return Response.status(Response.Status.BAD_REQUEST).entity("create task fail").build();
+    }
+
+    LOGGER.info("Create task successful");
+    return Response.status(Response.Status.OK).entity("create task success").build();
+  }
+
+  /** Answers 200 with the body {@code dropPipe}. */
+  @Override
+  public Response dropPipe(
+      final DropPipeRequest dropPipeRequest, final SecurityContext securityContext)
+      throws NotFoundException {
+    return Response.ok("dropPipe").build();
+  }
+
+  /** Answers 200 with the body {@code startPipe}. */
+  @Override
+  public Response startPipe(
+      final StartPipeRequest startPipeRequest, final SecurityContext securityContext)
+      throws NotFoundException {
+    return Response.ok("startPipe").build();
+  }
+
+  /**
+   * Validates the request and asks the task agent to stop the task; answers 200 on success and
+   * 400 when the agent reports failure.
+   */
+  @Override
+  public Response stopPipe(
+      final StopPipeRequest stopPipeRequest, final SecurityContext securityContext)
+      throws NotFoundException {
+    RequestValidationHandler.validateStopRequest(stopPipeRequest);
+
+    final boolean stopped = CollectorAgent.task().stopCollectorTask(stopPipeRequest.getTaskId());
+    if (!stopped) {
+      LOGGER.warn("Stop task: {} failed", stopPipeRequest.getTaskId());
+      return Response.status(Response.Status.BAD_REQUEST).entity("stop task fail").build();
+    }
+
+    LOGGER.info("Stop task: {} successful", stopPipeRequest.getTaskId());
+    return Response.ok().entity("stop task: " + stopPipeRequest.getTaskId() + " success").build();
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java
new file mode 100644
index 0000000..2e3ee55
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+/**
+ * Options of the collector's API service.
+ *
+ * <p>NOTE(review): an {@code Option} adds itself to the shared option registry from its
+ * constructor, so {@link #PORT} is only known to the registry once this class has been
+ * initialized. Confirm this class is loaded before the configuration file is applied, otherwise
+ * a configured {@code api_service_port} would be skipped as an unknown key.
+ */
+public class ApiServiceOptions extends Options {
+
+  /** Port of the API service; property key {@code api_service_port}, default {@code 17070}. */
+  public static final Option<Integer> PORT =
+      new Option<Integer>("api_service_port", 17070) {
+        @Override
+        public void setValue(final String valueString) {
+          // Integer.valueOf(String) parses like Integer.parseInt and returns the boxed result.
+          value = Integer.valueOf(valueString);
+        }
+      };
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Configuration.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Configuration.java
new file mode 100644
index 0000000..115c1c0
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Configuration.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Loads {@code application.properties} from the classpath and applies its entries to the
+ * registered {@link Options}.
+ *
+ * <p>Loading happens in the constructor. If the file is absent a warning is logged and every
+ * option keeps its default. If the file exists but cannot be read, the error is logged and the
+ * JVM is terminated with exit code {@code -1}.
+ */
+public class Configuration {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Configuration.class);
+
+  private static final String CONFIG_FILE_NAME = "application.properties";
+
+  private final Options options = new Options();
+
+  public Configuration() {
+    loadProps();
+  }
+
+  /** Reads the config file (UTF-8), trims its values and hands them to {@link Options}. */
+  private void loadProps() {
+    final Optional<URL> url = getPropsUrl();
+    if (!url.isPresent()) {
+      LOGGER.warn("{} is not found, use default configuration", CONFIG_FILE_NAME);
+      return;
+    }
+
+    try (final InputStream inputStream = url.get().openStream()) {
+      LOGGER.info("Start to read config file {}", url.get());
+      final Properties properties = new Properties();
+      properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
+      final TrimProperties trimProperties = new TrimProperties();
+      trimProperties.putAll(properties);
+      options.loadProperties(trimProperties);
+    } catch (final FileNotFoundException e) {
+      LOGGER.error("Fail to find config file, reject startup.", e);
+      System.exit(-1);
+    } catch (final IOException e) {
+      LOGGER.error("IO exception when reading config file, reject startup.", e);
+      System.exit(-1);
+    } catch (final Exception e) {
+      LOGGER.error("Unexpected exception when reading config file, reject startup.", e);
+      System.exit(-1);
+    }
+  }
+
+  /**
+   * Locates the config file as a classpath resource.
+   *
+   * <p>No environment variable is consulted here, so this method does not log anything itself;
+   * the single "not found" warning is emitted by {@link #loadProps()}.
+   *
+   * @return the resource URL, or empty when the file is not on the classpath
+   */
+  private Optional<URL> getPropsUrl() {
+    return Optional.ofNullable(Options.class.getResource("/" + CONFIG_FILE_NAME));
+  }
+
+  /** Logs every registered option with its effective value. */
+  public void logAllOptions() {
+    options.logAllOptions();
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
new file mode 100644
index 0000000..8ad2887
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry of configuration options.
+ *
+ * <p>Every {@link Option} registers itself under its key in a static map when it is constructed.
+ * {@link #loadProperties(TrimProperties)} then assigns configured values to the options whose
+ * keys appear in the given properties.
+ */
+public class Options {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Options.class);
+
+  private static final Map<String, Option<?>> OPTIONS = new ConcurrentHashMap<>();
+
+  /**
+   * A single typed option with a key, a default and an optional explicitly set value.
+   *
+   * @param <T> type of the option value
+   */
+  public abstract static class Option<T> {
+
+    private final String key;
+    private final T defaultValue;
+    protected T value;
+
+    Option(final String key, final T defaultValue) {
+      this.key = key;
+      this.defaultValue = defaultValue;
+
+      // Self-registration: the option becomes visible to loadProperties() from here on.
+      OPTIONS.put(key, this);
+    }
+
+    public String key() {
+      return key;
+    }
+
+    public boolean hasDefaultValue() {
+      return defaultValue != null;
+    }
+
+    public T defaultValue() {
+      return defaultValue;
+    }
+
+    /** Returns the explicitly set value, falling back to the default when none was set. */
+    public T value() {
+      if (value != null) {
+        return value;
+      }
+      return defaultValue;
+    }
+
+    /** Parses {@code valueString} and stores the result as this option's value. */
+    public abstract void setValue(final String valueString);
+
+    @Override
+    public String toString() {
+      return key + " = " + value();
+    }
+  }
+
+  /**
+   * Applies every property whose key matches a registered option. Keys without a matching option
+   * are skipped; a value that cannot be applied is logged as a warning and the remaining keys
+   * are still processed.
+   */
+  public void loadProperties(final TrimProperties properties) {
+    for (final String key : properties.stringPropertyNames()) {
+      final Option<?> option = OPTIONS.get(key);
+      if (option == null) {
+        continue;
+      }
+      try {
+        option.setValue(properties.getProperty(key));
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Unexpected exception when setting value for option: {}, given value: {}",
+            key,
+            properties.getProperty(key),
+            e);
+      }
+    }
+  }
+
+  public Optional<Option<?>> getOption(final String key) {
+    return Optional.ofNullable(OPTIONS.get(key));
+  }
+
+  /** Logs all registered options, framed by banner lines. */
+  public void logAllOptions() {
+    LOGGER.info("========================== OPTIONS ==========================");
+    OPTIONS.values().forEach(option -> LOGGER.info(option.toString()));
+    LOGGER.info("=============================================================");
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TrimProperties.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TrimProperties.java
new file mode 100644
index 0000000..1cf766d
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TrimProperties.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * A {@link Properties} whose String values are returned with leading and trailing whitespace
+ * removed.
+ *
+ * <p>Trimming is applied on {@link #put}, {@link #get} and both {@code getProperty} overloads.
+ * The single-argument {@link #getProperty(String)} is overridden explicitly because JDK 9 and
+ * later implement it (and {@code putAll}) directly on an internal map without dispatching to the
+ * overridable {@code get}/{@code put}; without the override, values copied in through
+ * {@code putAll} would be handed out untrimmed.
+ */
+public class TrimProperties extends Properties {
+  @Override
+  public synchronized Object get(Object key) {
+    Object value = super.get(key);
+    if (value instanceof String) {
+      return ((String) value).trim();
+    }
+    return value;
+  }
+
+  @Override
+  public synchronized Object put(Object key, Object value) {
+    if (value instanceof String) {
+      value = ((String) value).trim();
+    }
+    return super.put(key, value);
+  }
+
+  /**
+   * Returns the trimmed value for {@code key}.
+   *
+   * @return the trimmed value, or {@code null} when the key has no String value
+   */
+  @Override
+  public String getProperty(String key) {
+    final String val = super.getProperty(key);
+    return val == null ? null : val.trim();
+  }
+
+  /**
+   * Returns the trimmed value for {@code key}, or the trimmed {@code defaultValue} when the key
+   * is absent.
+   */
+  @Override
+  public synchronized String getProperty(String key, String defaultValue) {
+    String val = getProperty(key);
+    if (defaultValue != null) {
+      defaultValue = defaultValue.trim();
+    }
+    return Optional.ofNullable(val).map(String::trim).orElse(defaultValue);
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinCollectorPlugin.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinCollectorPlugin.java
new file mode 100644
index 0000000..2d676e5
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinCollectorPlugin.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin;
+
+import org.apache.iotdb.collector.plugin.builtin.processor.DoNothingProcessor;
+import org.apache.iotdb.collector.plugin.builtin.sink.SessionSink;
+import org.apache.iotdb.collector.plugin.builtin.source.HttpSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The plugins that ship with the collector, each pairing a plugin name with its implementing
+ * class.
+ */
+public enum BuiltinCollectorPlugin {
+
+  // Sources
+  HTTP_SOURCE("http-source", HttpSource.class),
+
+  // Processors
+  DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
+
+  // Sinks
+  IOTDB_SESSION_SINK("iotdb-session-sink", SessionSink.class);
+
+  private final String collectorPluginName;
+  private final Class<?> collectorPluginClass;
+  private final String className;
+
+  BuiltinCollectorPlugin(String collectorPluginName, Class<?> collectorPluginClass) {
+    this.collectorPluginName = collectorPluginName;
+    this.collectorPluginClass = collectorPluginClass;
+    this.className = collectorPluginClass.getName();
+  }
+
+  /** Returns the plugin name, e.g. {@code "http-source"}. */
+  public String getCollectorPluginName() {
+    return collectorPluginName;
+  }
+
+  /** Returns the class implementing the plugin. */
+  public Class<?> getCollectorPluginClass() {
+    return collectorPluginClass;
+  }
+
+  /** Returns the fully qualified name of the implementing class. */
+  public String getClassName() {
+    return className;
+  }
+
+  /**
+   * Upper-cased names of the built-in plugins listed below, as an unmodifiable set.
+   *
+   * <p>The names are upper-cased with {@code Locale.ROOT} so the result does not depend on the
+   * JVM's default locale (under a Turkish default locale a plain {@code toUpperCase()} maps
+   * {@code i} to a dotted capital I, which would change these entries).
+   */
+  public static final Set<String> SHOW_COLLECTOR_PLUGINS_BLACKLIST =
+      Collections.unmodifiableSet(
+          new HashSet<>(
+              Arrays.asList(
+                  // Sources
+                  HTTP_SOURCE.getCollectorPluginName().toUpperCase(java.util.Locale.ROOT),
+                  // Processors
+                  DO_NOTHING_PROCESSOR.getCollectorPluginName().toUpperCase(java.util.Locale.ROOT),
+                  // Sinks
+                  IOTDB_SESSION_SINK.getCollectorPluginName().toUpperCase(java.util.Locale.ROOT))));
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java
new file mode 100644
index 0000000..436fc20
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.builtin.processor;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processor that performs no transformation.
+ *
+ * <p>Generic events are logged and passed on to the collector unchanged. Tablet insertion events
+ * are neither processed nor forwarded by this class.
+ */
+public class DoNothingProcessor implements PipeProcessor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DoNothingProcessor.class);
+
+  @Override
+  public void validate(final PipeParameterValidator validator) throws Exception {
+    // Do Nothing
+  }
+
+  @Override
+  public void customize(
+      final PipeParameters parameters, final PipeProcessorRuntimeConfiguration configuration)
+      throws Exception {
+    // Do Nothing
+  }
+
+  /** Intentionally empty: the tablet event is not handed to {@code eventCollector}. */
+  @Override
+  public void process(
+      final TabletInsertionEvent tabletInsertionEvent, final EventCollector eventCollector)
+      throws Exception {
+    // Do Nothing
+  }
+
+  /** Logs the event and forwards it to {@code eventCollector} as-is. */
+  @Override
+  public void process(final Event event, final EventCollector eventCollector) throws Exception {
+    LOGGER.info("DoNothingProcessor process event: {}", event);
+    eventCollector.collect(event);
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Do Nothing
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/SessionSink.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/SessionSink.java
new file mode 100644
index 0000000..e2a5260
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/SessionSink.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.builtin.sink;
+
+import org.apache.iotdb.collector.plugin.builtin.source.event.SourceEvent;
+import org.apache.iotdb.pipe.api.PipeSink;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeSinkRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sink whose lifecycle callbacks are empty except for logging.
+ *
+ * <p>Only {@link #handshake()} and {@link #transfer(Event)} do anything, and both merely write a
+ * log line.
+ */
+public class SessionSink implements PipeSink {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SessionSink.class);
+
+  @Override
+  public void validate(final PipeParameterValidator validator) throws Exception {}
+
+  @Override
+  public void customize(
+      final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
+      throws Exception {}
+
+  @Override
+  public void customize(
+      final PipeParameters parameters, final PipeSinkRuntimeConfiguration configuration)
+      throws Exception {}
+
+  @Override
+  public void handshake() throws Exception {
+    LOGGER.info("SessionSink handshake successfully");
+  }
+
+  @Override
+  public void heartbeat() throws Exception {}
+
+  @Override
+  public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {}
+
+  /**
+   * Logs the received event.
+   *
+   * @throws ClassCastException if {@code event} is not a {@link SourceEvent}; the cast is
+   *     unconditional
+   */
+  @Override
+  public void transfer(final Event event) throws Exception {
+    LOGGER.info("SessionSink transfer successfully {}", (SourceEvent) event);
+  }
+
+  @Override
+  public void close() throws Exception {}
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpSource.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpSource.java
new file mode 100644
index 0000000..65ba79b
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpSource.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.builtin.source;
+
+import org.apache.iotdb.collector.plugin.builtin.source.event.SourceEvent;
+import org.apache.iotdb.pipe.api.PipeSource;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Source that fabricates one {@link SourceEvent} roughly every second and hands the events out
+ * through {@link #supply()}.
+ *
+ * <p>{@link #start()} runs the producing loop on the calling thread and returns only after
+ * {@link #close()} has been called or the thread has been interrupted.
+ */
+public class HttpSource implements PipeSource {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HttpSource.class);
+
+  // Static: every HttpSource instance produces into and consumes from this one queue.
+  private static final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
+
+  // Reused across iterations instead of allocating a generator per event.
+  private final Random random = new Random();
+
+  // volatile: start() polls this flag in a loop that never ends by itself, so close() has to be
+  // invoked from another thread and its write must become visible to the looping thread.
+  private volatile boolean isStarted = true;
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {}
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
+      throws Exception {}
+
+  @Override
+  public void customize(PipeParameters parameters, PipeSourceRuntimeConfiguration configuration)
+      throws Exception {}
+
+  /**
+   * Produces events until {@link #close()} is called or the running thread is interrupted.
+   *
+   * <p>Each iteration enqueues an event named after a random integer in {@code [0, 1000)} and
+   * then sleeps for one second.
+   */
+  @Override
+  public void start() {
+    isStarted = true;
+    while (isStarted) {
+      final Event event = new SourceEvent(String.valueOf(random.nextInt(1000)));
+      try {
+        queue.put(event);
+        Thread.sleep(1000);
+        LOGGER.info("event: {} created success", event);
+      } catch (final InterruptedException e) {
+        LOGGER.warn("failed to create event because {}", e.getMessage());
+        // Restore the interrupt status for the caller and stop producing; looping on would make
+        // this thread impossible to shut down through interruption.
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+  }
+
+  /** Blocks until an event is available in the shared queue and returns it. */
+  @Override
+  public Event supply() throws Exception {
+    return queue.take();
+  }
+
+  /** Asks the loop in {@link #start()} to end after its current iteration. */
+  @Override
+  public void close() throws Exception {
+    isStarted = false;
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SourceEvent.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SourceEvent.java
new file mode 100644
index 0000000..813e25e
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SourceEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.builtin.source.event;
+
+import org.apache.iotdb.pipe.api.event.Event;
+
+/** Event that carries nothing but a name. */
+public class SourceEvent implements Event {
+  private String name;
+
+  /** Creates an event whose name is {@code null} until {@link #setName(String)} is called. */
+  public SourceEvent() {}
+
+  public SourceEvent(final String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(final String name) {
+    this.name = name;
+  }
+
+  /** Renders the event as {@code SourceEvent [name=<name>]}. */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("SourceEvent [name=");
+    text.append(name);
+    text.append("]");
+    return text.toString();
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/ApiService.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/ApiService.java
new file mode 100644
index 0000000..b079014
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/ApiService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.service;
+
+import org.apache.iotdb.collector.api.filter.ApiOriginFilter;
+import org.apache.iotdb.collector.config.ApiServiceOptions;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.DispatcherType;
+
+import java.util.EnumSet;
+
+/**
+ * Embedded Jetty server exposing the collector's REST API through a Jersey servlet.
+ *
+ * <p>NOTE(review): when {@code server.start()} fails, the server is destroyed but the field is
+ * left set, so a later {@link #stop()} will call {@code stop()}/{@code destroy()} on it again.
+ * Confirm that Jetty tolerates {@code destroy()} on a server that failed to start.
+ */
+public class ApiService implements IService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ApiService.class);
+
+  private Server server;
+
+  /** Creates the server on the configured port and starts it; a failure is logged, not thrown. */
+  @Override
+  public void start() {
+    final int port = ApiServiceOptions.PORT.value();
+    server = new Server(port);
+    server.setHandler(constructServletContextHandler());
+    try {
+      server.start();
+      LOGGER.info("[ApiService] Started successfully. Listening on port {}", port);
+    } catch (final Exception e) {
+      LOGGER.warn("[ApiService] Failed to start: {}", e.getMessage(), e);
+      server.destroy();
+    }
+  }
+
+  /**
+   * Builds the session-less servlet context at {@code "/"}: the origin filter on every path and
+   * a Jersey container configured with the provider packages, multipart support and WADL off.
+   */
+  private ServletContextHandler constructServletContextHandler() {
+    final ServletContextHandler handler =
+        new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+
+    final EnumSet<DispatcherType> dispatcherTypes =
+        EnumSet.of(DispatcherType.INCLUDE, DispatcherType.REQUEST);
+    handler.addFilter(ApiOriginFilter.class, "/*", dispatcherTypes);
+
+    final ServletHolder jersey = handler.addServlet(ServletContainer.class, "/*");
+    jersey.setInitOrder(1);
+    jersey.setInitParameter(
+        "jersey.config.server.provider.packages",
+        "io.swagger.jaxrs.listing, io.swagger.sample.resource, org.apache.iotdb.collector.api");
+    jersey.setInitParameter(
+        "jersey.config.server.provider.classnames",
+        "org.glassfish.jersey.media.multipart.MultiPartFeature");
+    jersey.setInitParameter("jersey.config.server.wadl.disableWadl", "true");
+
+    handler.setContextPath("/");
+    return handler;
+  }
+
+  /** Stops and destroys the server if {@link #start()} created one; otherwise only logs. */
+  @Override
+  public void stop() {
+    if (server == null) {
+      LOGGER.info("[ApiService] Not started yet. Nothing to stop.");
+      return;
+    }
+
+    try {
+      server.stop();
+      LOGGER.info("[ApiService] Stopped successfully.");
+    } catch (final Exception e) {
+      LOGGER.warn("[ApiService] Failed to stop: {}", e.getMessage(), e);
+    } finally {
+      server.destroy();
+    }
+  }
+
+  @Override
+  public String name() {
+    return "ApiService";
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/IService.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/IService.java
new file mode 100644
index 0000000..16358bb
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/IService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.service;
+
+/** Lifecycle contract of a named collector service: it can be started, stopped and identified. */
+public interface IService {
+
+  /** Starts the service. The method declares no checked exception. */
+  void start();
+
+  /** Stops the service. The method declares no checked exception. */
+  void stop();
+
+  /**
+   * Returns the name of the service.
+   *
+   * @return the name of the service
+   */
+  String name();
+}
diff --git 
a/iotdb-collector/collector-core/src/main/resources/application.properties 
b/iotdb-collector/collector-core/src/main/resources/application.properties
new file mode 100644
index 0000000..7bcf3f0
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/resources/application.properties
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+api_service_port=17070
\ No newline at end of file
diff --git a/iotdb-collector/collector-openapi/pom.xml 
b/iotdb-collector/collector-openapi/pom.xml
new file mode 100644
index 0000000..1c1fcc8
--- /dev/null
+++ b/iotdb-collector/collector-openapi/pom.xml
@@ -0,0 +1,179 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-collector</artifactId>
+        <version>1.3.2</version>
+    </parent>
+    <artifactId>collector-openapi</artifactId>
+    <name>IoTDB: Collector: OpenAPI</name>
+    <dependencies>
+        <dependency>
+            <groupId>io.swagger</groupId>
+            <artifactId>swagger-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.swagger</groupId>
+            <artifactId>swagger-models</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.jaxrs</groupId>
+            <artifactId>jackson-jaxrs-json-provider</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.validation</groupId>
+            <artifactId>jakarta.validation-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.ws.rs</groupId>
+            <artifactId>jakarta.ws.rs-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.swagger</groupId>
+            <artifactId>swagger-jaxrs</artifactId>
+        </dependency>
+        <!-- Just needed for unused import code -->
+        <dependency>
+            <groupId>org.glassfish.jersey.media</groupId>
+            <artifactId>jersey-media-multipart</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.servlet</groupId>
+            <artifactId>jakarta.servlet-api</artifactId>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.openapitools</groupId>
+                <artifactId>openapi-generator-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-java-rest-codes-common</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <phase>generate-sources</phase>
+                        <configuration>
+                            
<inputSpec>${project.basedir}/src/main/openapi3/ping.yaml</inputSpec>
+                            
<output>${project.build.directory}/generated-sources/java</output>
+                            
<apiPackage>org.apache.iotdb.collector.api</apiPackage>
+                            
<modelPackage>org.apache.iotdb.collector.api.model</modelPackage>
+                            
<invokerPackage>org.apache.iotdb.collector.api.invoker</invokerPackage>
+                            <generatorName>jaxrs-jersey</generatorName>
+                            <groupId>org.apache.iotdb</groupId>
+                            
<artifactId>iotdb-collector-rest-service</artifactId>
+                            
<artifactVersion>${project.version}</artifactVersion>
+                            <addCompileSourceRoot>true</addCompileSourceRoot>
+                            <configOptions>
+                                <licenseName>Apache License 2.0</licenseName>
+                                <groupId>org.apache.iotdb</groupId>
+                                
<artifactId>iotdb-collector-rest-service</artifactId>
+                                
<artifactVersion>${project.version}</artifactVersion>
+                                <dateLibrary>java8</dateLibrary>
+                                <useGzipFeature>true</useGzipFeature>
+                            </configOptions>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>generate-java-rest-codes-v1</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <phase>generate-sources</phase>
+                        <configuration>
+                            
<inputSpec>${project.basedir}/src/main/openapi3/iotdb_collector_rest_v1.yaml</inputSpec>
+                            
<output>${project.build.directory}/generated-sources/java</output>
+                            
<apiPackage>org.apache.iotdb.collector.api.v1</apiPackage>
+                            
<modelPackage>org.apache.iotdb.collector.api.v1.model</modelPackage>
+                            
<invokerPackage>org.apache.iotdb.collector.api.v1.invoker</invokerPackage>
+                            <generatorName>jaxrs-jersey</generatorName>
+                            <groupId>org.apache.iotdb</groupId>
+                            
<artifactId>iotdb-collector-rest-service</artifactId>
+                            
<artifactVersion>${project.version}</artifactVersion>
+                            <addCompileSourceRoot>true</addCompileSourceRoot>
+                            <configOptions>
+                                <licenseName>Apache License 2.0</licenseName>
+                                <groupId>org.apache.iotdb</groupId>
+                                
<artifactId>iotdb-collector-rest-service</artifactId>
+                                
<artifactVersion>${project.version}</artifactVersion>
+                                <dateLibrary>java8</dateLibrary>
+                                <useGzipFeature>true</useGzipFeature>
+                            </configOptions>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <phase>generate-sources</phase>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources/java/src/gen/java</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>**/impl/**</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <usedDependencies combine.children="append">
+                        <!-- We just need this dependency to prevent the 
compiler from freaking out on unused imports -->
+                        
<usedDependency>org.glassfish.jersey.media:jersey-media-multipart</usedDependency>
+                    </usedDependencies>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/iotdb-collector/collector-openapi/src/main/openapi3/iotdb_collector_rest_v1.yaml
 
b/iotdb-collector/collector-openapi/src/main/openapi3/iotdb_collector_rest_v1.yaml
new file mode 100644
index 0000000..acb16b2
--- /dev/null
+++ 
b/iotdb-collector/collector-openapi/src/main/openapi3/iotdb_collector_rest_v1.yaml
@@ -0,0 +1,253 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+openapi: 3.0.0
+info:
+  title: iotdb-collector-rest
+  description: IoTDB Rest API for Collector
+  license:
+    name: Apache 2.0
+    url: https://www.apache.org/licenses/LICENSE-2.0.html
+  version: 1.0.0
+servers:
+  - url: http://127.0.0.1:17070/
+    description: api
+security:
+  - basic: []
+paths:
+  /admin/v1/createPipe:
+    post:
+      summary: createPipe
+      description: createPipe
+      operationId: createPipe
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreatePipeRequest'
+      responses:
+        "200":
+          description: ExecutionStatus
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ExecutionStatus'
+
+  /admin/v1/alterPipe:
+    post:
+      summary: alterPipe
+      description: alterPipe
+      operationId: alterPipe
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/AlterPipeRequest'
+      responses:
+        "200":
+          description: ExecutionStatus
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ExecutionStatus'
+
+  /admin/v1/startPipe:
+    post:
+      summary: startPipe
+      description: startPipe
+      operationId: startPipe
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/StartPipeRequest'
+      responses:
+        "200":
+          description: ExecutionStatus
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ExecutionStatus'
+
+  /admin/v1/stopPipe:
+    post:
+      summary: stopPipe
+      description: stopPipe
+      operationId: stopPipe
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/StopPipeRequest'
+      responses:
+        "200":
+          description: ExecutionStatus
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ExecutionStatus'
+
+  /admin/v1/dropPipe:
+    post:
+      summary: dropPipe
+      description: dropPipe
+      operationId: dropPipe
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/DropPipeRequest'
+      responses:
+        "200":
+          description: ExecutionStatus
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ExecutionStatus'
+
+components:
+  schemas:
+    CreatePipeRequest:
+      title: CreatePipeRequest
+      type: object
+      properties:
+        sourceAttribute:
+          type: object
+          additionalProperties:
+            type: string
+        processorAttribute:
+          type: object
+          additionalProperties:
+            type: string
+        sinkAttribute:
+          type: object
+          additionalProperties:
+            type: string
+        taskId:
+          type: string
+
+
+    AlterPipeRequest: # NOTE(review): same fields as Start/DropPipeRequest, no taskId - confirm
+      title: AlterPipeRequest
+      type: object
+      properties:
+        timestamps:
+          type: array
+          items:
+            type: integer
+            format: int64
+        measurements:
+          type: array
+          items:
+            type: string
+        dataTypes:
+          type: array
+          items:
+            type: string
+        values:
+          type: array
+          items:
+            type: array
+            items:
+              type: object
+        isAligned:
+          type: boolean
+        deviceId:
+          type: string
+
+    StartPipeRequest: # NOTE(review): same fields as Alter/DropPipeRequest, no taskId - confirm
+      title: StartPipeRequest
+      type: object
+      properties:
+        timestamps:
+          type: array
+          items:
+            type: integer
+            format: int64
+        measurements:
+          type: array
+          items:
+            type: string
+        dataTypes:
+          type: array
+          items:
+            type: string
+        values:
+          type: array
+          items:
+            type: array
+            items:
+              type: object
+        isAligned:
+          type: boolean
+        deviceId:
+          type: string
+
+    StopPipeRequest:
+      title: StopPipeRequest
+      type: object
+      properties:
+        taskId:
+          type: string
+
+    DropPipeRequest: # NOTE(review): same fields as Alter/StartPipeRequest, no taskId - confirm
+      title: DropPipeRequest
+      type: object
+      properties:
+        timestamps:
+          type: array
+          items:
+            type: integer
+            format: int64
+        measurements:
+          type: array
+          items:
+            type: string
+        dataTypes:
+          type: array
+          items:
+            type: string
+        values:
+          type: array
+          items:
+            type: array
+            items:
+              type: object
+        isAligned:
+          type: boolean
+        deviceId:
+          type: string
+
+    ExecutionStatus:
+      type: object
+      properties:
+        code:
+          type: integer
+          format: int32
+        message:
+          type: string
+
+  securitySchemes:
+    basic:
+      type: http
+      scheme: basic
+    APIKey:
+      type: apiKey
+      name: API Key
+      in: header
diff --git a/iotdb-collector/collector-openapi/src/main/openapi3/ping.yaml 
b/iotdb-collector/collector-openapi/src/main/openapi3/ping.yaml
new file mode 100644
index 0000000..36adab4
--- /dev/null
+++ b/iotdb-collector/collector-openapi/src/main/openapi3/ping.yaml
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+openapi: 3.0.0
+info:
+  title: ping
+  description: IoTDB Rest API for Collector
+  license:
+    name: Apache 2.0
+    url: https://www.apache.org/licenses/LICENSE-2.0.html
+  version: 1.0.0
+servers:
+- url: http://127.0.0.1:17070/
+  description: api
+security:
+- basic: []
+paths:
+  /ping:
+    get:
+      responses:
+        "200":
+          description: ExecutionStatus
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ExecutionStatus'
+      operationId: tryPing
+
+components:
+  schemas:
+    ExecutionStatus:
+      type: object
+      properties:
+        code:
+          type: integer
+          format: int32
+        message:
+          type: string
+
+  securitySchemes:
+    basic:
+      type: http
+      scheme: basic
+    APIKey:
+      type: apiKey
+      name: API Key
+      in: header
diff --git a/iotdb-collector/pom.xml b/iotdb-collector/pom.xml
new file mode 100644
index 0000000..d8b174a
--- /dev/null
+++ b/iotdb-collector/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-extras-parent</artifactId>
+        <version>1.3.2</version>
+    </parent>
+    <artifactId>iotdb-collector</artifactId>
+    <packaging>pom</packaging>
+    <name>IoTDB: Collector</name>
+    <modules>
+        <module>collector-core</module>
+        <module>collector-openapi</module>
+    </modules>
+</project>
diff --git a/pom.xml b/pom.xml
index 5d6ad9b..f46f2fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,7 @@
         <module>connectors</module>
         <module>distributions</module>
         <module>examples</module>
+        <module>iotdb-collector</module>
     </modules>
     <properties>
         <!-- Explicitly set a variable used by all dependencies to the IoTDB 
dependencies, as the release plugin will not update these on a release -->
@@ -90,7 +91,7 @@
       
https://github.com/apache/iotdb-bin-resources/tree/main/iotdb-tools-thrift
     -->
         <iotdb-tools-thrift.version>0.14.1.0</iotdb-tools-thrift.version>
-        <jackson.version>2.15.4</jackson.version>
+        <jackson.version>2.16.2</jackson.version>
         <!-- This is the last version to support the javax namespace -->
         <jakarta.servlet-api.version>4.0.4</jakarta.servlet-api.version>
         <!-- This is the last version to support the javax namespace -->
@@ -102,7 +103,7 @@
         <!-- This is the last version to support the javax namespace -->
         <jersey.version>2.40</jersey.version>
         <!-- This was the last version to support Java 8 -->
-        <jetty.version>9.4.53.v20231009</jetty.version>
+        <jetty.version>9.4.57.v20241219</jetty.version>
         <jjwt.version>0.11.5</jjwt.version>
         <jline.version>3.23.0</jline.version>
         <jna.version>5.14.0</jna.version>
@@ -169,7 +170,7 @@
         <!-- This is the last version to support the javax namespace -->
         <spring.version>5.3.33</spring.version>
         <!-- This was the last version to support Java 8 -->
-        <swagger.version>1.6.11</swagger.version>
+        <swagger.version>1.6.14</swagger.version>
         <thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
         <thrift.exec.absolute.path/>
         <!--
@@ -753,6 +754,97 @@
                 <artifactId>amqp-client</artifactId>
                 <version>5.18.0</version>
             </dependency>
+            <dependency>
+                <groupId>io.swagger</groupId>
+                <artifactId>swagger-annotations</artifactId>
+                <version>${swagger.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.swagger</groupId>
+                <artifactId>swagger-models</artifactId>
+                <version>${swagger.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.swagger</groupId>
+                <artifactId>swagger-jaxrs</artifactId>
+                <version>${swagger.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>javax.validation</groupId>
+                        <artifactId>validation-api</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>javax.ws.rs</groupId>
+                        <artifactId>jsr311-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-annotations</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.jaxrs</groupId>
+                <artifactId>jackson-jaxrs-json-provider</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.datatype</groupId>
+                <artifactId>jackson-datatype-jsr310</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>jakarta.validation</groupId>
+                <artifactId>jakarta.validation-api</artifactId>
+                <version>${jakarta.validation-api.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>jakarta.ws.rs</groupId>
+                <artifactId>jakarta.ws.rs-api</artifactId>
+                <version>${jakarta.ws.rs-api.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.glassfish.jersey.media</groupId>
+                <artifactId>jersey-media-multipart</artifactId>
+                <version>${jersey.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.eclipse.jetty</groupId>
+                <artifactId>jetty-server</artifactId>
+                <version>${jetty.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>javax.servlet</groupId>
+                        <artifactId>javax.servlet-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.eclipse.jetty</groupId>
+                <artifactId>jetty-servlet</artifactId>
+                <version>${jetty.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.eclipse.jetty</groupId>
+                <artifactId>jetty-http</artifactId>
+                <version>${jetty.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.eclipse.jetty</groupId>
+                <artifactId>jetty-util</artifactId>
+                <version>${jetty.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.glassfish.jersey.containers</groupId>
+                <artifactId>jersey-container-servlet-core</artifactId>
+                <version>${jersey.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.glassfish.jersey.inject</groupId>
+                <artifactId>jersey-hk2</artifactId>
+                <version>${jersey.version}</version>
+            </dependency>
             <!-- Conflict:
         json-smart (pulls in 9.3),
         cglib (pulls in 7.1)
@@ -1412,6 +1504,11 @@
                     <artifactId>xml-format-maven-plugin</artifactId>
                     <version>3.2.2</version>
                 </plugin>
+                <plugin>
+                    <groupId>org.openapitools</groupId>
+                    <artifactId>openapi-generator-maven-plugin</artifactId>
+                    <version>${openapi.generator.version}</version>
+                </plugin>
             </plugins>
         </pluginManagement>
         <plugins>

Reply via email to