This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git


The following commit(s) were added to refs/heads/master by this push:
     new e8c86d7  Collector: Fixed task not being able to be dropped when the task is stopped & Modify PipeParameters & Supports registration of plugins and reflection usage & Add SQLite support for persistence of plugin and task meta. (#57)
e8c86d7 is described below

commit e8c86d719706d7a287f0b3f4e8bf3d6ecbea25d1
Author: 0xB <[email protected]>
AuthorDate: Mon Apr 7 10:11:49 2025 +0800

    Collector: Fixed task not being able to be dropped when the task is stopped & Modify PipeParameters & Supports registration of plugins and reflection usage & Add SQLite support for persistence of plugin and task meta. (#57)
---
 iotdb-collector/collector-core/pom.xml             |   4 +
 .../org/apache/iotdb/collector/Application.java    |   2 +
 .../api/v1/plugin/impl/PluginApiServiceImpl.java   |  42 +++--
 .../PluginApiServiceRequestValidationHandler.java  |  19 +-
 .../api/v1/task/impl/TaskApiServiceImpl.java       |   5 +-
 .../org/apache/iotdb/collector/config/Options.java |   1 +
 .../PluginRuntimeOptions.java}                     |  23 ++-
 .../iotdb/collector/persistence/DBConstant.java    |  49 +++++
 .../Persistence.java}                              |  27 ++-
 .../collector/persistence/PluginPersistence.java   | 152 +++++++++++++++
 .../collector/persistence/TaskPersistence.java     | 206 +++++++++++++++++++++
 .../api/customizer/CollectorParameters.java}       |  18 +-
 .../plugin/builtin/source/HttpPullSource.java      |   2 +-
 .../plugin/builtin/source/HttpPushSource.java      |   2 +-
 .../collector/runtime/plugin/PluginRuntime.java    | 161 ++++++++++++++--
 .../plugin/constructor/PluginConstructor.java      |  15 +-
 .../plugin/constructor/SinkConstructor.java        |   2 +-
 .../plugin/constructor/SourceConstructor.java      |   2 +-
 .../runtime/plugin/load/PluginClassLoader.java     |  65 +++++++
 .../plugin/load/PluginClassLoaderManager.java      |  55 ++++++
 .../runtime/plugin/meta/PluginMetaKeeper.java      |  55 ++++--
 .../runtime/plugin/utils/PluginFileUtils.java      |  94 ++++++++++
 .../apache/iotdb/collector/runtime/task/Task.java  |  35 +---
 .../task/TaskDispatch.java}                        |  35 +++-
 .../iotdb/collector/runtime/task/TaskRuntime.java  |  43 ++++-
 .../task/TaskStateEnum.java}                       |  16 +-
 .../runtime/task/processor/ProcessorConsumer.java  |   9 +
 .../runtime/task/processor/ProcessorTask.java      |   7 +-
 .../collector/runtime/task/sink/SinkConsumer.java  |   9 +
 .../collector/runtime/task/sink/SinkTask.java      |   7 +-
 .../collector/runtime/task/source/SourceTask.java  |  13 +-
 .../runtime/task/source/pull/PullSourceTask.java   |  14 +-
 .../runtime/task/source/push/PushSourceTask.java   |  44 ++++-
 .../collector/service/PersistenceService.java      |  86 +++++++++
 .../src/main/resources/application.properties      |  14 ++
 .../src/main/openapi3/v1/plugin.yaml               |  82 +-------
 pom.xml                                            |   6 +
 37 files changed, 1230 insertions(+), 191 deletions(-)

diff --git a/iotdb-collector/collector-core/pom.xml b/iotdb-collector/collector-core/pom.xml
index 7b68e68..c6517b8 100644
--- a/iotdb-collector/collector-core/pom.xml
+++ b/iotdb-collector/collector-core/pom.xml
@@ -107,6 +107,10 @@
             <groupId>com.google.code.findbugs</groupId>
             <artifactId>jsr305</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.xerial</groupId>
+            <artifactId>sqlite-jdbc</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
index 96ecdaf..650caee 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.collector;
 import org.apache.iotdb.collector.config.Configuration;
 import org.apache.iotdb.collector.service.ApiService;
 import org.apache.iotdb.collector.service.IService;
+import org.apache.iotdb.collector.service.PersistenceService;
 import org.apache.iotdb.collector.service.RuntimeService;
 
 import org.slf4j.Logger;
@@ -39,6 +40,7 @@ public class Application {
   private Application() {
     services.add(new RuntimeService());
     services.add(new ApiService());
+    services.add(new PersistenceService());
   }
 
   public static void main(String[] args) {
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
index b64f017..041dde1 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
@@ -20,11 +20,9 @@
 package org.apache.iotdb.collector.api.v1.plugin.impl;
 
 import org.apache.iotdb.collector.api.v1.plugin.PluginApiService;
-import org.apache.iotdb.collector.api.v1.plugin.model.AlterPluginRequest;
 import org.apache.iotdb.collector.api.v1.plugin.model.CreatePluginRequest;
 import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest;
-import org.apache.iotdb.collector.api.v1.plugin.model.StartPluginRequest;
-import org.apache.iotdb.collector.api.v1.plugin.model.StopPluginRequest;
+import org.apache.iotdb.collector.service.RuntimeService;
 
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.SecurityContext;
@@ -34,30 +32,34 @@ public class PluginApiServiceImpl extends PluginApiService {
   @Override
   public Response createPlugin(
       final CreatePluginRequest createPluginRequest, final SecurityContext 
securityContext) {
-    return Response.ok("create plugin").build();
-  }
+    
PluginApiServiceRequestValidationHandler.validateCreatePluginRequest(createPluginRequest);
 
-  @Override
-  public Response alterPlugin(
-      final AlterPluginRequest alterPluginRequest, final SecurityContext 
securityContext) {
-    return Response.ok("alter plugin").build();
+    return RuntimeService.plugin().isPresent()
+        ? RuntimeService.plugin()
+            .get()
+            .createPlugin(
+                createPluginRequest.getPluginName().toUpperCase(),
+                createPluginRequest.getClassName(),
+                createPluginRequest.getJarName(),
+                null,
+                true)
+        : Response.ok("create plugin").build();
   }
 
   @Override
-  public Response startPlugin(
-      final StartPluginRequest startPluginRequest, final SecurityContext 
securityContext) {
-    return Response.ok("start plugin").build();
-  }
+  public Response dropPlugin(
+      final DropPluginRequest dropPluginRequest, final SecurityContext 
securityContext) {
+    
PluginApiServiceRequestValidationHandler.validateDropPluginRequest(dropPluginRequest);
 
-  @Override
-  public Response stopPlugin(
-      final StopPluginRequest stopPluginRequest, final SecurityContext 
securityContext) {
-    return Response.ok("stop plugin").build();
+    return RuntimeService.plugin().isPresent()
+        ? 
RuntimeService.plugin().get().dropPlugin(dropPluginRequest.getPluginName().toUpperCase())
+        : Response.ok("drop plugin").build();
   }
 
   @Override
-  public Response dropPlugin(
-      final DropPluginRequest dropPluginRequest, final SecurityContext 
securityContext) {
-    return Response.ok("drop plugin").build();
+  public Response showPlugin(final SecurityContext securityContext) {
+    return RuntimeService.plugin().isPresent()
+        ? RuntimeService.plugin().get().showPlugin()
+        : Response.ok("show plugin").build();
   }
 }
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
index 36adce3..6f7d983 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
@@ -19,4 +19,21 @@
 
 package org.apache.iotdb.collector.api.v1.plugin.impl;
 
-public class PluginApiServiceRequestValidationHandler {}
+import org.apache.iotdb.collector.api.v1.plugin.model.CreatePluginRequest;
+import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest;
+
+import java.util.Objects;
+
+public class PluginApiServiceRequestValidationHandler {
+  private PluginApiServiceRequestValidationHandler() {}
+
+  public static void validateCreatePluginRequest(final CreatePluginRequest 
createPluginRequest) {
+    Objects.requireNonNull(createPluginRequest.getPluginName(), "plugin name 
cannot be null");
+    Objects.requireNonNull(createPluginRequest.getClassName(), "class name 
cannot be null");
+    Objects.requireNonNull(createPluginRequest.getJarName(), "jar name cannot 
be null");
+  }
+
+  public static void validateDropPluginRequest(final DropPluginRequest 
dropPluginRequest) {
+    Objects.requireNonNull(dropPluginRequest.getPluginName(), "plugin name 
cannot be null");
+  }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
index 053b618..cd563da 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.collector.api.v1.task.model.CreateTaskRequest;
 import org.apache.iotdb.collector.api.v1.task.model.DropTaskRequest;
 import org.apache.iotdb.collector.api.v1.task.model.StartTaskRequest;
 import org.apache.iotdb.collector.api.v1.task.model.StopTaskRequest;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
 import org.apache.iotdb.collector.service.RuntimeService;
 
 import javax.ws.rs.core.Response;
@@ -42,9 +43,11 @@ public class TaskApiServiceImpl extends TaskApiService {
             .get()
             .createTask(
                 createTaskRequest.getTaskId(),
+                TaskStateEnum.RUNNING,
                 createTaskRequest.getSourceAttribute(),
                 createTaskRequest.getProcessorAttribute(),
-                createTaskRequest.getSinkAttribute())
+                createTaskRequest.getSinkAttribute(),
+                true)
         : Response.serverError().entity("Task runtime is down").build();
   }
 
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
index e067086..16fd814 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
@@ -36,6 +36,7 @@ public class Options {
     try {
       Class.forName(ApiServiceOptions.class.getName());
       Class.forName(TaskRuntimeOptions.class.getName());
+      Class.forName(PluginRuntimeOptions.class.getName());
     } catch (final ClassNotFoundException e) {
       throw new RuntimeException("Failed to load options", e);
     }
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java
similarity index 54%
copy from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
copy to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java
index 36adce3..92eb04b 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java
@@ -17,6 +17,25 @@
  * under the License.
  */
 
-package org.apache.iotdb.collector.api.v1.plugin.impl;
+package org.apache.iotdb.collector.config;
 
-public class PluginApiServiceRequestValidationHandler {}
+import java.io.File;
+
+public class PluginRuntimeOptions extends Options {
+  public static final Option<String> PLUGIN_LIB_DIR =
+      new Option<String>("plugin_lib_dir", "ext" + File.separator + "plugin") {
+        @Override
+        public void setValue(final String valueString) {
+          value = valueString;
+        }
+      };
+
+  public static final Option<String> PLUGIN_INSTALL_LIB_DIR =
+      new Option<String>(
+          "plugin_install_lib_dir", PLUGIN_LIB_DIR.value() + File.separator + 
"install") {
+        @Override
+        public void setValue(final String valueString) {
+          value = valueString;
+        }
+      };
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java
new file mode 100644
index 0000000..2e8c3bb
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.persistence;
+
+public class DBConstant {
+
+  public static final String CREATE_PLUGIN_TABLE_SQL =
+      "CREATE TABLE IF NOT EXISTS plugin\n"
+          + "(\n"
+          + "    plugin_name TEXT PRIMARY KEY,\n"
+          + "    class_name  TEXT NOT NULL,\n"
+          + "    jar_name    TEXT NOT NULL,\n"
+          + "    jar_md5    TEXT NOT NULL,\n"
+          + "    create_time TEXT NOT NULL\n"
+          + ");";
+  public static final String CREATE_TASK_TABLE_SQL =
+      "CREATE TABLE IF NOT EXISTS task\n"
+          + "(\n"
+          + "    task_id             TEXT PRIMARY KEY,\n"
+          + "    task_state          INT  NOT NULL,\n"
+          + "    source_attribute    BLOB NOT NULL,\n"
+          + "    processor_attribute BLOB NOT NULL,\n"
+          + "    sink_attribute      BLOB NOT NULL,\n"
+          + "    create_time         TEXT NOT NULL\n"
+          + ");";
+
+  public static final String PLUGIN_DATABASE_FILE_PATH = "ext/db/plugin.db";
+  public static final String TASK_DATABASE_FILE_PATH = "ext/db/task.db";
+
+  public static final String PLUGIN_DATABASE_URL = "jdbc:sqlite:" + 
PLUGIN_DATABASE_FILE_PATH;
+  public static final String TASK_DATABASE_URL = "jdbc:sqlite:" + 
TASK_DATABASE_FILE_PATH;
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java
similarity index 56%
copy from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
copy to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java
index 36adce3..b2abcb8 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java
@@ -17,6 +17,29 @@
  * under the License.
  */
 
-package org.apache.iotdb.collector.api.v1.plugin.impl;
+package org.apache.iotdb.collector.persistence;
 
-public class PluginApiServiceRequestValidationHandler {}
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public abstract class Persistence {
+
+  private final String databaseUrl;
+
+  public Persistence(final String databaseUrl) {
+    this.databaseUrl = databaseUrl;
+    initDatabaseFileIfPossible();
+    initTableIfPossible();
+  }
+
+  protected abstract void initDatabaseFileIfPossible();
+
+  protected abstract void initTableIfPossible();
+
+  public abstract void tryResume();
+
+  protected Connection getConnection() throws SQLException {
+    return DriverManager.getConnection(databaseUrl);
+  }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java
new file mode 100644
index 0000000..d1d2b20
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.persistence;
+
+import org.apache.iotdb.collector.runtime.plugin.utils.PluginFileUtils;
+import org.apache.iotdb.collector.service.RuntimeService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Objects;
+
+public class PluginPersistence extends Persistence {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PluginPersistence.class);
+
+  public PluginPersistence(String databaseUrl) {
+    super(databaseUrl);
+  }
+
+  @Override
+  protected void initDatabaseFileIfPossible() {
+    try {
+      final Path pluginDatabaseFilePath = 
Paths.get(DBConstant.PLUGIN_DATABASE_FILE_PATH);
+      if (!Files.exists(pluginDatabaseFilePath)) {
+        Files.createFile(pluginDatabaseFilePath);
+      }
+    } catch (final IOException e) {
+      LOGGER.warn("Failed to create plugin database file", e);
+    }
+  }
+
+  @Override
+  protected void initTableIfPossible() {
+    try (final Connection connection = getConnection()) {
+      final PreparedStatement statement =
+          connection.prepareStatement(DBConstant.CREATE_PLUGIN_TABLE_SQL);
+      statement.executeUpdate();
+    } catch (final SQLException e) {
+      LOGGER.warn("Failed to create plugin database", e);
+    }
+  }
+
+  @Override
+  public void tryResume() {
+    final String queryAllPluginSQL =
+        "SELECT plugin_name, class_name, jar_name, jar_md5 FROM plugin";
+
+    try (final Connection connection = getConnection()) {
+      final PreparedStatement statement = 
connection.prepareStatement(queryAllPluginSQL);
+      final ResultSet pluginResultSet = statement.executeQuery();
+
+      while (pluginResultSet.next()) {
+        final String pluginName = pluginResultSet.getString("plugin_name");
+        final String className = pluginResultSet.getString("class_name");
+        final String jarName = pluginResultSet.getString("jar_name");
+        final String jarMd5 = pluginResultSet.getString("jar_md5");
+
+        if (!isPluginJarFileWithMD5NameExists(pluginName, jarName, jarMd5)) {
+          tryDeletePlugin(pluginName);
+          continue;
+        }
+
+        tryRecoverPlugin(pluginName, className, jarName, jarMd5);
+      }
+    } catch (final SQLException e) {
+      LOGGER.warn("Failed to resume plugin persistence message, because {}", 
e.getMessage());
+    }
+  }
+
+  private boolean isPluginJarFileWithMD5NameExists(
+      final String pluginName, final String jarName, final String jarMd5) {
+    final Path pluginJarFileWithMD5Path =
+        Paths.get(
+            PluginFileUtils.getPluginJarFileWithMD5FilePath(
+                pluginName, 
PluginFileUtils.getPluginJarFileNameWithMD5(jarName, jarMd5)));
+
+    return Files.exists(pluginJarFileWithMD5Path);
+  }
+
+  private void tryRecoverPlugin(
+      final String pluginName, final String className, final String jarName, 
final String jarMD5) {
+    final Response response =
+        RuntimeService.plugin().isPresent()
+            ? RuntimeService.plugin()
+                .get()
+                .createPlugin(pluginName, className, jarName, jarMD5, false)
+            : null;
+
+    if (Objects.isNull(response) || response.getStatus() != 
Response.Status.OK.getStatusCode()) {
+      LOGGER.warn("Failed to recover plugin message from plugin {}: {}", 
pluginName, response);
+    }
+  }
+
+  public void tryPersistencePlugin(
+      final String pluginName, final String className, final String jarName, 
final String jarMD5) {
+    final String sql =
+        "INSERT INTO plugin(plugin_name, class_name, jar_name, jar_md5, 
create_time) VALUES(?,?,?,?,?)";
+
+    try (final Connection connection = getConnection()) {
+      final PreparedStatement statement = connection.prepareStatement(sql);
+      statement.setString(1, pluginName);
+      statement.setString(2, className);
+      statement.setString(3, jarName);
+      statement.setString(4, jarMD5);
+      statement.setString(5, String.valueOf(new 
Timestamp(System.currentTimeMillis())));
+      statement.executeUpdate();
+    } catch (final SQLException e) {
+      LOGGER.warn("Failed to persistence plugin message, because {}", 
e.getMessage());
+    }
+  }
+
+  public void tryDeletePlugin(final String pluginName) {
+    final String sql = "DELETE FROM plugin WHERE plugin_name = ?";
+
+    try (final Connection connection = getConnection()) {
+      final PreparedStatement statement = connection.prepareStatement(sql);
+      statement.setString(1, pluginName);
+      statement.executeUpdate();
+    } catch (final SQLException e) {
+      LOGGER.warn("Failed to delete plugin persistence message, because {}", 
e.getMessage());
+    }
+  }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java
new file mode 100644
index 0000000..bf62519
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.persistence;
+
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
+import org.apache.iotdb.collector.service.RuntimeService;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class TaskPersistence extends Persistence {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskPersistence.class);
+
+  public TaskPersistence(String databaseUrl) {
+    super(databaseUrl);
+  }
+
+  @Override
+  protected void initDatabaseFileIfPossible() {
+    try {
+      final Path taskDatabaseFilePath = 
Paths.get(DBConstant.TASK_DATABASE_FILE_PATH);
+      if (!Files.exists(taskDatabaseFilePath)) {
+        Files.createFile(taskDatabaseFilePath);
+      }
+    } catch (final IOException e) {
+      LOGGER.warn("Failed to create task database file", e);
+    }
+  }
+
+  @Override
+  protected void initTableIfPossible() {
+    try (final Connection connection = getConnection()) {
+      final PreparedStatement statement =
+          connection.prepareStatement(DBConstant.CREATE_TASK_TABLE_SQL);
+      statement.executeUpdate();
+    } catch (final SQLException e) {
+      LOGGER.warn("Failed to create task database", e);
+    }
+  }
+
+  @Override
+  public void tryResume() {
+    final String queryAllTaskSQL =
+        "SELECT task_id, task_state, source_attribute, processor_attribute, 
sink_attribute, create_time FROM task";
+
+    try (final Connection connection = getConnection()) {
+      final PreparedStatement statement = 
connection.prepareStatement(queryAllTaskSQL);
+      final ResultSet taskResultSet = statement.executeQuery();
+
+      while (taskResultSet.next()) {
+        final String taskId = taskResultSet.getString(1);
+        final TaskStateEnum taskState = 
TaskStateEnum.values()[taskResultSet.getInt(2)];
+        final byte[] sourceAttribute = taskResultSet.getBytes(3);
+        final byte[] processorAttribute = taskResultSet.getBytes(4);
+        final byte[] sinkAttribute = taskResultSet.getBytes(5);
+
+        tryRecoverTask(
+            taskId,
+            taskState,
+            deserialize(sourceAttribute),
+            deserialize(processorAttribute),
+            deserialize(sinkAttribute));
+      }
+    } catch (final SQLException e) {
+      LOGGER.warn("Failed to resume task persistence message, because {}", 
e.getMessage());
+    }
+  }
+
+  public void tryRecoverTask(
+      final String taskId,
+      final TaskStateEnum taskState,
+      final Map<String, String> sourceAttribute,
+      final Map<String, String> processorAttribute,
+      final Map<String, String> sinkAttribute) {
+    final Response response =
+        RuntimeService.task().isPresent()
+            ? RuntimeService.task()
+                .get()
+                .createTask(
+                    taskId, taskState, sourceAttribute, processorAttribute, 
sinkAttribute, false)
+            : null;
+
+    if (Objects.isNull(response) || response.getStatus() != 
Response.Status.OK.getStatusCode()) {
+      LOGGER.warn("Failed to recover task persistence message, because {}", 
response);
+    }
+  }
+
+  private Map<String, String> deserialize(final byte[] buffer) {
+    final Map<String, String> attribute = new HashMap<>();
+    final ByteBuffer attributeBuffer = ByteBuffer.wrap(buffer);
+
+    final int size = ReadWriteIOUtils.readInt(attributeBuffer);
+    for (int i = 0; i < size; i++) {
+      final String key = ReadWriteIOUtils.readString(attributeBuffer);
+      final String value = ReadWriteIOUtils.readString(attributeBuffer);
+
+      attribute.put(key, value);
+    }
+
+    return attribute;
+  }
+
+  public void tryPersistenceTask(
+      final String taskId,
+      final TaskStateEnum taskState,
+      final Map<String, String> sourceAttribute,
+      final Map<String, String> processorAttribute,
+      final Map<String, String> sinkAttribute) {
+    final String insertSQL =
+        "INSERT INTO task(task_id, task_state , source_attribute, 
processor_attribute, sink_attribute, create_time) values(?, ?, ?, ?, ?, ?)";
+
+    try (final Connection connection = getConnection()) {
+      final PreparedStatement statement = 
connection.prepareStatement(insertSQL);
+
+      final byte[] sourceAttributeBuffer = serialize(sourceAttribute);
+      final byte[] processorAttributeBuffer = serialize(processorAttribute);
+      final byte[] sinkAttributeBuffer = serialize(sinkAttribute);
+
+      statement.setString(1, taskId);
+      statement.setInt(2, taskState.getTaskState());
+      statement.setBytes(3, sourceAttributeBuffer);
+      statement.setBytes(4, processorAttributeBuffer);
+      statement.setBytes(5, sinkAttributeBuffer);
+      statement.setString(6, String.valueOf(new 
Timestamp(System.currentTimeMillis())));
+      statement.executeUpdate();
+    } catch (final SQLException | IOException e) {
+      LOGGER.warn("Failed to persistence task message, because {}", 
e.getMessage());
+    }
+  }
+
+  private byte[] serialize(final Map<String, String> attribute) throws 
IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(attribute.size(), outputStream);
+      for (final Map.Entry<String, String> entry : attribute.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
+
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size())
+          .array();
+    }
+  }
+
+  public void tryDeleteTask(final String taskId) {
+    final String deleteSQL = "DELETE FROM task WHERE task_id = ?";
+
+    try (final Connection connection = getConnection()) {
+      final PreparedStatement statement = 
connection.prepareStatement(deleteSQL);
+      statement.setString(1, taskId);
+      statement.executeUpdate();
+    } catch (final SQLException e) {
+      LOGGER.warn("Failed to delete task persistence message, because {}", 
e.getMessage());
+    }
+  }
+
+  public void tryAlterTaskState(final String taskId, final TaskStateEnum 
taskState) {
+    final String alterSQL = "UPDATE task SET task_state = ? WHERE task_id = ?";
+
+    try (final Connection connection = getConnection()) {
+      final PreparedStatement statement = 
connection.prepareStatement(alterSQL);
+      statement.setInt(1, taskState.getTaskState());
+      statement.setString(2, taskId);
+      statement.executeUpdate();
+    } catch (SQLException e) {
+      LOGGER.warn("Failed to alter task persistence message, because {}", 
e.getMessage());
+    }
+  }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java
similarity index 62%
copy from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
copy to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java
index 36adce3..32ea664 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java
@@ -17,6 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.collector.api.v1.plugin.impl;
+package org.apache.iotdb.collector.plugin.api.customizer;
 
-public class PluginApiServiceRequestValidationHandler {}
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import java.util.Map;
+
+/**
+ * {@link PipeParameters} variant that rewrites attribute values on construction: in every value
+ * each {@code _} is replaced with {@code -}, except for the value stored under the {@code taskId}
+ * key, which is left as given.
+ */
+public class CollectorParameters extends PipeParameters {
+  /**
+   * @param attributes attribute map handed to the {@link PipeParameters} constructor; the
+   *     rewritten values are put back into this same map instance
+   */
+  public CollectorParameters(final Map<String, String> attributes) {
+    super(attributes);
+    // NOTE(review): the loop iterates this.attributes (presumably the field inherited from
+    // PipeParameters) but writes into the constructor argument. The rewrite is only visible
+    // through this object if the superclass keeps a reference to the same map -- confirm.
+    // NOTE(review): a null value would make value.replace(...) throw, and an unmodifiable
+    // argument map would make put(...) throw -- verify against callers.
+    this.attributes.forEach(
+        (key, value) -> {
+          // the task id is the only attribute whose underscores are preserved
+          if (!"taskId".equals(key)) {
+            attributes.put(key, value.replace("_", "-"));
+          }
+        });
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java
index c8bfd93..afb8d15 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java
@@ -56,7 +56,7 @@ public class HttpPullSource extends PullSource {
 
   @Override
   public Event supply() {
-    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(100));
+    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
     final Event event = new DemoEvent(String.valueOf(new 
Random().nextInt(1000)));
     LOGGER.info("{} created successfully ...", event);
     return event;
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java
index 233335c..fa81fc2 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java
@@ -66,7 +66,7 @@ public class HttpPushSource extends PushSource {
         final Event event = new DemoEvent(String.valueOf(new 
Random().nextInt(1000)));
         LOGGER.info("{} created successfully ...", event);
         supply(event);
-        TimeUnit.SECONDS.sleep(100);
+        TimeUnit.SECONDS.sleep(2);
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
index adbad7d..627ace6 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
@@ -22,24 +22,46 @@ package org.apache.iotdb.collector.runtime.plugin;
 import 
org.apache.iotdb.collector.runtime.plugin.constructor.ProcessorConstructor;
 import org.apache.iotdb.collector.runtime.plugin.constructor.SinkConstructor;
 import org.apache.iotdb.collector.runtime.plugin.constructor.SourceConstructor;
+import org.apache.iotdb.collector.runtime.plugin.load.PluginClassLoader;
+import org.apache.iotdb.collector.runtime.plugin.load.PluginClassLoaderManager;
+import org.apache.iotdb.collector.runtime.plugin.meta.PluginMeta;
 import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper;
+import org.apache.iotdb.collector.runtime.plugin.utils.PluginFileUtils;
+import org.apache.iotdb.collector.service.PersistenceService;
+import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.PipeSink;
 import org.apache.iotdb.pipe.api.PipeSource;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Objects;
+
 public class PluginRuntime implements AutoCloseable {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PluginRuntime.class);
+
   private final PluginMetaKeeper metaKeeper;
   private final SourceConstructor sourceConstructor;
   private final ProcessorConstructor processorConstructor;
   private final SinkConstructor sinkConstructor;
+  private final PluginClassLoaderManager classLoaderManager;
 
   public PluginRuntime() {
     this.metaKeeper = new PluginMetaKeeper();
     this.sourceConstructor = new SourceConstructor(metaKeeper);
     this.processorConstructor = new ProcessorConstructor(metaKeeper);
     this.sinkConstructor = new SinkConstructor(metaKeeper);
+    this.classLoaderManager = new PluginClassLoaderManager();
   }
 
   public PipeSource constructSource(final PipeParameters sourceParameters) {
@@ -66,24 +88,139 @@ public class PluginRuntime implements AutoCloseable {
     return sinkConstructor.reflectPlugin(sinkParameters);
   }
 
-  public boolean createPlugin() {
-    return true;
+  /**
+   * Looks up the class loader registered for the given plugin by delegating to the class loader
+   * manager.
+   *
+   * @param pluginName name of the plugin whose class loader is requested
+   * @return the manager's result for {@code pluginName}
+   * @throws IOException declared by the manager's lookup method
+   */
+  public PluginClassLoader getClassLoader(final String pluginName) throws 
IOException {
+    return classLoaderManager.getPluginClassLoader(pluginName);
   }
 
-  public boolean alterPlugin() {
-    return true;
-  }
-
-  public boolean startPlugin() {
-    return true;
+  /**
+   * Registers a non-builtin plugin: installs its jar, loads and instantiates the plugin class once
+   * to validate it, then records its class loader and meta.
+   *
+   * <p>Every failure is reported as a server-error {@link Response}; nothing is thrown to the
+   * caller for {@link Exception}s raised inside.
+   *
+   * @param pluginName name under which the plugin is registered
+   * @param className fully qualified name of the class to load from the plugin jar
+   * @param jarName file name of the plugin jar
+   * @param jarMD5FromDB MD5 of the jar to use as-is, or {@code null} to compute it from the jar
+   *     file
+   * @param isRestRequest when {@code true}, the jar must exist in the plugin lib dir and the
+   *     registration is persisted after success
+   * @return an OK response with a success message, or a server-error response with the reason
+   */
+  public synchronized Response createPlugin(
+      final String pluginName,
+      final String className,
+      final String jarName,
+      final String jarMD5FromDB,
+      final boolean isRestRequest) {
+    try {
+      // validate whether the plugin jar file exists
+      if (isRestRequest && !PluginFileUtils.isPluginJarFileExist(jarName)) {
+        final String errorMessage =
+            String.format(
+                "Failed to register Plugin %s, because the plugin jar file %s is not found",
+                pluginName, jarName);
+        LOGGER.warn(errorMessage);
+        return Response.serverError().entity(errorMessage).build();
+      }
+
+      // validate whether the plugin has been loaded
+      final PluginMeta information = metaKeeper.getPipePluginMeta(pluginName);
+      if (Objects.nonNull(information)) {
+        // validate whether the plugin is builtin plugin
+        if (information.isBuiltin()) {
+          final String errorMessage =
+              String.format(
+                  "Failed to register Plugin %s, because the given Plugin name is the same as a built-in Plugin name.",
+                  pluginName);
+          LOGGER.warn(errorMessage);
+          return Response.serverError().entity(errorMessage).build();
+        }
+
+        // otherwise the plugin has been registered
+        final String errorMessage =
+            String.format(
+                "Failed to register Plugin %s, because the Plugin has been registered.",
+                pluginName);
+        LOGGER.warn(errorMessage);
+        return Response.serverError().entity(errorMessage).build();
+      }
+
+      // get the plugin jar md5; the stream is closed as soon as the digest has been computed
+      final String jarMD5;
+      if (jarMD5FromDB == null) {
+        try (final java.io.InputStream jarStream =
+            Files.newInputStream(Paths.get(PluginFileUtils.getPluginJarFilePath(jarName)))) {
+          jarMD5 = DigestUtils.md5Hex(jarStream);
+        }
+      } else {
+        jarMD5 = jarMD5FromDB;
+      }
+
+      // Create the plugin's install directory if it is missing and move the jar into it under
+      // its MD5-suffixed name; nothing is moved when that file is already installed.
+      final String pluginJarFileNameWithMD5 =
+          PluginFileUtils.getPluginJarFileNameWithMD5(jarName, jarMD5);
+      PluginFileUtils.savePluginToInstallDir(pluginName, jarName, pluginJarFileNameWithMD5);
+
+      // create and save plugin class loader
+      final PluginClassLoader classLoader =
+          classLoaderManager.createPluginClassLoader(
+              PluginFileUtils.getPluginInstallDirPath(pluginName));
+      try {
+        final Class<?> pluginClass = Class.forName(className, true, classLoader);
+        @SuppressWarnings("unused") // ensure that it is a PipePlugin class
+        final PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
+      } catch (final Exception e) {
+        // the loader has not been handed to the manager yet, so close it here before the
+        // failure is reported by the outer catch
+        try {
+          classLoader.close();
+        } catch (final IOException closeException) {
+          e.addSuppressed(closeException);
+        }
+        throw e;
+      }
+
+      classLoaderManager.addPluginClassLoader(pluginName, classLoader);
+      metaKeeper.addPipePluginMeta(
+          pluginName, new PluginMeta(pluginName, className, false, jarName, jarMD5));
+      metaKeeper.addJarNameAndMd5(jarName, jarMD5);
+
+      // store the registered plugin info
+      if (isRestRequest) {
+        PersistenceService.plugin()
+            .ifPresent(
+                pluginPersistence ->
+                    pluginPersistence.tryPersistencePlugin(pluginName, className, jarName, jarMD5));
+      }
+
+      final String successMessage = String.format("Successfully register Plugin %s", pluginName);
+      LOGGER.info(successMessage);
+      return Response.ok().entity(successMessage).build();
+    } catch (final Exception e) {
+      final String errorMessage =
+          String.format("Failed to register Plugin %s, because %s", pluginName, e);
+      // pass the exception as well so the stack trace is not lost
+      LOGGER.warn(errorMessage, e);
+      return Response.serverError().entity(errorMessage).build();
+    }
   }
 
-  public boolean stopPlugin() {
-    return true;
+  /**
+   * Deregisters a non-builtin plugin: deletes its installed jar, then removes its jar reference,
+   * meta, class loader and persisted record.
+   *
+   * @param pluginName name of the plugin to deregister
+   * @return an OK response with a success message; a server-error response when the plugin is
+   *     unknown, is builtin, or an {@link IOException} occurs while removing it
+   */
+  public synchronized Response dropPlugin(final String pluginName) {
+    try {
+      final PluginMeta information = metaKeeper.getPipePluginMeta(pluginName);
+      // an unknown plugin is reported instead of failing on the null meta further down
+      if (Objects.isNull(information)) {
+        final String errorMessage =
+            String.format(
+                "Failed to deregister Plugin %s, because the Plugin has not been registered.",
+                pluginName);
+        LOGGER.warn(errorMessage);
+        return Response.serverError().entity(errorMessage).build();
+      }
+      if (information.isBuiltin()) {
+        final String errorMessage =
+            String.format("Failed to deregister builtin Plugin %s.", pluginName);
+        LOGGER.warn(errorMessage);
+        return Response.serverError().entity(errorMessage).build();
+      }
+
+      // delete the installed jar, which is stored under its MD5-suffixed name
+      final String jarName = information.getJarName();
+      final String installedFileName =
+          PluginFileUtils.getPluginJarFileNameWithMD5(jarName, information.getJarMD5());
+      PluginFileUtils.removePluginFileUnderLibRoot(information.getPluginName(), installedFileName);
+
+      // remove anyway; the jar reference count is keyed by jar name, not by plugin name
+      metaKeeper.removeJarNameAndMd5IfPossible(jarName);
+      metaKeeper.removePipePluginMeta(pluginName);
+      classLoaderManager.removePluginClassLoader(pluginName);
+
+      // remove plugin info from sqlite
+      PersistenceService.plugin()
+          .ifPresent(pluginPersistence -> pluginPersistence.tryDeletePlugin(pluginName));
+
+      final String successMessage = String.format("Successfully deregister Plugin %s", pluginName);
+      LOGGER.info(successMessage);
+      return Response.ok().entity(successMessage).build();
+    } catch (final IOException e) {
+      // builtin plugins return early above, so this failure always concerns a user plugin
+      final String errorMessage =
+          String.format("Failed to deregister Plugin %s, because %s", pluginName, e.getMessage());
+      LOGGER.warn(errorMessage);
+      return Response.serverError().entity(errorMessage).build();
+    }
   }
 
-  public boolean dropPlugin() {
-    return true;
+  /** Builds an OK response whose entity is every plugin meta held by the meta keeper. */
+  public Response showPlugin() {
+    return Response.ok().entity(metaKeeper.getAllPipePluginMeta()).build();
   }
 
   @Override
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java
index c64ef39..72bccb9 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.collector.runtime.plugin.constructor;
 
 import org.apache.iotdb.collector.runtime.plugin.meta.PluginMeta;
 import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper;
+import org.apache.iotdb.collector.service.RuntimeService;
 import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -28,7 +29,6 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Supplier;
@@ -81,13 +81,14 @@ public abstract class PluginConstructor {
       final Class<?> pluginClass =
           information.isBuiltin()
               ? 
pluginMetaKeeper.getBuiltinPluginClass(information.getPluginName())
-              : Class.forName(information.getClassName()); // TODO
+              : Class.forName(
+                  information.getClassName(),
+                  true,
+                  RuntimeService.plugin().isPresent()
+                      ? 
RuntimeService.plugin().get().getClassLoader(pluginName)
+                      : null);
       return (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
-    } catch (InstantiationException
-        | InvocationTargetException
-        | NoSuchMethodException
-        | IllegalAccessException
-        | ClassNotFoundException e) {
+    } catch (final Exception e) {
       String errorMessage =
           String.format(
               "Failed to reflect PipePlugin %s(%s) instance, because %s",
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java
index 066cf08..f5dd6be 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java
@@ -38,7 +38,7 @@ public class SinkConstructor extends PluginConstructor {
 
   @Override
   public final PipeSink reflectPlugin(PipeParameters sinkParameters) {
-    if (sinkParameters.hasAttribute("sink")) {
+    if (!sinkParameters.hasAttribute("sink")) {
       throw new IllegalArgumentException("sink attribute is required");
     }
 
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java
index f4c2cd0..2899862 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java
@@ -42,7 +42,7 @@ public class SourceConstructor extends PluginConstructor {
 
   @Override
   public final PipeSource reflectPlugin(PipeParameters sourceParameters) {
-    if (sourceParameters.hasAttribute("source")) {
+    if (!sourceParameters.hasAttribute("source")) {
       throw new IllegalArgumentException("source attribute is required");
     }
 
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java
new file mode 100644
index 0000000..7e93a1b
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.plugin.load;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A {@link URLClassLoader} whose search path consists of every non-directory file found by
+ * walking a plugin's library root directory at construction time.
+ */
+@ThreadSafe
+public class PluginClassLoader extends URLClassLoader {
+
+  private final String libRoot;
+  private volatile boolean deprecated = false;
+
+  /**
+   * @param libRoot directory that is walked recursively to collect the URLs of this loader
+   * @throws IOException if walking the directory or converting a path to a URL fails
+   */
+  public PluginClassLoader(final String libRoot) throws IOException {
+    super(new URL[0]);
+    this.libRoot = libRoot;
+    addUrls();
+  }
+
+  /** Adds every non-directory path under {@code libRoot} to this loader's URL list. */
+  private void addUrls() throws IOException {
+    final Path root = new File(libRoot).toPath();
+    try (final Stream<Path> entries = Files.walk(root)) {
+      // collect first so the walk is fully finished before any URL is added
+      for (final Path file :
+          entries.filter(entry -> !Files.isDirectory(entry)).collect(Collectors.toList())) {
+        super.addURL(file.toUri().toURL());
+      }
+    }
+  }
+
+  /** Flags this loader as deprecated and closes it right away. */
+  public synchronized void markAsDeprecated() throws IOException {
+    deprecated = true;
+    closeIfPossible();
+  }
+
+  /** Closes this loader, but only once it has been marked as deprecated. */
+  public void closeIfPossible() throws IOException {
+    if (!deprecated) {
+      return;
+    }
+    close();
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java
new file mode 100644
index 0000000..c60a868
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.plugin.load;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry that maps plugin names to their {@link PluginClassLoader}.
+ *
+ * <p>Plugin names are upper-cased before being used as map keys, so registration, lookup and
+ * removal all treat the name case-insensitively.
+ */
+@NotThreadSafe
+public class PluginClassLoaderManager {
+
+  private final Map<String, PluginClassLoader> pluginNameToClassLoaderMap;
+
+  public PluginClassLoaderManager() {
+    this.pluginNameToClassLoaderMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Unregisters the class loader of the given plugin, if any, and marks it as deprecated, which
+   * closes it.
+   *
+   * @param pluginName name of the plugin, in any case
+   * @throws IOException if closing the removed class loader fails
+   */
+  public void removePluginClassLoader(final String pluginName) throws IOException {
+    // must use the same normalized key as add/get, otherwise the entry is never found
+    final PluginClassLoader classLoader = pluginNameToClassLoaderMap.remove(toKey(pluginName));
+    if (classLoader != null) {
+      classLoader.markAsDeprecated();
+    }
+  }
+
+  /**
+   * @param pluginName name of the plugin, in any case
+   * @return the registered class loader, or {@code null} when none is registered
+   */
+  public PluginClassLoader getPluginClassLoader(final String pluginName) throws IOException {
+    return pluginNameToClassLoaderMap.get(toKey(pluginName));
+  }
+
+  /** Registers {@code classLoader} for the plugin, replacing any previous mapping. */
+  public void addPluginClassLoader(final String pluginName, final PluginClassLoader classLoader) {
+    pluginNameToClassLoaderMap.put(toKey(pluginName), classLoader);
+  }
+
+  /** Creates a new, unregistered class loader over the given plugin directory. */
+  public PluginClassLoader createPluginClassLoader(final String pluginDirPath) throws IOException {
+    return new PluginClassLoader(pluginDirPath);
+  }
+
+  /** Normalizes a plugin name into the key used by the map. */
+  private static String toKey(final String pluginName) {
+    return pluginName.toUpperCase();
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java
index 4d9ddfd..b6b1f90 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java
@@ -19,30 +19,32 @@
 
 package org.apache.iotdb.collector.runtime.plugin.meta;
 
+import org.apache.iotdb.collector.plugin.builtin.BuiltinPlugin;
+
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class PluginMetaKeeper {
 
-  protected final Map<String, PluginMeta> pipePluginNameToMetaMap = new 
ConcurrentHashMap<>();
-  protected final Map<String, Class<?>> builtinPipePluginNameToClassMap = new 
ConcurrentHashMap<>();
+  private final Map<String, PluginMeta> pipePluginNameToMetaMap = new 
ConcurrentHashMap<>();
+  private final Map<String, Class<?>> builtinPipePluginNameToClassMap = new 
ConcurrentHashMap<>();
+  private final Map<String, String> jarNameToMd5Map = new 
ConcurrentHashMap<>();
+  private final Map<String, Integer> jarNameToReferenceCountMap = new 
ConcurrentHashMap<>();
 
   public PluginMetaKeeper() {
     loadBuiltinPlugins();
   }
 
   private void loadBuiltinPlugins() {
-    //    for (final BuiltinPipePlugin builtinPipePlugin : 
BuiltinPipePlugin.values()) {
-    //      final String pipePluginName = 
builtinPipePlugin.getPipePluginName();
-    //      final Class<?> pipePluginClass = 
builtinPipePlugin.getPipePluginClass();
-    //      final String className = builtinPipePlugin.getClassName();
-    //
-    //      addPipePluginMeta(pipePluginName, new PluginMeta(pipePluginName, 
className));
-    //      addBuiltinPluginClass(pipePluginName, pipePluginClass);
-    //      addPipePluginVisibility(
-    //          pipePluginName, 
VisibilityUtils.calculateFromPluginClass(pipePluginClass));
-    //    }
+    for (final BuiltinPlugin builtinPlugin : BuiltinPlugin.values()) {
+      final String pluginName = builtinPlugin.getPluginName();
+      final Class<?> pluginClass = builtinPlugin.getPluginClass();
+      final String className = builtinPlugin.getClassName();
+
+      addPipePluginMeta(pluginName, new PluginMeta(pluginName, className));
+      addBuiltinPluginClass(pluginName, pluginClass);
+    }
   }
 
   public void addPipePluginMeta(String pluginName, PluginMeta pluginMeta) {
@@ -82,6 +84,35 @@ public class PluginMetaKeeper {
     return null;
   }
 
+  /** Returns whether {@code jarName} currently has an entry in the jar-name-to-MD5 map. */
+  public boolean containsJar(final String jarName) {
+    return jarNameToMd5Map.containsKey(jarName);
+  }
+
+  /**
+   * Checks whether a jar is known and its recorded MD5 equals the given one.
+   *
+   * @param jarName jar file name to look up
+   * @param md5 MD5 to compare with the recorded one
+   * @return {@code true} only if {@code jarName} has a recorded MD5 and it equals {@code md5}
+   */
+  public boolean jarNameExistsAndMatchesMd5(final String jarName, final String md5) {
+    // a single lookup: a separate containsKey + get pair could see the entry disappear in between
+    final String recordedMd5 = jarNameToMd5Map.get(jarName);
+    return recordedMd5 != null && recordedMd5.equals(md5);
+  }
+
+  /**
+   * Records one more reference to {@code jarName}. The MD5 is stored only when the jar is seen for
+   * the first time; for an already known jar only the reference count grows.
+   */
+  public void addJarNameAndMd5(final String jarName, final String md5) {
+    final Integer references = jarNameToReferenceCountMap.get(jarName);
+    if (references == null) {
+      // first reference: start counting and remember the md5
+      jarNameToReferenceCountMap.put(jarName, 1);
+      jarNameToMd5Map.put(jarName, md5);
+      return;
+    }
+    jarNameToReferenceCountMap.put(jarName, references + 1);
+  }
+
+  /**
+   * Drops one reference to {@code jarName}. When the last reference goes away, both the count and
+   * the recorded MD5 are removed; an unknown jar name is ignored.
+   */
+  public void removeJarNameAndMd5IfPossible(final String jarName) {
+    final Integer references = jarNameToReferenceCountMap.get(jarName);
+    if (references == null) {
+      return;
+    }
+    if (references != 1) {
+      // still referenced elsewhere: only decrement
+      jarNameToReferenceCountMap.put(jarName, references - 1);
+      return;
+    }
+    jarNameToReferenceCountMap.remove(jarName);
+    jarNameToMd5Map.remove(jarName);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java
new file mode 100644
index 0000000..ebc693d
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.plugin.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+
+import static 
org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_INSTALL_LIB_DIR;
+import static 
org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_LIB_DIR;
+
+/**
+ * Static helpers that build the paths of plugin jars and install, locate and delete them.
+ *
+ * <p>Uploaded jars are looked up under {@code PLUGIN_LIB_DIR}; installed jars live under
+ * {@code PLUGIN_INSTALL_LIB_DIR/<pluginName>/}.
+ */
+public class PluginFileUtils {
+
+  private PluginFileUtils() {
+    // static utility class, not meant to be instantiated
+  }
+
+  /**
+   * Moves a jar from the plugin lib dir into the plugin's install directory, creating that
+   * directory when it is missing. Does nothing if the target file already exists.
+   *
+   * @param pluginName name of the plugin, used as the install sub-directory name
+   * @param jarName file name of the jar inside the plugin lib dir
+   * @param jarNameWithMD5 file name the jar gets inside the install directory
+   * @throws IOException if the directory cannot be created or the file cannot be moved
+   */
+  public static void savePluginToInstallDir(
+      final String pluginName, final String jarName, final String jarNameWithMD5)
+      throws IOException {
+    final Path pluginInstallPath = Paths.get(getPluginInstallDirPath(pluginName));
+    final Path pluginJarInstallPath =
+        Paths.get(getPluginJarFileWithMD5FilePath(pluginName, jarNameWithMD5));
+
+    if (!Files.exists(pluginInstallPath)) {
+      FileUtils.forceMkdir(pluginInstallPath.toFile());
+    }
+    if (Files.exists(pluginJarInstallPath)) {
+      return;
+    }
+
+    FileUtils.moveFile(
+        new File(getPluginJarFilePath(jarName)),
+        pluginJarInstallPath.toFile(),
+        StandardCopyOption.REPLACE_EXISTING);
+  }
+
+  /** Returns whether a file named {@code jarName} exists in the plugin lib dir. */
+  public static boolean isPluginJarFileExist(final String jarName) {
+    return Files.exists(Paths.get(getPluginJarFilePath(jarName)));
+  }
+
+  /** Returns the path of {@code jarName} inside the plugin lib dir. */
+  public static String getPluginJarFilePath(final String jarName) {
+    return PLUGIN_LIB_DIR.value() + File.separator + jarName;
+  }
+
+  /** Returns the path of the MD5-suffixed jar inside the plugin's install directory. */
+  public static String getPluginJarFileWithMD5FilePath(
+      final String pluginName, final String jarNameWithMD5) {
+    return getPluginInstallDirPath(pluginName) + File.separator + jarNameWithMD5;
+  }
+
+  /** Returns the install directory of the given plugin. */
+  public static String getPluginInstallDirPath(final String pluginName) {
+    return PLUGIN_INSTALL_LIB_DIR.value() + File.separator + pluginName;
+  }
+
+  /** Builds {@code <baseName>-<md5>.<extension>} from the given jar file name. */
+  public static String getPluginJarFileNameWithMD5(final String jarName, final String jarMD5) {
+    return FilenameUtils.getBaseName(jarName)
+        + "-"
+        + jarMD5
+        + "."
+        + FilenameUtils.getExtension(jarName);
+  }
+
+  /**
+   * Deletes the given file from the plugin's install directory and then the directory itself.
+   * Neither deletion fails when its target is already absent.
+   *
+   * @throws IOException if a deletion fails, e.g. because the directory still holds other files
+   */
+  public static void removePluginFileUnderLibRoot(final String pluginName, final String fileName)
+      throws IOException {
+    final Path pluginFilePath = Paths.get(getPluginInstallFilePath(pluginName, fileName));
+
+    Files.deleteIfExists(pluginFilePath);
+    Files.deleteIfExists(pluginFilePath.getParent());
+  }
+
+  /** Returns the path of {@code fileName} inside the plugin's install directory. */
+  public static String getPluginInstallFilePath(final String pluginName, final String fileName) {
+    return getPluginInstallDirPath(pluginName) + File.separator + fileName;
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java
index ac270ce..2dbcc71 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java
@@ -19,22 +19,18 @@
 
 package org.apache.iotdb.collector.runtime.task;
 
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.collector.plugin.api.customizer.CollectorParameters;
 
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.LockSupport;
 
 public abstract class Task {
 
   protected final String taskId;
-  protected final PipeParameters parameters;
+  protected final CollectorParameters parameters;
 
   protected final int parallelism;
 
-  private static final long CHECK_RUNNING_INTERVAL_NANOS = 100_000_000L;
-  protected final AtomicBoolean isRunning = new AtomicBoolean(false);
-  protected final AtomicBoolean isDropped = new AtomicBoolean(false);
+  protected final TaskDispatch dispatch;
 
   protected Task(
       final String taskId,
@@ -42,49 +38,36 @@ public abstract class Task {
       final String parallelismKey,
       final int parallelismValue) {
     this.taskId = taskId;
-    this.parameters = new PipeParameters(attributes);
+    this.parameters = new CollectorParameters(attributes);
 
     this.parallelism = parameters.getIntOrDefault(parallelismKey, 
parallelismValue);
-  }
-
-  public void resume() {
-    isRunning.set(true);
-  }
-
-  public void pause() {
-    isRunning.set(false);
-  }
 
-  protected void waitUntilRunningOrDropped() {
-    while (!isRunning.get() && !isDropped.get()) {
-      LockSupport.parkNanos(CHECK_RUNNING_INTERVAL_NANOS);
-    }
+    this.dispatch = new TaskDispatch();
   }
 
   public final synchronized void create() throws Exception {
-    resume();
+    dispatch.resume();
     createInternal();
   }
 
   public abstract void createInternal() throws Exception;
 
   public final synchronized void start() throws Exception {
-    resume();
+    dispatch.resume();
     startInternal();
   }
 
   public abstract void startInternal() throws Exception;
 
   public final synchronized void stop() throws Exception {
-    pause();
+    dispatch.pause();
     stopInternal();
   }
 
   public abstract void stopInternal() throws Exception;
 
   public final synchronized void drop() throws Exception {
-    pause();
-    isDropped.set(true);
+    dispatch.remove();
     dropInternal();
   }
 
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java
similarity index 50%
copy from 
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
copy to 
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java
index 36adce3..9a904c0 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java
@@ -17,6 +17,37 @@
  * under the License.
  */
 
-package org.apache.iotdb.collector.api.v1.plugin.impl;
+package org.apache.iotdb.collector.runtime.task;
 
-public class PluginApiServiceRequestValidationHandler {}
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+
+public class TaskDispatch {
+
+  private static final long CHECK_RUNNING_INTERVAL_NANOS = 100_000_000L;
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
+  private final AtomicBoolean isDropped = new AtomicBoolean(false);
+
+  public void resume() {
+    isRunning.set(true);
+  }
+
+  public void pause() {
+    isRunning.set(false);
+  }
+
+  public void remove() {
+    pause();
+    isDropped.set(true);
+  }
+
+  public boolean isRunning() {
+    return isRunning.get();
+  }
+
+  public void waitUntilRunningOrDropped() {
+    while (!isRunning.get() && !isDropped.get()) {
+      LockSupport.parkNanos(CHECK_RUNNING_INTERVAL_NANOS);
+    }
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
index ec3a4f1..f96a40f 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.collector.runtime.task;
 import org.apache.iotdb.collector.runtime.task.processor.ProcessorTask;
 import org.apache.iotdb.collector.runtime.task.sink.SinkTask;
 import org.apache.iotdb.collector.runtime.task.source.SourceTask;
+import org.apache.iotdb.collector.service.PersistenceService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import javax.ws.rs.core.Response;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class TaskRuntime implements AutoCloseable {
@@ -39,9 +41,11 @@ public class TaskRuntime implements AutoCloseable {
 
   public synchronized Response createTask(
       final String taskId,
+      final TaskStateEnum taskState,
       final Map<String, String> sourceAttribute,
       final Map<String, String> processorAttribute,
-      final Map<String, String> sinkAttribute) {
+      final Map<String, String> sinkAttribute,
+      final boolean isRestRequest) {
     try {
       if (tasks.containsKey(taskId)) {
         return Response.status(Response.Status.CONFLICT)
@@ -53,14 +57,28 @@ public class TaskRuntime implements AutoCloseable {
       final ProcessorTask processorTask =
           new ProcessorTask(taskId, processorAttribute, 
sinkTask.makeProducer());
       final SourceTask sourceTask =
-          SourceTask.construct(taskId, sourceAttribute, 
processorTask.makeProducer());
+          SourceTask.construct(taskId, sourceAttribute, 
processorTask.makeProducer(), taskState);
 
       final TaskCombiner taskCombiner = new TaskCombiner(sourceTask, 
processorTask, sinkTask);
-      tasks.put(taskId, taskCombiner);
       taskCombiner.create();
 
+      tasks.put(taskId, taskCombiner);
+
+      // persist task info to sqlite
+      if (isRestRequest) {
+        PersistenceService.task()
+            .ifPresent(
+                taskPersistence ->
+                    taskPersistence.tryPersistenceTask(
+                        taskId,
+                        TaskStateEnum.RUNNING,
+                        sourceAttribute,
+                        processorAttribute,
+                        sinkAttribute));
+      }
+
       LOGGER.info("Successfully created task {}", taskId);
-      return Response.status(Response.Status.CREATED)
+      return Response.status(Response.Status.OK)
           .entity(String.format("Successfully created task %s", taskId))
           .build();
     } catch (final Exception e) {
@@ -81,6 +99,10 @@ public class TaskRuntime implements AutoCloseable {
     try {
       tasks.get(taskId).start();
 
+      PersistenceService.task()
+          .ifPresent(
+              taskPersistence -> taskPersistence.tryAlterTaskState(taskId, 
TaskStateEnum.RUNNING));
+
       LOGGER.info("Task {} start successfully", taskId);
       return Response.status(Response.Status.OK)
           .entity(String.format("task %s start successfully", taskId))
@@ -103,6 +125,10 @@ public class TaskRuntime implements AutoCloseable {
     try {
       tasks.get(taskId).stop();
 
+      PersistenceService.task()
+          .ifPresent(
+              taskPersistence -> taskPersistence.tryAlterTaskState(taskId, 
TaskStateEnum.STOPPED));
+
       LOGGER.info("Task {} stop successfully", taskId);
       return Response.status(Response.Status.OK)
           .entity(String.format("task %s stop successfully", taskId))
@@ -116,14 +142,19 @@ public class TaskRuntime implements AutoCloseable {
   }
 
   public synchronized Response dropTask(final String taskId) {
-    if (!tasks.containsKey(taskId)) {
+    if (Objects.isNull(tasks.get(taskId)) || !tasks.containsKey(taskId)) {
       return Response.status(Response.Status.NOT_FOUND)
           .entity(String.format("task %s not found", taskId))
           .build();
     }
 
     try {
-      tasks.remove(taskId).drop();
+      final TaskCombiner task = tasks.get(taskId);
+      task.drop();
+      tasks.remove(taskId);
+
+      // remove task info from sqlite
+      PersistenceService.task().ifPresent(taskPersistence -> 
taskPersistence.tryDeleteTask(taskId));
 
       LOGGER.info("Task {} drop successfully", taskId);
       return Response.status(Response.Status.OK)
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java
similarity index 74%
copy from 
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
copy to 
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java
index 36adce3..159016d 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java
@@ -16,7 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.collector.runtime.task;
 
-package org.apache.iotdb.collector.api.v1.plugin.impl;
+public enum TaskStateEnum {
+  RUNNING(0),
+  STOPPED(1);
 
-public class PluginApiServiceRequestValidationHandler {}
+  private final int taskState;
+
+  TaskStateEnum(final int taskState) {
+    this.taskState = taskState;
+  }
+
+  public int getTaskState() {
+    return taskState;
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java
index c153506..9990871 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.collector.runtime.task.processor;
 
+import org.apache.iotdb.collector.runtime.task.TaskDispatch;
 import org.apache.iotdb.collector.runtime.task.event.EventContainer;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -33,6 +34,8 @@ class ProcessorConsumer implements 
WorkHandler<EventContainer> {
   private final PipeProcessor processor;
   private final EventCollector eventCollector;
 
+  private TaskDispatch dispatch;
+
   ProcessorConsumer(final PipeProcessor processor, final EventCollector 
eventCollector) {
     this.processor = processor;
     this.eventCollector = eventCollector;
@@ -44,6 +47,8 @@ class ProcessorConsumer implements 
WorkHandler<EventContainer> {
 
   @Override
   public void onEvent(final EventContainer eventContainer) throws Exception {
+    dispatch.waitUntilRunningOrDropped();
+
     // TODO: retry strategy
     final Event event = eventContainer.getEvent();
     if (event instanceof TabletInsertionEvent) {
@@ -54,4 +59,8 @@ class ProcessorConsumer implements 
WorkHandler<EventContainer> {
       processor.process(event, eventCollector);
     }
   }
+
+  public void setDispatch(final TaskDispatch dispatch) {
+    this.dispatch = dispatch;
+  }
 }
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java
index e671ea0..37fbf4e 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java
@@ -99,6 +99,7 @@ public class ProcessorTask extends Task {
     for (int i = 0; i < parallelism; i++) {
       processorConsumers[i] =
           new ProcessorConsumer(pluginRuntime.constructProcessor(parameters), 
sinkProducer);
+      processorConsumers[i].setDispatch(dispatch);
       try {
         processorConsumers[i].consumer().validate(new 
PipeParameterValidator(parameters));
         processorConsumers[i]
@@ -118,16 +119,18 @@ public class ProcessorTask extends Task {
     disruptor.handleEventsWithWorkerPool(processorConsumers);
 
     disruptor.setDefaultExceptionHandler(new ProcessorExceptionHandler());
+
+    disruptor.start();
   }
 
   @Override
   public void startInternal() {
-    disruptor.start();
+    // do nothing
   }
 
   @Override
   public void stopInternal() {
-    disruptor.halt();
+    // do nothing
   }
 
   @Override
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java
index 2fe13f0..c17691e 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.collector.runtime.task.sink;
 
+import org.apache.iotdb.collector.runtime.task.TaskDispatch;
 import org.apache.iotdb.collector.runtime.task.event.EventContainer;
 import org.apache.iotdb.pipe.api.PipeSink;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -31,6 +32,8 @@ class SinkConsumer implements WorkHandler<EventContainer> {
 
   private final PipeSink sink;
 
+  private TaskDispatch dispatch;
+
   SinkConsumer(final PipeSink sink) {
     this.sink = sink;
   }
@@ -41,6 +44,8 @@ class SinkConsumer implements WorkHandler<EventContainer> {
 
   @Override
   public void onEvent(EventContainer eventContainer) throws Exception {
+    dispatch.waitUntilRunningOrDropped();
+
     // TODO: retry strategy
     final Event event = eventContainer.getEvent();
     if (event instanceof TabletInsertionEvent) {
@@ -51,4 +56,8 @@ class SinkConsumer implements WorkHandler<EventContainer> {
       sink.transfer(event);
     }
   }
+
+  public void setDispatch(final TaskDispatch dispatch) {
+    this.dispatch = dispatch;
+  }
 }
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java
index e3e66db..d540b92 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java
@@ -89,6 +89,7 @@ public class SinkTask extends Task {
     consumers = new SinkConsumer[parallelism];
     for (int i = 0; i < parallelism; i++) {
       consumers[i] = new SinkConsumer(pluginRuntime.constructSink(parameters));
+      consumers[i].setDispatch(dispatch);
       try {
         consumers[i].consumer().validate(new 
PipeParameterValidator(parameters));
         consumers[i]
@@ -109,16 +110,18 @@ public class SinkTask extends Task {
     disruptor.handleEventsWithWorkerPool(consumers);
 
     disruptor.setDefaultExceptionHandler(new SinkExceptionHandler());
+
+    disruptor.start();
   }
 
   @Override
   public void startInternal() {
-    disruptor.start();
+    // do nothing
   }
 
   @Override
   public void stopInternal() {
-    disruptor.halt();
+    // do nothing
   }
 
   @Override
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java
index ca0c8c4..2cf5f67 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.collector.runtime.task.source;
 
 import org.apache.iotdb.collector.runtime.plugin.PluginRuntime;
 import org.apache.iotdb.collector.runtime.task.Task;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
 import org.apache.iotdb.collector.runtime.task.event.EventCollector;
 import org.apache.iotdb.collector.runtime.task.source.pull.PullSourceTask;
 import org.apache.iotdb.collector.runtime.task.source.push.PushSourceTask;
@@ -34,20 +35,24 @@ import static 
org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_SOURCE_P
 public abstract class SourceTask extends Task {
 
   protected final EventCollector processorProducer;
+  protected TaskStateEnum taskState;
 
   protected SourceTask(
       final String taskId,
       final Map<String, String> attributes,
-      final EventCollector processorProducer) {
+      final EventCollector processorProducer,
+      final TaskStateEnum taskState) {
     super(
         taskId, attributes, TASK_SOURCE_PARALLELISM_NUM.key(), 
TASK_SOURCE_PARALLELISM_NUM.value());
     this.processorProducer = processorProducer;
+    this.taskState = taskState;
   }
 
   public static SourceTask construct(
       final String taskId,
       final Map<String, String> attributes,
-      final EventCollector processorProducer)
+      final EventCollector processorProducer,
+      final TaskStateEnum taskState)
       throws Exception {
     final PluginRuntime pluginRuntime =
         RuntimeService.plugin().isPresent() ? RuntimeService.plugin().get() : 
null;
@@ -57,10 +62,10 @@ public abstract class SourceTask extends Task {
 
     final PipeParameters parameters = new PipeParameters(attributes);
     if (pluginRuntime.isPullSource(parameters)) {
-      return new PullSourceTask(taskId, attributes, processorProducer);
+      return new PullSourceTask(taskId, attributes, processorProducer, 
taskState);
     }
     if (pluginRuntime.isPushSource(parameters)) {
-      return new PushSourceTask(taskId, attributes, processorProducer);
+      return new PushSourceTask(taskId, attributes, processorProducer, 
taskState);
     }
     throw new IllegalArgumentException("Unsupported source type");
   }
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java
index b47054a..d426526 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.collector.runtime.task.source.pull;
 import org.apache.iotdb.collector.plugin.api.PullSource;
 import 
org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration;
 import org.apache.iotdb.collector.runtime.plugin.PluginRuntime;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
 import org.apache.iotdb.collector.runtime.task.event.EventCollector;
 import org.apache.iotdb.collector.runtime.task.source.SourceTask;
 import org.apache.iotdb.collector.service.RuntimeService;
@@ -50,8 +51,9 @@ public class PullSourceTask extends SourceTask {
   public PullSourceTask(
       final String taskId,
       final Map<String, String> attributes,
-      final EventCollector processorProducer) {
-    super(taskId, attributes, processorProducer);
+      final EventCollector processorProducer,
+      final TaskStateEnum taskState) {
+    super(taskId, attributes, processorProducer, taskState);
   }
 
   @Override
@@ -101,14 +103,14 @@ public class PullSourceTask extends SourceTask {
           .get(taskId)
           .submit(
               () -> {
-                while (!isDropped.get()) {
+                while (dispatch.isRunning() && 
TaskStateEnum.RUNNING.equals(taskState)) {
                   try {
                     consumers[finalI].onScheduler();
                   } catch (final Exception e) {
                     LOGGER.warn("Failed to pull source", e);
                   }
 
-                  waitUntilRunningOrDropped();
+                  dispatch.waitUntilRunningOrDropped();
                 }
               });
     }
@@ -116,12 +118,12 @@ public class PullSourceTask extends SourceTask {
 
   @Override
   public void startInternal() {
-    // do nothing
+    this.taskState = TaskStateEnum.RUNNING;
   }
 
   @Override
   public void stopInternal() {
-    // do nothing
+    this.taskState = TaskStateEnum.STOPPED;
   }
 
   @Override
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java
index 938f29f..7419ae7 100644
--- 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.collector.runtime.task.source.push;
 import org.apache.iotdb.collector.plugin.api.PushSource;
 import 
org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration;
 import org.apache.iotdb.collector.runtime.plugin.PluginRuntime;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
 import org.apache.iotdb.collector.runtime.task.event.EventCollector;
 import org.apache.iotdb.collector.runtime.task.source.SourceTask;
 import org.apache.iotdb.collector.service.RuntimeService;
@@ -41,8 +42,9 @@ public class PushSourceTask extends SourceTask {
   public PushSourceTask(
       final String taskId,
       final Map<String, String> sourceParams,
-      final EventCollector processorProducer) {
-    super(taskId, sourceParams, processorProducer);
+      final EventCollector processorProducer,
+      final TaskStateEnum taskState) {
+    super(taskId, sourceParams, processorProducer, taskState);
   }
 
   @Override
@@ -64,7 +66,9 @@ public class PushSourceTask extends SourceTask {
         pushSources[i].customize(
             parameters,
             new CollectorSourceRuntimeConfiguration(taskId, creationTime, 
parallelism, i));
-        pushSources[i].start();
+        if (TaskStateEnum.RUNNING.equals(taskState)) {
+          pushSources[i].start();
+        }
       } catch (final Exception e) {
         try {
           pushSources[i].close();
@@ -78,12 +82,42 @@ public class PushSourceTask extends SourceTask {
 
   @Override
   public void startInternal() {
-    // do nothing
+    if (this.taskState.equals(TaskStateEnum.RUNNING)) {
+      return;
+    }
+
+    if (pushSources != null) {
+      for (int i = 0; i < parallelism; i++) {
+        try {
+          pushSources[i].start();
+        } catch (final Exception e) {
+          LOGGER.warn("Failed to restart push source", e);
+          return;
+        }
+      }
+    }
+
+    this.taskState = TaskStateEnum.RUNNING;
   }
 
   @Override
   public void stopInternal() {
-    // do nothing
+    if (this.taskState.equals(TaskStateEnum.STOPPED)) {
+      return;
+    }
+
+    if (pushSources != null) {
+      for (int i = 0; i < parallelism; i++) {
+        try {
+          pushSources[i].close();
+        } catch (final Exception e) {
+          LOGGER.warn("Failed to stop source", e);
+          return;
+        }
+      }
+
+      this.taskState = TaskStateEnum.STOPPED;
+    }
   }
 
   @Override
diff --git 
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java
 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java
new file mode 100644
index 0000000..a5eb42e
--- /dev/null
+++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.service;
+
+import org.apache.iotdb.collector.persistence.DBConstant;
+import org.apache.iotdb.collector.persistence.PluginPersistence;
+import org.apache.iotdb.collector.persistence.TaskPersistence;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_INSTALL_LIB_DIR;
+
+public class PersistenceService implements IService {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PersistenceService.class);
+
+  private static final AtomicReference<PluginPersistence> PLUGIN = new 
AtomicReference<>();
+  private static final AtomicReference<TaskPersistence> TASK = new 
AtomicReference<>();
+
+  @Override
+  public void start() {
+    initPluginDir();
+
+    PLUGIN.set(new PluginPersistence(DBConstant.PLUGIN_DATABASE_URL));
+    TASK.set(new TaskPersistence(DBConstant.TASK_DATABASE_URL));
+
+    PLUGIN.get().tryResume();
+    TASK.get().tryResume();
+  }
+
+  private void initPluginDir() {
+    final Path pluginInstallPath = Paths.get(PLUGIN_INSTALL_LIB_DIR.value());
+    try {
+      if (!Files.exists(pluginInstallPath)) {
+        FileUtils.forceMkdir(pluginInstallPath.toFile());
+      }
+    } catch (final IOException e) {
+      LOGGER.warn("Failed to create plugin install directory", e);
+    }
+  }
+
+  public static Optional<PluginPersistence> plugin() {
+    return Optional.ofNullable(PLUGIN.get()); // null before start() and after stop()
+  }
+
+  public static Optional<TaskPersistence> task() {
+    return Optional.ofNullable(TASK.get()); // null before start() and after stop()
+  }
+
+  @Override
+  public void stop() {
+    PLUGIN.set(null);
+    TASK.set(null);
+  }
+
+  @Override
+  public String name() {
+    return "PersistenceService";
+  }
+}
diff --git 
a/iotdb-collector/collector-core/src/main/resources/application.properties 
b/iotdb-collector/collector-core/src/main/resources/application.properties
index 244ed55..925ef65 100644
--- a/iotdb-collector/collector-core/src/main/resources/application.properties
+++ b/iotdb-collector/collector-core/src/main/resources/application.properties
@@ -54,3 +54,17 @@ task_processor_ring_buffer_size=1024
 # Effective mode: on every start
 # Data type: int
 task_sink_ring_buffer_size=1024
+
+####################
+### Plugin Configuration
+####################
+
+# The directory containing plugin jar files
+# Effective mode: on every start
+# Data type: string
+plugin_lib_dir=ext/plugin
+
+# The directory where plugin jar files are installed
+# Effective mode: on every start
+# Data type: string
+plugin_install_lib_dir=ext/plugin/install
\ No newline at end of file
diff --git a/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml 
b/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml
index 25ea5b5..eec0e5f 100644
--- a/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml
+++ b/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml
@@ -41,50 +41,21 @@ paths:
         "200":
           $ref: '#/components/responses/SuccessExecutionStatus'
 
-  /plugin/v1/alter:
-    post:
-      operationId: alterPlugin
-      requestBody:
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/AlterPluginRequest'
-      responses:
-        "200":
-          $ref: '#/components/responses/SuccessExecutionStatus'
-
-  /plugin/v1/start:
-    post:
-      operationId: startPlugin
-      requestBody:
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/StartPluginRequest'
-      responses:
-        "200":
-          $ref: '#/components/responses/SuccessExecutionStatus'
-
-  /plugin/v1/stop:
+  /plugin/v1/drop:
     post:
-      operationId: stopPlugin
+      operationId: dropPlugin
       requestBody:
         content:
           application/json:
             schema:
-              $ref: '#/components/schemas/StopPluginRequest'
+              $ref: '#/components/schemas/DropPluginRequest'
       responses:
         "200":
           $ref: '#/components/responses/SuccessExecutionStatus'
 
-  /plugin/v1/drop:
+  /plugin/v1/show:
     post:
-      operationId: dropPlugin
-      requestBody:
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/DropPluginRequest'
+      operationId: showPlugin
       responses:
         "200":
           $ref: '#/components/responses/SuccessExecutionStatus'
@@ -93,51 +64,16 @@ components:
   schemas:
     CreatePluginRequest:
       properties:
-        sourceAttribute:
-          type: object
-          additionalProperties:
-            type: string
-        processorAttribute:
-          type: object
-          additionalProperties:
-            type: string
-        sinkAttribute:
-          type: object
-          additionalProperties:
-            type: string
-        taskId:
-          type: string
-
-    AlterPluginRequest:
-      properties:
-        sourceAttribute:
-          type: object
-          additionalProperties:
-            type: string
-        processorAttribute:
-          type: object
-          additionalProperties:
-            type: string
-        sinkAttribute:
-          type: object
-          additionalProperties:
-            type: string
-        taskId:
+        pluginName:
           type: string
-
-    StartPluginRequest:
-      properties:
-        taskId:
+        className:
           type: string
-
-    StopPluginRequest:
-      properties:
-        taskId:
+        jarName:
           type: string
 
     DropPluginRequest:
       properties:
-        taskId:
+        pluginName:
           type: string
 
     ExecutionStatus:
diff --git a/pom.xml b/pom.xml
index e19571c..41af70a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -171,6 +171,7 @@
         <spring-boot.version>2.7.18</spring-boot.version>
         <!-- This is the last version to support the javax namespace -->
         <spring.version>5.3.39</spring.version>
+        <sqlite.version>3.49.1.0</sqlite.version>
         <!-- This was the last version to support Java 8 -->
         <swagger.version>1.6.14</swagger.version>
         <thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
@@ -862,6 +863,11 @@
                 <artifactId>disruptor</artifactId>
                 <version>${disruptor.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.xerial</groupId>
+                <artifactId>sqlite-jdbc</artifactId>
+                <version>${sqlite.version}</version>
+            </dependency>
             <!-- Conflict:
         json-smart (pulls in 9.3),
         cglib (pulls in 7.1)


Reply via email to