This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git
The following commit(s) were added to refs/heads/master by this push:
new a812c4c Complete the migration of the collector node project (#43)
a812c4c is described below
commit a812c4c819a32a1f248515684da3717ce7b86b30
Author: 0xB <[email protected]>
AuthorDate: Fri Feb 21 15:15:13 2025 +0800
Complete the migration of the collector node project (#43)
* Complete the migration of the collector node project
* fix compile-check fail
---
iotdb-collector/collector-core/pom.xml | 119 ++++++++++
.../org/apache/iotdb/collector/Application.java | 93 ++++++++
.../iotdb/collector/agent/CollectorAgent.java | 50 ++++
.../agent/collect/CollectorEventCollector.java | 48 ++++
.../executor/CollectorProcessorTaskExecutor.java | 68 ++++++
.../agent/executor/CollectorSinkTaskExecutor.java | 67 ++++++
.../executor/CollectorSourceTaskExecutor.java | 67 ++++++
.../agent/executor/CollectorTaskExecutor.java | 64 ++++++
.../agent/executor/CollectorTaskExecutorAgent.java | 53 +++++
.../agent/plugin/CollectorPluginAgent.java | 39 ++++
.../agent/plugin/CollectorPluginConstructor.java | 77 +++++++
.../agent/task/CollectorProcessorTask.java | 88 +++++++
.../collector/agent/task/CollectorSinkTask.java | 87 +++++++
.../collector/agent/task/CollectorSourceTask.java | 69 ++++++
.../iotdb/collector/agent/task/CollectorTask.java | 37 +++
.../collector/agent/task/CollectorTaskAgent.java | 117 ++++++++++
.../collector/api/filter/ApiOriginFilter.java | 52 +++++
.../collector/api/impl/PingApiServiceImpl.java | 38 ++++
.../api/v1/handler/RequestValidationHandler.java | 37 +++
.../collector/api/v1/impl/AdminApiServiceImpl.java | 98 ++++++++
.../iotdb/collector/config/ApiServiceOptions.java | 31 +++
.../iotdb/collector/config/Configuration.java | 88 +++++++
.../org/apache/iotdb/collector/config/Options.java | 103 +++++++++
.../iotdb/collector/config/TrimProperties.java | 51 +++++
.../collector/plugin/BuiltinCollectorPlugin.java | 74 ++++++
.../builtin/processor/DoNothingProcessor.java | 64 ++++++
.../collector/plugin/builtin/sink/SessionSink.java | 68 ++++++
.../plugin/builtin/source/HttpSource.java | 79 +++++++
.../plugin/builtin/source/event/SourceEvent.java | 45 ++++
.../apache/iotdb/collector/service/ApiService.java | 96 ++++++++
.../apache/iotdb/collector/service/IService.java | 36 +++
.../src/main/resources/application.properties | 20 ++
iotdb-collector/collector-openapi/pom.xml | 179 +++++++++++++++
.../src/main/openapi3/iotdb_collector_rest_v1.yaml | 253 +++++++++++++++++++++
.../collector-openapi/src/main/openapi3/ping.yaml | 63 +++++
iotdb-collector/pom.xml | 36 +++
pom.xml | 103 ++++++++-
37 files changed, 2754 insertions(+), 3 deletions(-)
diff --git a/iotdb-collector/collector-core/pom.xml
b/iotdb-collector/collector-core/pom.xml
new file mode 100644
index 0000000..ba372d0
--- /dev/null
+++ b/iotdb-collector/collector-core/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-collector</artifactId>
+ <version>1.3.2</version>
+ </parent>
+ <artifactId>collector-core</artifactId>
+ <name>IoTDB: Collector: Core</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>collector-openapi</artifactId>
+ <version>1.3.2</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>node-commons</artifactId>
+ <version>1.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>service-rpc</artifactId>
+ <version>1.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>pipe-api</artifactId>
+ <version>1.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.servlet</groupId>
+ <artifactId>jakarta.servlet-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.ws.rs</groupId>
+ <artifactId>jakarta.ws.rs-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <usedDependencies>
+ <!-- For some reason the plugin complains if this
artifact is included -->
+
<usedDependency>org.eclipse.jetty:jetty-http</usedDependency>
+
<usedDependency>org.eclipse.jetty:jetty-util</usedDependency>
+
<usedDependency>org.glassfish.jersey.inject:jersey-hk2</usedDependency>
+ </usedDependencies>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
new file mode 100644
index 0000000..6cf6456
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector;
+
+import org.apache.iotdb.collector.config.Configuration;
+import org.apache.iotdb.collector.service.ApiService;
+import org.apache.iotdb.collector.service.IService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+
+public class Application {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Application.class);
+
+ private final Configuration configuration = new Configuration();
+ private final LinkedList<IService> services = new LinkedList<>();
+
+ private Application() {
+ services.add(new ApiService());
+ }
+
+ public static void main(String[] args) {
+ LOGGER.info("[Application] Starting ...");
+ final long startTime = System.currentTimeMillis();
+
+ final Application application = new Application();
+
+ application.logAllOptions();
+ application.registerShutdownHook();
+ application.startServices();
+
+ LOGGER.info(
+ "[Application] Successfully started in {}ms",
System.currentTimeMillis() - startTime);
+ }
+
+ private void logAllOptions() {
+ configuration.logAllOptions();
+ }
+
+ private void registerShutdownHook() {
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ LOGGER.warn("[Application] Exiting ...");
+
+ for (final IService service : services) {
+ try {
+ service.stop();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "[{}] Unexpected exception occurred when stopping:
{}",
+ service.name(),
+ e.getMessage(),
+ e);
+ }
+ }
+
+ LOGGER.warn(
+ "[Application] JVM report: total memory {}, free memory
{}, used memory {}",
+ Runtime.getRuntime().totalMemory(),
+ Runtime.getRuntime().freeMemory(),
+ Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory());
+ LOGGER.warn("[Application] Exited.");
+ }));
+ }
+
+ private void startServices() {
+ for (final IService service : services) {
+ service.start();
+ }
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java
new file mode 100644
index 0000000..bca4106
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent;
+
+import org.apache.iotdb.collector.agent.executor.CollectorTaskExecutorAgent;
+import org.apache.iotdb.collector.agent.plugin.CollectorPluginAgent;
+import org.apache.iotdb.collector.agent.task.CollectorTaskAgent;
+
+public class CollectorAgent {
+
+ private final CollectorTaskAgent collectorTaskAgent =
CollectorTaskAgent.instance();
+ private final CollectorTaskExecutorAgent collectorTaskExecutorAgent =
+ CollectorTaskExecutorAgent.instance();
+ private final CollectorPluginAgent collectorPluginAgent =
CollectorPluginAgent.instance();
+
+ private CollectorAgent() {}
+
+ public static CollectorTaskAgent task() {
+ return CollectorAgentHolder.INSTANCE.collectorTaskAgent;
+ }
+
+ public static CollectorTaskExecutorAgent executor() {
+ return CollectorAgentHolder.INSTANCE.collectorTaskExecutorAgent;
+ }
+
+ public static CollectorPluginAgent plugin() {
+ return CollectorAgentHolder.INSTANCE.collectorPluginAgent;
+ }
+
+ private static class CollectorAgentHolder {
+ private static final CollectorAgent INSTANCE = new CollectorAgent();
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java
new file mode 100644
index 0000000..056685e
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.collect;
+
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+public class CollectorEventCollector implements EventCollector {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CollectorEventCollector.class);
+
+ private final BlockingQueue<Event> pendingQueue;
+
+ public CollectorEventCollector(final BlockingQueue<Event> pendingQueue) {
+ this.pendingQueue = pendingQueue;
+ }
+
+ @Override
+ public void collect(final Event event) {
+ try {
+ pendingQueue.put(event);
+ } catch (final InterruptedException e) {
+ LOGGER.warn("collect event failed because {}", e.getMessage(), e);
+ }
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java
new file mode 100644
index 0000000..385fd60
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorProcessorTaskExecutor extends CollectorTaskExecutor {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(CollectorProcessorTaskExecutor.class);
+
+ private static final Map<String, ExecutorService> PROCESSOR_EXECUTOR = new
ConcurrentHashMap<>();
+ private static final Map<String, CollectorTask> PROCESSOR_TASK_MAP = new
ConcurrentHashMap<>();
+
+ public boolean validateIfAbsent(final String taskId) {
+ return !PROCESSOR_EXECUTOR.containsKey(taskId) &&
!PROCESSOR_TASK_MAP.containsKey(taskId);
+ }
+
+ @Override
+ public Optional<ExecutorService> getExecutor(final String taskId) {
+ return Optional.of(
+
IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-processor-executor-"
+ taskId));
+ }
+
+ @Override
+ public void recordExecution(
+ final CollectorTask collectorTask, final ExecutorService
executorService) {
+ final String taskId = collectorTask.getTaskId();
+ PROCESSOR_EXECUTOR.putIfAbsent(taskId, executorService);
+ PROCESSOR_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+ LOGGER.info("register collector processor task {}", taskId);
+ }
+
+ @Override
+ public void eraseExecution(final String taskId) {
+ PROCESSOR_TASK_MAP.remove(taskId).stop();
+ PROCESSOR_EXECUTOR.remove(taskId).shutdownNow();
+
+ LOGGER.info("deregister collector processor task {}", taskId);
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java
new file mode 100644
index 0000000..a490e61
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorSinkTaskExecutor extends CollectorTaskExecutor {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CollectorSinkTaskExecutor.class);
+
+ private static final Map<String, ExecutorService> SINK_EXECUTOR = new
ConcurrentHashMap<>();
+ private static final Map<String, CollectorTask> SINK_TASK_MAP = new
ConcurrentHashMap<>();
+
+ public boolean validateIfAbsent(final String taskId) {
+ return !SINK_EXECUTOR.containsKey(taskId) &&
!SINK_TASK_MAP.containsKey(taskId);
+ }
+
+ @Override
+ public Optional<ExecutorService> getExecutor(final String taskId) {
+ return Optional.of(
+
IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-sink-executor-" +
taskId));
+ }
+
+ @Override
+ public void recordExecution(
+ final CollectorTask collectorTask, final ExecutorService
executorService) {
+ final String taskId = collectorTask.getTaskId();
+ SINK_EXECUTOR.putIfAbsent(taskId, executorService);
+ SINK_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+ LOGGER.info("register collector sink task {}", taskId);
+ }
+
+ @Override
+ public void eraseExecution(final String taskId) {
+ SINK_TASK_MAP.remove(taskId).stop();
+ SINK_EXECUTOR.remove(taskId).shutdownNow();
+
+ LOGGER.info("deregister collector sink task {}", taskId);
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java
new file mode 100644
index 0000000..fde8236
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorSourceTaskExecutor extends CollectorTaskExecutor {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CollectorSourceTaskExecutor.class);
+
+ private static final Map<String, ExecutorService> SOURCE_EXECUTOR = new
ConcurrentHashMap<>();
+ private static final Map<String, CollectorTask> SOURCE_TASK_MAP = new
ConcurrentHashMap<>();
+
+ public boolean validateIfAbsent(final String taskId) {
+ return !SOURCE_EXECUTOR.containsKey(taskId) &&
!SOURCE_TASK_MAP.containsKey(taskId);
+ }
+
+ @Override
+ public Optional<ExecutorService> getExecutor(final String taskId) {
+ return Optional.of(
+
IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-source-executor-" +
taskId));
+ }
+
+ @Override
+ public void recordExecution(
+ final CollectorTask collectorTask, final ExecutorService
executorService) {
+ final String taskId = collectorTask.getTaskId();
+ SOURCE_EXECUTOR.put(taskId, executorService);
+ SOURCE_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+ LOGGER.info("register collector source task {}", taskId);
+ }
+
+ @Override
+ public void eraseExecution(String taskId) {
+ SOURCE_TASK_MAP.remove(taskId).stop();
+ SOURCE_EXECUTOR.remove(taskId).shutdownNow();
+
+ LOGGER.info("deregister collector source task {}", taskId);
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java
new file mode 100644
index 0000000..8100766
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+public abstract class CollectorTaskExecutor {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CollectorTaskExecutor.class);
+
+ public void register(final CollectorTask collectorTask) {
+ if (validateIfAbsent(collectorTask.getTaskId())) {
+ getExecutor(collectorTask.getTaskId())
+ .ifPresent(
+ executor -> {
+ executor.submit(collectorTask);
+ LOGGER.info("register success {}", collectorTask.getTaskId());
+ recordExecution(collectorTask, executor);
+ });
+ } else {
+ LOGGER.warn("task {} has existed", collectorTask.getTaskId());
+ }
+ }
+
+ public abstract boolean validateIfAbsent(final String taskId);
+
+ public abstract Optional<ExecutorService> getExecutor(final String taskId);
+
+ public abstract void recordExecution(
+ final CollectorTask collectorTask, final ExecutorService
executorService);
+
+ public void deregister(final String taskId) {
+ if (!validateIfAbsent(taskId)) {
+ eraseExecution(taskId);
+ } else {
+ LOGGER.warn("task {} has not existed", taskId);
+ }
+ }
+
+ public abstract void eraseExecution(final String taskId);
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java
new file mode 100644
index 0000000..5adcace
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+public class CollectorTaskExecutorAgent {
+
+ private final CollectorSourceTaskExecutor sourceTaskExecutor;
+ private final CollectorProcessorTaskExecutor processorTaskExecutor;
+ private final CollectorSinkTaskExecutor sinkTaskExecutor;
+
+ private CollectorTaskExecutorAgent() {
+ sourceTaskExecutor = new CollectorSourceTaskExecutor();
+ processorTaskExecutor = new CollectorProcessorTaskExecutor();
+ sinkTaskExecutor = new CollectorSinkTaskExecutor();
+ }
+
+ public CollectorSourceTaskExecutor getSourceTaskExecutor() {
+ return CollectorTaskExecutorAgentHolder.INSTANCE.sourceTaskExecutor;
+ }
+
+ public CollectorProcessorTaskExecutor getProcessorTaskExecutor() {
+ return CollectorTaskExecutorAgentHolder.INSTANCE.processorTaskExecutor;
+ }
+
+ public CollectorSinkTaskExecutor getSinkTaskExecutor() {
+ return CollectorTaskExecutorAgentHolder.INSTANCE.sinkTaskExecutor;
+ }
+
+ public static CollectorTaskExecutorAgent instance() {
+ return CollectorTaskExecutorAgentHolder.INSTANCE;
+ }
+
+ private static class CollectorTaskExecutorAgentHolder {
+ private static final CollectorTaskExecutorAgent INSTANCE = new
CollectorTaskExecutorAgent();
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java
new file mode 100644
index 0000000..33ee5ca
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.plugin;
+
+public class CollectorPluginAgent {
+ private final CollectorPluginConstructor collectorPluginConstructor =
+ CollectorPluginConstructor.instance();
+
+ private CollectorPluginAgent() {}
+
+ public CollectorPluginConstructor constructor() {
+ return CollectorPluginAgentHolder.INSTANCE.collectorPluginConstructor;
+ }
+
+ public static CollectorPluginAgent instance() {
+ return new CollectorPluginAgent();
+ }
+
+ private static class CollectorPluginAgentHolder {
+ private static final CollectorPluginAgent INSTANCE = new
CollectorPluginAgent();
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java
new file mode 100644
index 0000000..cce94f6
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.plugin;
+
+import org.apache.iotdb.collector.plugin.BuiltinCollectorPlugin;
+import org.apache.iotdb.collector.plugin.builtin.processor.DoNothingProcessor;
+import org.apache.iotdb.collector.plugin.builtin.sink.SessionSink;
+import org.apache.iotdb.collector.plugin.builtin.source.HttpSource;
+import org.apache.iotdb.pipe.api.PipePlugin;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.PipeSink;
+import org.apache.iotdb.pipe.api.PipeSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class CollectorPluginConstructor {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CollectorPluginConstructor.class);
+
+ protected final Map<String, Supplier<PipePlugin>> pluginConstructors = new
HashMap<>();
+
+ private CollectorPluginConstructor() {
+ initConstructors();
+ }
+
+ private void initConstructors() {
+ pluginConstructors.put(
+ BuiltinCollectorPlugin.HTTP_SOURCE.getCollectorPluginName(),
HttpSource::new);
+ pluginConstructors.put(
+ BuiltinCollectorPlugin.DO_NOTHING_PROCESSOR.getCollectorPluginName(),
+ DoNothingProcessor::new);
+ pluginConstructors.put(
+ BuiltinCollectorPlugin.IOTDB_SESSION_SINK.getCollectorPluginName(),
SessionSink::new);
+ LOGGER.info("builtin plugin has been initialized");
+ }
+
+ public PipeSource getSource(final String pluginName) {
+ return (PipeSource) pluginConstructors.get(pluginName).get();
+ }
+
+ public PipeProcessor getProcessor(final String pluginName) {
+ return (PipeProcessor) pluginConstructors.get(pluginName).get();
+ }
+
+ public PipeSink getSink(final String pluginName) {
+ return (PipeSink) pluginConstructors.get(pluginName).get();
+ }
+
+ public static CollectorPluginConstructor instance() {
+ return CollectorPluginConstructorHolder.INSTANCE;
+ }
+
+ private static class CollectorPluginConstructorHolder {
+ private static final CollectorPluginConstructor INSTANCE = new
CollectorPluginConstructor();
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java
new file mode 100644
index 0000000..8d1acfc
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.collector.agent.collect.CollectorEventCollector;
+import org.apache.iotdb.commons.pipe.task.EventSupplier;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public class CollectorProcessorTask extends CollectorTask {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CollectorProcessorTask.class);
+
+ private final Map<String, String> processorAttribute;
+ private final PipeProcessor pipeProcessor;
+ private final EventSupplier eventSupplier;
+ private final BlockingQueue<Event> pendingQueue;
+ private final CollectorEventCollector collectorEventCollector;
+ private boolean isStarted = true;
+
+ public CollectorProcessorTask(
+ final String taskId,
+ final Map<String, String> processorAttribute,
+ final PipeProcessor pipeProcessor,
+ final EventSupplier eventSupplier,
+ final BlockingQueue<Event> pendingQueue) {
+ super(taskId);
+ this.processorAttribute = processorAttribute;
+ this.pipeProcessor = pipeProcessor;
+ this.eventSupplier = eventSupplier;
+ this.pendingQueue = pendingQueue;
+ this.collectorEventCollector = new CollectorEventCollector(pendingQueue);
+ }
+
+ @Override
+ public void runMayThrow() {
+ while (isStarted) {
+ try {
+ pipeProcessor.process(eventSupplier.supply(), collectorEventCollector);
+ } catch (final Exception e) {
+ LOGGER.warn("error occur while processing event because {}",
e.getMessage());
+ }
+ }
+ }
+
+ public Map<String, String> getProcessorAttribute() {
+ return processorAttribute;
+ }
+
+ public PipeProcessor getPipeProcessor() {
+ return pipeProcessor;
+ }
+
+ public EventSupplier getEventSupplier() {
+ return eventSupplier;
+ }
+
+ public BlockingQueue<Event> getPendingQueue() {
+ return pendingQueue;
+ }
+
+ public void stop() {
+ isStarted = false;
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java
new file mode 100644
index 0000000..77e65b4
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.pipe.api.PipeSink;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public class CollectorSinkTask extends CollectorTask {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CollectorSinkTask.class);
+
+ private final Map<String, String> sinkAttribute;
+ private final PipeSink pipeSink;
+ private final BlockingQueue<Event> pendingQueue;
+ private boolean isStarted = true;
+
+ public CollectorSinkTask(
+ final String taskId,
+ final Map<String, String> sinkAttribute,
+ final PipeSink pipeSink,
+ final BlockingQueue<Event> pendingQueue) {
+ super(taskId);
+ this.sinkAttribute = sinkAttribute;
+ this.pipeSink = pipeSink;
+ this.pendingQueue = pendingQueue;
+ }
+
+ @Override
+ public void runMayThrow() {
+ try {
+ pipeSink.handshake();
+ } catch (final Exception e) {
+ LOGGER.warn("handshake fail because {}", e.getMessage());
+ }
+ isStarted = true;
+ while (isStarted) {
+ try {
+ final Event event = pendingQueue.take();
+ pipeSink.transfer(event);
+ LOGGER.info("transfer event {} success, remain number is {}", event,
pendingQueue.size());
+ } catch (final InterruptedException e) {
+ LOGGER.warn("interrupted while waiting for take a event");
+ } catch (final Exception e) {
+ LOGGER.warn("error occur while transfer event to endpoint");
+ }
+ }
+ }
+
+ public Map<String, String> getSinkAttribute() {
+ return sinkAttribute;
+ }
+
+ public PipeSink getPipeSink() {
+ return pipeSink;
+ }
+
+ public void stop() {
+ isStarted = false;
+ }
+
+ public BlockingQueue<Event> getPendingQueue() {
+ return pendingQueue;
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java
new file mode 100644
index 0000000..26269c3
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.commons.pipe.task.EventSupplier;
+import org.apache.iotdb.pipe.api.PipeSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class CollectorSourceTask extends CollectorTask {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CollectorSourceTask.class);
+
+ private final Map<String, String> sourceAttribute;
+ private final PipeSource pipeSource;
+
+ public CollectorSourceTask(
+ final String taskId, final Map<String, String> sourceAttribute, final
PipeSource pipeSource) {
+ super(taskId);
+ this.sourceAttribute = sourceAttribute;
+ this.pipeSource = pipeSource;
+ }
+
+ @Override
+ public void runMayThrow() throws Throwable {
+ pipeSource.start();
+ }
+
+ public Map<String, String> getSourceAttribute() {
+ return sourceAttribute;
+ }
+
+ public PipeSource getPipeSource() {
+ return pipeSource;
+ }
+
+ public EventSupplier getEventSupplier() {
+ return pipeSource::supply;
+ }
+
+ @Override
+ public void stop() {
+ try {
+ pipeSource.close();
+ } catch (final Exception e) {
+ LOGGER.warn("failed to close pipe source {}", pipeSource, e);
+ }
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java
new file mode 100644
index 0000000..4f13ff9
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+
+public abstract class CollectorTask extends WrappedRunnable {
+
+ protected final String taskId;
+
+ protected CollectorTask(final String taskId) {
+ this.taskId = taskId;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public abstract void stop();
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java
new file mode 100644
index 0000000..9cd787d
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import
org.apache.iotdb.collector.agent.executor.CollectorProcessorTaskExecutor;
+import org.apache.iotdb.collector.agent.executor.CollectorSinkTaskExecutor;
+import org.apache.iotdb.collector.agent.executor.CollectorSourceTaskExecutor;
+import org.apache.iotdb.collector.agent.executor.CollectorTaskExecutorAgent;
+import org.apache.iotdb.collector.agent.plugin.CollectorPluginAgent;
+import org.apache.iotdb.collector.agent.plugin.CollectorPluginConstructor;
+import org.apache.iotdb.collector.plugin.BuiltinCollectorPlugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class CollectorTaskAgent {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CollectorTaskAgent.class);
+
+ private static final CollectorPluginConstructor CONSTRUCTOR =
+ CollectorPluginAgent.instance().constructor();
+ private static final CollectorSourceTaskExecutor SOURCE_TASK_EXECUTOR =
+ CollectorTaskExecutorAgent.instance().getSourceTaskExecutor();
+ private static final CollectorProcessorTaskExecutor PROCESSOR_TASK_EXECUTOR =
+ CollectorTaskExecutorAgent.instance().getProcessorTaskExecutor();
+ private static final CollectorSinkTaskExecutor SINK_TASK_EXECUTOR =
+ CollectorTaskExecutorAgent.instance().getSinkTaskExecutor();
+
+ private CollectorTaskAgent() {}
+
+ public boolean createCollectorTask(
+ final Map<String, String> sourceAttribute,
+ final Map<String, String> processorAttribute,
+ final Map<String, String> sinkAttribute,
+ final String taskId) {
+ try {
+ final CollectorSourceTask collectorSourceTask =
+ new CollectorSourceTask(
+ taskId,
+ sourceAttribute,
+ CONSTRUCTOR.getSource(
+ sourceAttribute.getOrDefault(
+ "source-plugin",
+
BuiltinCollectorPlugin.HTTP_SOURCE.getCollectorPluginName())));
+ SOURCE_TASK_EXECUTOR.register(collectorSourceTask);
+
+ final CollectorProcessorTask collectorProcessorTask =
+ new CollectorProcessorTask(
+ taskId,
+ processorAttribute,
+ CONSTRUCTOR.getProcessor(
+ processorAttribute.getOrDefault(
+ "processor-plugin",
+
BuiltinCollectorPlugin.DO_NOTHING_PROCESSOR.getCollectorPluginName())),
+ collectorSourceTask.getEventSupplier(),
+ new LinkedBlockingQueue<>());
+ PROCESSOR_TASK_EXECUTOR.register(collectorProcessorTask);
+
+ final CollectorSinkTask collectorSinkTask =
+ new CollectorSinkTask(
+ taskId,
+ sinkAttribute,
+ CONSTRUCTOR.getSink(
+ sinkAttribute.getOrDefault(
+ "sink-plugin",
+
BuiltinCollectorPlugin.IOTDB_SESSION_SINK.getCollectorPluginName())),
+ collectorProcessorTask.getPendingQueue());
+ SINK_TASK_EXECUTOR.register(collectorSinkTask);
+ } catch (final Exception e) {
+ LOGGER.warn("create collector task error", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ public boolean stopCollectorTask(final String taskId) {
+ try {
+ SOURCE_TASK_EXECUTOR.deregister(taskId);
+ PROCESSOR_TASK_EXECUTOR.deregister(taskId);
+ SINK_TASK_EXECUTOR.deregister(taskId);
+ } catch (final Exception e) {
+ LOGGER.warn("stop collector task error", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ public static CollectorTaskAgent instance() {
+ return CollectorTaskAgentHolder.INSTANCE;
+ }
+
+ private static class CollectorTaskAgentHolder {
+ private static final CollectorTaskAgent INSTANCE = new
CollectorTaskAgent();
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/filter/ApiOriginFilter.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/filter/ApiOriginFilter.java
new file mode 100644
index 0000000..008e309
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/filter/ApiOriginFilter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.filter;
+
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.IOException;
+
+public class ApiOriginFilter implements javax.servlet.Filter {
+ @Override
+ public void doFilter(
+ final ServletRequest request, final ServletResponse response, final
FilterChain chain)
+ throws IOException, ServletException {
+ final HttpServletResponse res = (HttpServletResponse) response;
+ res.addHeader("Access-Control-Allow-Origin", "*");
+ res.addHeader("Access-Control-Allow-Methods", "GET, POST");
+ res.addHeader("Access-Control-Allow-Headers", "*");
+ chain.doFilter(request, response);
+ }
+
+ @Override
+ public void destroy() {
+ // do nothing
+ }
+
+ @Override
+ public void init(final FilterConfig filterConfig) throws ServletException {
+ // do nothing
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java
new file mode 100644
index 0000000..7d73e38
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.collector.api.impl;
+
+import org.apache.iotdb.collector.api.PingApiService;
+import org.apache.iotdb.collector.api.v1.model.ExecutionStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+public class PingApiServiceImpl extends PingApiService {
+
+ @Override
+ public Response tryPing(final SecurityContext securityContext) {
+ return Response.ok()
+ .entity(
+ new ExecutionStatus()
+ .code(TSStatusCode.SUCCESS_STATUS.getStatusCode())
+ .message(TSStatusCode.SUCCESS_STATUS.name()))
+ .build();
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java
new file mode 100644
index 0000000..9456c2f
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.handler;
+
+import org.apache.iotdb.collector.api.v1.model.CreatePipeRequest;
+import org.apache.iotdb.collector.api.v1.model.StopPipeRequest;
+
+import java.util.Objects;
+
+public class RequestValidationHandler {
+ private RequestValidationHandler() {}
+
+ public static void validateCreateRequest(final CreatePipeRequest
createPipeRequest) {
+ Objects.requireNonNull(createPipeRequest.getTaskId(), "taskId cannot be
null");
+ }
+
+ public static void validateStopRequest(final StopPipeRequest
stopPipeRequest) {
+ Objects.requireNonNull(stopPipeRequest.getTaskId(), "taskId cannot be
null");
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java
new file mode 100644
index 0000000..07d1f3e
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.impl;
+
+import org.apache.iotdb.collector.agent.CollectorAgent;
+import org.apache.iotdb.collector.api.v1.AdminApiService;
+import org.apache.iotdb.collector.api.v1.NotFoundException;
+import org.apache.iotdb.collector.api.v1.handler.RequestValidationHandler;
+import org.apache.iotdb.collector.api.v1.model.AlterPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.CreatePipeRequest;
+import org.apache.iotdb.collector.api.v1.model.DropPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.StartPipeRequest;
+import org.apache.iotdb.collector.api.v1.model.StopPipeRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+public class AdminApiServiceImpl extends AdminApiService {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AdminApiServiceImpl.class);
+
+ @Override
+ public Response alterPipe(
+ final AlterPipeRequest alterPipeRequest, final SecurityContext
securityContext)
+ throws NotFoundException {
+ return Response.ok("alterPipe").build();
+ }
+
+ @Override
+ public Response createPipe(
+ final CreatePipeRequest createPipeRequest, final SecurityContext
securityContext)
+ throws NotFoundException {
+ RequestValidationHandler.validateCreateRequest(createPipeRequest);
+
+ final boolean createdResult =
+ CollectorAgent.task()
+ .createCollectorTask(
+ createPipeRequest.getSourceAttribute(),
+ createPipeRequest.getProcessorAttribute(),
+ createPipeRequest.getSinkAttribute(),
+ createPipeRequest.getTaskId());
+ if (createdResult) {
+ LOGGER.info("Create task successful");
+ return Response.status(Response.Status.OK).entity("create task
success").build();
+ }
+ LOGGER.warn("Create task failed");
+ return Response.status(Response.Status.BAD_REQUEST).entity("create task
fail").build();
+ }
+
+ @Override
+ public Response dropPipe(
+ final DropPipeRequest dropPipeRequest, final SecurityContext
securityContext)
+ throws NotFoundException {
+ return Response.ok("dropPipe").build();
+ }
+
+ @Override
+ public Response startPipe(
+ final StartPipeRequest startPipeRequest, final SecurityContext
securityContext)
+ throws NotFoundException {
+ return Response.ok("startPipe").build();
+ }
+
+ @Override
+ public Response stopPipe(
+ final StopPipeRequest stopPipeRequest, final SecurityContext
securityContext)
+ throws NotFoundException {
+ RequestValidationHandler.validateStopRequest(stopPipeRequest);
+
+ final boolean stopResult =
CollectorAgent.task().stopCollectorTask(stopPipeRequest.getTaskId());
+ if (stopResult) {
+ LOGGER.info("Stop task: {} successful", stopPipeRequest.getTaskId());
+ return Response.ok().entity("stop task: " + stopPipeRequest.getTaskId()
+ " success").build();
+ }
+ LOGGER.warn("Stop task: {} failed", stopPipeRequest.getTaskId());
+ return Response.status(Response.Status.BAD_REQUEST).entity("stop task
fail").build();
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java
new file mode 100644
index 0000000..2e3ee55
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+public class ApiServiceOptions extends Options {
+
+ public static final Option<Integer> PORT =
+ new Option<Integer>("api_service_port", 17070) {
+ @Override
+ public void setValue(String valueString) {
+ value = Integer.parseInt(valueString);
+ }
+ };
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Configuration.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Configuration.java
new file mode 100644
index 0000000..115c1c0
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Configuration.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.Properties;
+
+public class Configuration {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Configuration.class);
+
+ private static final String CONFIG_FILE_NAME = "application.properties";
+
+ private final Options options = new Options();
+
+ public Configuration() {
+ loadProps();
+ }
+
+ private void loadProps() {
+ final Optional<URL> url = getPropsUrl();
+ if (url.isPresent()) {
+ try (final InputStream inputStream = url.get().openStream()) {
+ LOGGER.info("Start to read config file {}", url.get());
+ final Properties properties = new Properties();
+ properties.load(new InputStreamReader(inputStream,
StandardCharsets.UTF_8));
+ final TrimProperties trimProperties = new TrimProperties();
+ trimProperties.putAll(properties);
+ options.loadProperties(trimProperties);
+ } catch (final FileNotFoundException e) {
+ LOGGER.error("Fail to find config file, reject startup.", e);
+ System.exit(-1);
+ } catch (final IOException e) {
+ LOGGER.error("IO exception when reading config file, reject startup.",
e);
+ System.exit(-1);
+ } catch (final Exception e) {
+ LOGGER.error("Unexpected exception when reading config file, reject
startup.", e);
+ System.exit(-1);
+ }
+ } else {
+ LOGGER.warn("{} is not found, use default configuration",
CONFIG_FILE_NAME);
+ }
+ }
+
+ private Optional<URL> getPropsUrl() {
+ final URL url = Options.class.getResource("/" + CONFIG_FILE_NAME);
+
+ if (url != null) {
+ return Optional.of(url);
+ } else {
+ LOGGER.warn(
+ "Cannot find IOTDB_COLLECTOR_HOME or IOTDB_COLLECTOR_CONF
environment variable when loading "
+ + "config file {}, use default configuration",
+ CONFIG_FILE_NAME);
+ return Optional.empty();
+ }
+ }
+
+ public void logAllOptions() {
+ options.logAllOptions();
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
new file mode 100644
index 0000000..8ad2887
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class Options {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Options.class);
+
+ private static final Map<String, Option<?>> OPTIONS = new
ConcurrentHashMap<>();
+
+ public abstract static class Option<T> {
+
+ private final String key;
+ private final T defaultValue;
+ protected T value;
+
+ Option(final String key, final T defaultValue) {
+ this.key = key;
+ this.defaultValue = defaultValue;
+
+ OPTIONS.put(key, this);
+ }
+
+ public String key() {
+ return key;
+ }
+
+ public boolean hasDefaultValue() {
+ return defaultValue != null;
+ }
+
+ public T defaultValue() {
+ return defaultValue;
+ }
+
+ public T value() {
+ return value == null ? defaultValue : value;
+ }
+
+ public abstract void setValue(final String valueString);
+
+ @Override
+ public String toString() {
+ return key + " = " + value();
+ }
+ }
+
+ public void loadProperties(final TrimProperties properties) {
+ properties
+ .stringPropertyNames()
+ .forEach(
+ key -> {
+ final Option<?> option = OPTIONS.get(key);
+ if (option != null) {
+ try {
+ option.setValue(properties.getProperty(key));
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Unexpected exception when setting value for option: {},
given value: {}",
+ key,
+ properties.getProperty(key),
+ e);
+ }
+ }
+ });
+ }
+
+ public Optional<Option<?>> getOption(final String key) {
+ return Optional.ofNullable(OPTIONS.get(key));
+ }
+
+ public void logAllOptions() {
+ LOGGER.info("========================== OPTIONS
==========================");
+ for (final Option<?> option : OPTIONS.values()) {
+ LOGGER.info(option.toString());
+ }
+
LOGGER.info("=============================================================");
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TrimProperties.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TrimProperties.java
new file mode 100644
index 0000000..1cf766d
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TrimProperties.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+import java.util.Optional;
+import java.util.Properties;
+
+public class TrimProperties extends Properties {
+ @Override
+ public synchronized Object get(Object key) {
+ Object value = super.get(key);
+ if (value instanceof String) {
+ return ((String) value).trim();
+ }
+ return value;
+ }
+
+ @Override
+ public synchronized Object put(Object key, Object value) {
+ if (value instanceof String) {
+ value = ((String) value).trim();
+ }
+ return super.put(key, value);
+ }
+
+ @Override
+ public synchronized String getProperty(String key, String defaultValue) {
+ String val = getProperty(key);
+ if (defaultValue != null) {
+ defaultValue = defaultValue.trim();
+ }
+ return Optional.ofNullable(val).map(String::trim).orElse(defaultValue);
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinCollectorPlugin.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinCollectorPlugin.java
new file mode 100644
index 0000000..2d676e5
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinCollectorPlugin.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin;
+
+import org.apache.iotdb.collector.plugin.builtin.processor.DoNothingProcessor;
+import org.apache.iotdb.collector.plugin.builtin.sink.SessionSink;
+import org.apache.iotdb.collector.plugin.builtin.source.HttpSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public enum BuiltinCollectorPlugin {
+
+ // Sources
+ HTTP_SOURCE("http-source", HttpSource.class),
+
+ // Processors
+ DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
+
+ // Sinks
+ IOTDB_SESSION_SINK("iotdb-session-sink", SessionSink.class);
+
+ private final String collectorPluginName;
+ private final Class<?> collectorPluginClass;
+ private final String className;
+
+ BuiltinCollectorPlugin(String collectorPluginName, Class<?>
collectorPluginClass) {
+ this.collectorPluginName = collectorPluginName;
+ this.collectorPluginClass = collectorPluginClass;
+ this.className = collectorPluginClass.getName();
+ }
+
+ public String getCollectorPluginName() {
+ return collectorPluginName;
+ }
+
+ public Class<?> getCollectorPluginClass() {
+ return collectorPluginClass;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public static final Set<String> SHOW_COLLECTOR_PLUGINS_BLACKLIST =
+ Collections.unmodifiableSet(
+ new HashSet<>(
+ Arrays.asList(
+ // Sources
+ HTTP_SOURCE.getCollectorPluginName().toUpperCase(),
+ // Processors
+ DO_NOTHING_PROCESSOR.getCollectorPluginName().toUpperCase(),
+ // Sinks
+ IOTDB_SESSION_SINK.getCollectorPluginName().toUpperCase())));
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java
new file mode 100644
index 0000000..436fc20
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.builtin.processor;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DoNothingProcessor implements PipeProcessor {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DoNothingProcessor.class);
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ // Do Nothing
+ }
+
+ @Override
+ public void customize(PipeParameters parameters,
PipeProcessorRuntimeConfiguration configuration)
+ throws Exception {
+ // Do Nothing
+ }
+
+ @Override
+ public void process(TabletInsertionEvent tabletInsertionEvent,
EventCollector eventCollector)
+ throws Exception {
+ // Do Nothing
+ }
+
+ @Override
+ public void process(Event event, EventCollector eventCollector) throws
Exception {
+ LOGGER.info("DoNothingProcessor process event: {}", event);
+ eventCollector.collect(event);
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Do Nothing
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/SessionSink.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/SessionSink.java
new file mode 100644
index 0000000..e2a5260
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/SessionSink.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.builtin.sink;
+
+import org.apache.iotdb.collector.plugin.builtin.source.event.SourceEvent;
+import org.apache.iotdb.pipe.api.PipeSink;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeSinkRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SessionSink implements PipeSink {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SessionSink.class);
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {}
+
+ @Override
+ public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
+ throws Exception {}
+
+ @Override
+ public void customize(PipeParameters parameters,
PipeSinkRuntimeConfiguration configuration)
+ throws Exception {}
+
+ @Override
+ public void handshake() throws Exception {
+ LOGGER.info("SessionSink handshake successfully");
+ }
+
+ @Override
+ public void heartbeat() throws Exception {}
+
+ @Override
+ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {}
+
+ @Override
+ public void transfer(Event event) throws Exception {
+ final SourceEvent sourceEvent = (SourceEvent) event;
+ LOGGER.info("SessionSink transfer successfully {}", sourceEvent);
+ }
+
+ @Override
+ public void close() throws Exception {}
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpSource.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpSource.java
new file mode 100644
index 0000000..65ba79b
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpSource.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.builtin.source;
+
+import org.apache.iotdb.collector.plugin.builtin.source.event.SourceEvent;
+import org.apache.iotdb.pipe.api.PipeSource;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class HttpSource implements PipeSource {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HttpSource.class);
+
+ private static final BlockingQueue<Event> queue = new
LinkedBlockingQueue<>();
+ private boolean isStarted = true;
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {}
+
+ @Override
+ public void customize(PipeParameters parameters,
PipeExtractorRuntimeConfiguration configuration)
+ throws Exception {}
+
+ @Override
+ public void customize(PipeParameters parameters,
PipeSourceRuntimeConfiguration configuration)
+ throws Exception {}
+
+ @Override
+ public void start() {
+ isStarted = true;
+ while (isStarted) {
+ Event event = new SourceEvent(String.valueOf(new
Random().nextInt(1000)));
+ try {
+ queue.put(event);
+ Thread.sleep(1000);
+ LOGGER.info("event: {} created success", event);
+ } catch (final InterruptedException e) {
+ LOGGER.warn("failed to create event because {}", e.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public Event supply() throws Exception {
+ return queue.take();
+ }
+
+ @Override
+ public void close() throws Exception {
+ isStarted = false;
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SourceEvent.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SourceEvent.java
new file mode 100644
index 0000000..813e25e
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SourceEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.builtin.source.event;
+
+import org.apache.iotdb.pipe.api.event.Event;
+
+public class SourceEvent implements Event {
+ private String name;
+
+ public SourceEvent() {}
+
+ public SourceEvent(final String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return "SourceEvent [name=" + name + "]";
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/ApiService.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/ApiService.java
new file mode 100644
index 0000000..b079014
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/ApiService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.service;
+
+import org.apache.iotdb.collector.api.filter.ApiOriginFilter;
+import org.apache.iotdb.collector.config.ApiServiceOptions;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.DispatcherType;
+
+import java.util.EnumSet;
+
+public class ApiService implements IService {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ApiService.class);
+
+ private Server server;
+
+ @Override
+ public void start() {
+ server = new Server(ApiServiceOptions.PORT.value());
+ server.setHandler(constructServletContextHandler());
+ try {
+ server.start();
+ LOGGER.info(
+ "[ApiService] Started successfully. Listening on port {}",
+ ApiServiceOptions.PORT.value());
+ } catch (final Exception e) {
+ LOGGER.warn("[ApiService] Failed to start: {}", e.getMessage(), e);
+ server.destroy();
+ }
+ }
+
+ private ServletContextHandler constructServletContextHandler() {
+ final ServletContextHandler context =
+ new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+ context.addFilter(
+ ApiOriginFilter.class, "/*", EnumSet.of(DispatcherType.INCLUDE,
DispatcherType.REQUEST));
+ final ServletHolder holder = context.addServlet(ServletContainer.class,
"/*");
+ holder.setInitOrder(1);
+ holder.setInitParameter(
+ "jersey.config.server.provider.packages",
+ "io.swagger.jaxrs.listing, io.swagger.sample.resource,
org.apache.iotdb.collector.api");
+ holder.setInitParameter(
+ "jersey.config.server.provider.classnames",
+ "org.glassfish.jersey.media.multipart.MultiPartFeature");
+ holder.setInitParameter("jersey.config.server.wadl.disableWadl", "true");
+ context.setContextPath("/");
+ return context;
+ }
+
+ @Override
+ public void stop() {
+ if (server == null) {
+ LOGGER.info("[ApiService] Not started yet. Nothing to stop.");
+ return;
+ }
+
+ try {
+ server.stop();
+ LOGGER.info("[ApiService] Stopped successfully.");
+ } catch (final Exception e) {
+ LOGGER.warn("[ApiService] Failed to stop: {}", e.getMessage(), e);
+ } finally {
+ server.destroy();
+ }
+ }
+
+ @Override
+ public String name() {
+ return "ApiService";
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/IService.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/IService.java
new file mode 100644
index 0000000..16358bb
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/IService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.service;
+
+public interface IService {
+
+ /** Start the service. */
+ void start();
+
+ /** Stop the service. */
+ void stop();
+
+ /**
+ * Get the name of the service.
+ *
+ * @return the name of the service
+ */
+ String name();
+}
diff --git
a/iotdb-collector/collector-core/src/main/resources/application.properties
b/iotdb-collector/collector-core/src/main/resources/application.properties
new file mode 100644
index 0000000..7bcf3f0
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/resources/application.properties
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+api_service_port=17070
\ No newline at end of file
diff --git a/iotdb-collector/collector-openapi/pom.xml
b/iotdb-collector/collector-openapi/pom.xml
new file mode 100644
index 0000000..1c1fcc8
--- /dev/null
+++ b/iotdb-collector/collector-openapi/pom.xml
@@ -0,0 +1,179 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-collector</artifactId>
+ <version>1.3.2</version>
+ </parent>
+ <artifactId>collector-openapi</artifactId>
+ <name>IoTDB: Collector: OpenAPI</name>
+ <dependencies>
+ <dependency>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-models</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-json-provider</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.validation</groupId>
+ <artifactId>jakarta.validation-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.ws.rs</groupId>
+ <artifactId>jakarta.ws.rs-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-jaxrs</artifactId>
+ </dependency>
+ <!-- Just needed for unused import code -->
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-multipart</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.servlet</groupId>
+ <artifactId>jakarta.servlet-api</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.openapitools</groupId>
+ <artifactId>openapi-generator-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>generate-java-rest-codes-common</id>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ <phase>generate-sources</phase>
+ <configuration>
+
<inputSpec>${project.basedir}/src/main/openapi3/ping.yaml</inputSpec>
+
<output>${project.build.directory}/generated-sources/java</output>
+
<apiPackage>org.apache.iotdb.collector.api</apiPackage>
+
<modelPackage>org.apache.iotdb.collector.api.model</modelPackage>
+
<invokerPackage>org.apache.iotdb.collector.api.invoker</invokerPackage>
+ <generatorName>jaxrs-jersey</generatorName>
+ <groupId>org.apache.iotdb.</groupId>
+
<artifactId>iotdb-collector-rest-service</artifactId>
+
<artifactVersion>${project.version}</artifactVersion>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <configOptions>
+ <licenseName>Apache License 2.0</licenseName>
+ <groupId>org.apache.iotdb</groupId>
+
<artifactId>iotdb-collector-rest-service</artifactId>
+
<artifactVersion>${project.version}</artifactVersion>
+ <dateLibrary>java8</dateLibrary>
+ <useGzipFeature>true</useGzipFeature>
+ </configOptions>
+ </configuration>
+ </execution>
+ <execution>
+ <id>generate-java-rest-codes-v1</id>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ <phase>generate-sources</phase>
+ <configuration>
+
<inputSpec>${project.basedir}/src/main/openapi3/iotdb_collector_rest_v1.yaml</inputSpec>
+
<output>${project.build.directory}/generated-sources/java</output>
+
<apiPackage>org.apache.iotdb.collector.api.v1</apiPackage>
+
<modelPackage>org.apache.iotdb.collector.api.v1.model</modelPackage>
+
<invokerPackage>org.apache.iotdb.collector.api.v1.invoker</invokerPackage>
+ <generatorName>jaxrs-jersey</generatorName>
+ <groupId>org.apache.iotdb</groupId>
+
<artifactId>iotdb-collector-rest-service</artifactId>
+
<artifactVersion>${project.version}</artifactVersion>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <configOptions>
+ <licenseName>Apache License 2.0</licenseName>
+ <groupId>org.apache.iotdb</groupId>
+
<artifactId>iotdb-collector-rest-service</artifactId>
+
<artifactVersion>${project.version}</artifactVersion>
+ <dateLibrary>java8</dateLibrary>
+ <useGzipFeature>true</useGzipFeature>
+ </configOptions>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <phase>generate-sources</phase>
+ <configuration>
+ <sources>
+
<source>${project.basedir}/target/generated-sources/java/src/gen/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/impl/**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <usedDependencies combine.children="append">
+ <!-- We just need this dependency to prevent the
compiler from freaking out on unused imports -->
+
<usedDependency>org.glassfish.jersey.media:jersey-media-multipart</usedDependency>
+ </usedDependencies>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/iotdb-collector/collector-openapi/src/main/openapi3/iotdb_collector_rest_v1.yaml
b/iotdb-collector/collector-openapi/src/main/openapi3/iotdb_collector_rest_v1.yaml
new file mode 100644
index 0000000..acb16b2
--- /dev/null
+++
b/iotdb-collector/collector-openapi/src/main/openapi3/iotdb_collector_rest_v1.yaml
@@ -0,0 +1,253 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+openapi: 3.0.0
+info:
+ title: iotdb-collector-rest
+ description: IoTDB Rest API for Collector
+ license:
+ name: Apache 2.0
+ url: https://www.apache.org/licenses/LICENSE-2.0.html
+ version: 1.0.0
+servers:
+ - url: http://127.0.0.1:17070/
+ description: api
+security:
+ - basic: []
+paths:
+ /admin/v1/createPipe:
+ post:
+ summary: createPipe
+ description: createPipe
+ operationId: createPipe
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/CreatePipeRequest'
+ responses:
+ "200":
+ description: ExecutionStatus
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ExecutionStatus'
+
+ /admin/v1/alterPipe:
+ post:
+ summary: alterPipe
+ description: alterPipe
+ operationId: alterPipe
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AlterPipeRequest'
+ responses:
+ "200":
+ description: ExecutionStatus
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ExecutionStatus'
+
+ /admin/v1/startPipe:
+ post:
+ summary: startPipe
+ description: startPipe
+ operationId: startPipe
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/StartPipeRequest'
+ responses:
+ "200":
+ description: ExecutionStatus
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ExecutionStatus'
+
+ /admin/v1/stopPipe:
+ post:
+ summary: stopPipe
+ description: stopPipe
+ operationId: stopPipe
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/StopPipeRequest'
+ responses:
+ "200":
+ description: ExecutionStatus
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ExecutionStatus'
+
+ /admin/v1/dropPipe:
+ post:
+ summary: dropPipe
+ description: dropPipe
+ operationId: dropPipe
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DropPipeRequest'
+ responses:
+ "200":
+ description: ExecutionStatus
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ExecutionStatus'
+
+components:
+ schemas:
+ CreatePipeRequest:
+ title: CreatePipeRequest
+ type: object
+ properties:
+ sourceAttribute:
+ type: object
+ additionalProperties:
+ type: string
+ processorAttribute:
+ type: object
+ additionalProperties:
+ type: string
+ sinkAttribute:
+ type: object
+ additionalProperties:
+ type: string
+ taskId:
+ type: string
+
+
+ AlterPipeRequest:
+ title: AlterPipeRequest
+ type: object
+ properties:
+ timestamps:
+ type: array
+ items:
+ type: integer
+ format: int64
+ measurements:
+ type: array
+ items:
+ type: string
+ dataTypes:
+ type: array
+ items:
+ type: string
+ values:
+ type: array
+ items:
+ type: array
+ items:
+ type: object
+ isAligned:
+ type: boolean
+ deviceId:
+ type: string
+
+ StartPipeRequest:
+ title: StartPipeRequest
+ type: object
+ properties:
+ timestamps:
+ type: array
+ items:
+ type: integer
+ format: int64
+ measurements:
+ type: array
+ items:
+ type: string
+ dataTypes:
+ type: array
+ items:
+ type: string
+ values:
+ type: array
+ items:
+ type: array
+ items:
+ type: object
+ isAligned:
+ type: boolean
+ deviceId:
+ type: string
+
+ StopPipeRequest:
+ title: StopPipeRequest
+ type: object
+ properties:
+ taskId:
+ type: string
+
+ DropPipeRequest:
+ title: DropPipeRequest
+ type: object
+ properties:
+ timestamps:
+ type: array
+ items:
+ type: integer
+ format: int64
+ measurements:
+ type: array
+ items:
+ type: string
+ dataTypes:
+ type: array
+ items:
+ type: string
+ values:
+ type: array
+ items:
+ type: array
+ items:
+ type: object
+ isAligned:
+ type: boolean
+ deviceId:
+ type: string
+
+ ExecutionStatus:
+ type: object
+ properties:
+ code:
+ type: integer
+ format: int32
+ message:
+ type: string
+
+ securitySchemes:
+ basic:
+ type: http
+ scheme: basic
+ APIKey:
+ type: apiKey
+ name: API Key
+ in: header
diff --git a/iotdb-collector/collector-openapi/src/main/openapi3/ping.yaml
b/iotdb-collector/collector-openapi/src/main/openapi3/ping.yaml
new file mode 100644
index 0000000..36adab4
--- /dev/null
+++ b/iotdb-collector/collector-openapi/src/main/openapi3/ping.yaml
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+openapi: 3.0.0
+info:
+ title: ping
+ description: IoTDB Rest API for Collector
+ license:
+ name: Apache 2.0
+ url: https://www.apache.org/licenses/LICENSE-2.0.html
+ version: 1.0.0
+servers:
+- url: http://127.0.0.1:17070/
+ description: api
+security:
+- basic: []
+paths:
+ /ping:
+ get:
+ responses:
+ "200":
+ description: ExecutionStatus
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ExecutionStatus'
+ operationId: tryPing
+
+components:
+ schemas:
+ ExecutionStatus:
+ type: object
+ properties:
+ code:
+ type: integer
+ format: int32
+ message:
+ type: string
+
+ securitySchemes:
+ basic:
+ type: http
+ scheme: basic
+ APIKey:
+ type: apiKey
+ name: API Key
+ in: header
diff --git a/iotdb-collector/pom.xml b/iotdb-collector/pom.xml
new file mode 100644
index 0000000..d8b174a
--- /dev/null
+++ b/iotdb-collector/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-extras-parent</artifactId>
+ <version>1.3.2</version>
+ </parent>
+ <artifactId>iotdb-collector</artifactId>
+ <packaging>pom</packaging>
+ <name>IoTDB: Collector</name>
+ <modules>
+ <module>collector-core</module>
+ <module>collector-openapi</module>
+ </modules>
+</project>
diff --git a/pom.xml b/pom.xml
index 5d6ad9b..f46f2fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,7 @@
<module>connectors</module>
<module>distributions</module>
<module>examples</module>
+ <module>iotdb-collector</module>
</modules>
<properties>
<!-- Explicitly set a variable used by all dependencies to the IoTDB
dependencies, as the release plugin will not update these on a release -->
@@ -90,7 +91,7 @@
https://github.com/apache/iotdb-bin-resources/tree/main/iotdb-tools-thrift
-->
<iotdb-tools-thrift.version>0.14.1.0</iotdb-tools-thrift.version>
- <jackson.version>2.15.4</jackson.version>
+ <jackson.version>2.16.2</jackson.version>
<!-- This is the last version to support the javax namespace -->
<jakarta.servlet-api.version>4.0.4</jakarta.servlet-api.version>
<!-- This is the last version to support the javax namespace -->
@@ -102,7 +103,7 @@
<!-- This is the last version to support the javax namespace -->
<jersey.version>2.40</jersey.version>
<!-- This was the last version to support Java 8 -->
- <jetty.version>9.4.53.v20231009</jetty.version>
+ <jetty.version>9.4.57.v20241219</jetty.version>
<jjwt.version>0.11.5</jjwt.version>
<jline.version>3.23.0</jline.version>
<jna.version>5.14.0</jna.version>
@@ -169,7 +170,7 @@
<!-- This is the last version to support the javax namespace -->
<spring.version>5.3.33</spring.version>
<!-- This was the last version to support Java 8 -->
- <swagger.version>1.6.11</swagger.version>
+ <swagger.version>1.6.14</swagger.version>
<thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
<thrift.exec.absolute.path/>
<!--
@@ -753,6 +754,97 @@
<artifactId>amqp-client</artifactId>
<version>5.18.0</version>
</dependency>
+ <dependency>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-annotations</artifactId>
+ <version>${swagger.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-models</artifactId>
+ <version>${swagger.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-jaxrs</artifactId>
+ <version>${swagger.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>jsr311-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-json-provider</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.validation</groupId>
+ <artifactId>jakarta.validation-api</artifactId>
+ <version>${jakarta.validation-api.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.ws.rs</groupId>
+ <artifactId>jakarta.ws.rs-api</artifactId>
+ <version>${jakarta.ws.rs-api.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-multipart</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>${jetty.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
<!-- Conflict:
json-smart (pulls in 9.3),
cglib (pulls in 7.1)
@@ -1412,6 +1504,11 @@
<artifactId>xml-format-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
+ <plugin>
+ <groupId>org.openapitools</groupId>
+ <artifactId>openapi-generator-maven-plugin</artifactId>
+ <version>${openapi.generator.version}</version>
+ </plugin>
</plugins>
</pluginManagement>
<plugins>