This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git
The following commit(s) were added to refs/heads/master by this push:
new e8c86d7 Collector: Fixed task not being able to be dropped when the
task is stopped & Modify PipeParameters & Supports registration of plugins and
reflection usage & Add SQLite support for persistence of plugin and task meta.
(#57)
e8c86d7 is described below
commit e8c86d719706d7a287f0b3f4e8bf3d6ecbea25d1
Author: 0xB <[email protected]>
AuthorDate: Mon Apr 7 10:11:49 2025 +0800
Collector: Fixed task not being able to be dropped when the task is stopped
& Modify PipeParameters & Supports registration of plugins and reflection usage
& Add SQLite support for persistence of plugin and task meta. (#57)
---
iotdb-collector/collector-core/pom.xml | 4 +
.../org/apache/iotdb/collector/Application.java | 2 +
.../api/v1/plugin/impl/PluginApiServiceImpl.java | 42 +++--
.../PluginApiServiceRequestValidationHandler.java | 19 +-
.../api/v1/task/impl/TaskApiServiceImpl.java | 5 +-
.../org/apache/iotdb/collector/config/Options.java | 1 +
.../PluginRuntimeOptions.java} | 23 ++-
.../iotdb/collector/persistence/DBConstant.java | 49 +++++
.../Persistence.java} | 27 ++-
.../collector/persistence/PluginPersistence.java | 152 +++++++++++++++
.../collector/persistence/TaskPersistence.java | 206 +++++++++++++++++++++
.../api/customizer/CollectorParameters.java} | 18 +-
.../plugin/builtin/source/HttpPullSource.java | 2 +-
.../plugin/builtin/source/HttpPushSource.java | 2 +-
.../collector/runtime/plugin/PluginRuntime.java | 161 ++++++++++++++--
.../plugin/constructor/PluginConstructor.java | 15 +-
.../plugin/constructor/SinkConstructor.java | 2 +-
.../plugin/constructor/SourceConstructor.java | 2 +-
.../runtime/plugin/load/PluginClassLoader.java | 65 +++++++
.../plugin/load/PluginClassLoaderManager.java | 55 ++++++
.../runtime/plugin/meta/PluginMetaKeeper.java | 55 ++++--
.../runtime/plugin/utils/PluginFileUtils.java | 94 ++++++++++
.../apache/iotdb/collector/runtime/task/Task.java | 35 +---
.../task/TaskDispatch.java} | 35 +++-
.../iotdb/collector/runtime/task/TaskRuntime.java | 43 ++++-
.../task/TaskStateEnum.java} | 16 +-
.../runtime/task/processor/ProcessorConsumer.java | 9 +
.../runtime/task/processor/ProcessorTask.java | 7 +-
.../collector/runtime/task/sink/SinkConsumer.java | 9 +
.../collector/runtime/task/sink/SinkTask.java | 7 +-
.../collector/runtime/task/source/SourceTask.java | 13 +-
.../runtime/task/source/pull/PullSourceTask.java | 14 +-
.../runtime/task/source/push/PushSourceTask.java | 44 ++++-
.../collector/service/PersistenceService.java | 86 +++++++++
.../src/main/resources/application.properties | 14 ++
.../src/main/openapi3/v1/plugin.yaml | 82 +-------
pom.xml | 6 +
37 files changed, 1230 insertions(+), 191 deletions(-)
diff --git a/iotdb-collector/collector-core/pom.xml
b/iotdb-collector/collector-core/pom.xml
index 7b68e68..c6517b8 100644
--- a/iotdb-collector/collector-core/pom.xml
+++ b/iotdb-collector/collector-core/pom.xml
@@ -107,6 +107,10 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.xerial</groupId>
+ <artifactId>sqlite-jdbc</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
index 96ecdaf..650caee 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.collector;
import org.apache.iotdb.collector.config.Configuration;
import org.apache.iotdb.collector.service.ApiService;
import org.apache.iotdb.collector.service.IService;
+import org.apache.iotdb.collector.service.PersistenceService;
import org.apache.iotdb.collector.service.RuntimeService;
import org.slf4j.Logger;
@@ -39,6 +40,7 @@ public class Application {
private Application() {
services.add(new RuntimeService());
services.add(new ApiService());
+ services.add(new PersistenceService());
}
public static void main(String[] args) {
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
index b64f017..041dde1 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
@@ -20,11 +20,9 @@
package org.apache.iotdb.collector.api.v1.plugin.impl;
import org.apache.iotdb.collector.api.v1.plugin.PluginApiService;
-import org.apache.iotdb.collector.api.v1.plugin.model.AlterPluginRequest;
import org.apache.iotdb.collector.api.v1.plugin.model.CreatePluginRequest;
import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest;
-import org.apache.iotdb.collector.api.v1.plugin.model.StartPluginRequest;
-import org.apache.iotdb.collector.api.v1.plugin.model.StopPluginRequest;
+import org.apache.iotdb.collector.service.RuntimeService;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
@@ -34,30 +32,34 @@ public class PluginApiServiceImpl extends PluginApiService {
@Override
public Response createPlugin(
final CreatePluginRequest createPluginRequest, final SecurityContext
securityContext) {
- return Response.ok("create plugin").build();
- }
+
PluginApiServiceRequestValidationHandler.validateCreatePluginRequest(createPluginRequest);
- @Override
- public Response alterPlugin(
- final AlterPluginRequest alterPluginRequest, final SecurityContext
securityContext) {
- return Response.ok("alter plugin").build();
+ return RuntimeService.plugin().isPresent()
+ ? RuntimeService.plugin()
+ .get()
+ .createPlugin(
+ createPluginRequest.getPluginName().toUpperCase(),
+ createPluginRequest.getClassName(),
+ createPluginRequest.getJarName(),
+ null,
+ true)
+ : Response.ok("create plugin").build();
}
@Override
- public Response startPlugin(
- final StartPluginRequest startPluginRequest, final SecurityContext
securityContext) {
- return Response.ok("start plugin").build();
- }
+ public Response dropPlugin(
+ final DropPluginRequest dropPluginRequest, final SecurityContext
securityContext) {
+
PluginApiServiceRequestValidationHandler.validateDropPluginRequest(dropPluginRequest);
- @Override
- public Response stopPlugin(
- final StopPluginRequest stopPluginRequest, final SecurityContext
securityContext) {
- return Response.ok("stop plugin").build();
+ return RuntimeService.plugin().isPresent()
+ ?
RuntimeService.plugin().get().dropPlugin(dropPluginRequest.getPluginName().toUpperCase())
+ : Response.ok("drop plugin").build();
}
@Override
- public Response dropPlugin(
- final DropPluginRequest dropPluginRequest, final SecurityContext
securityContext) {
- return Response.ok("drop plugin").build();
+ public Response showPlugin(final SecurityContext securityContext) {
+ return RuntimeService.plugin().isPresent()
+ ? RuntimeService.plugin().get().showPlugin()
+ : Response.ok("show plugin").build();
}
}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
index 36adce3..6f7d983 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
@@ -19,4 +19,21 @@
package org.apache.iotdb.collector.api.v1.plugin.impl;
-public class PluginApiServiceRequestValidationHandler {}
+import org.apache.iotdb.collector.api.v1.plugin.model.CreatePluginRequest;
+import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest;
+
+import java.util.Objects;
+
+public class PluginApiServiceRequestValidationHandler {
+ private PluginApiServiceRequestValidationHandler() {}
+
+ public static void validateCreatePluginRequest(final CreatePluginRequest
createPluginRequest) {
+ Objects.requireNonNull(createPluginRequest.getPluginName(), "plugin name cannot be null");
+ Objects.requireNonNull(createPluginRequest.getClassName(), "class name cannot be null");
+ Objects.requireNonNull(createPluginRequest.getJarName(), "jar name cannot be null");
+ }
+
+ public static void validateDropPluginRequest(final DropPluginRequest
dropPluginRequest) {
+ Objects.requireNonNull(dropPluginRequest.getPluginName(), "plugin name cannot be null");
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
index 053b618..cd563da 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.collector.api.v1.task.model.CreateTaskRequest;
import org.apache.iotdb.collector.api.v1.task.model.DropTaskRequest;
import org.apache.iotdb.collector.api.v1.task.model.StartTaskRequest;
import org.apache.iotdb.collector.api.v1.task.model.StopTaskRequest;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
import org.apache.iotdb.collector.service.RuntimeService;
import javax.ws.rs.core.Response;
@@ -42,9 +43,11 @@ public class TaskApiServiceImpl extends TaskApiService {
.get()
.createTask(
createTaskRequest.getTaskId(),
+ TaskStateEnum.RUNNING,
createTaskRequest.getSourceAttribute(),
createTaskRequest.getProcessorAttribute(),
- createTaskRequest.getSinkAttribute())
+ createTaskRequest.getSinkAttribute(),
+ true)
: Response.serverError().entity("Task runtime is down").build();
}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
index e067086..16fd814 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
@@ -36,6 +36,7 @@ public class Options {
try {
Class.forName(ApiServiceOptions.class.getName());
Class.forName(TaskRuntimeOptions.class.getName());
+ Class.forName(PluginRuntimeOptions.class.getName());
} catch (final ClassNotFoundException e) {
throw new RuntimeException("Failed to load options", e);
}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java
similarity index 54%
copy from
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
copy to
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java
index 36adce3..92eb04b 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java
@@ -17,6 +17,25 @@
* under the License.
*/
-package org.apache.iotdb.collector.api.v1.plugin.impl;
+package org.apache.iotdb.collector.config;
-public class PluginApiServiceRequestValidationHandler {}
+import java.io.File;
+
+public class PluginRuntimeOptions extends Options {
+ public static final Option<String> PLUGIN_LIB_DIR =
+ new Option<String>("plugin_lib_dir", "ext" + File.separator + "plugin") {
+ @Override
+ public void setValue(final String valueString) {
+ value = valueString;
+ }
+ };
+
+ public static final Option<String> PLUGIN_INSTALL_LIB_DIR =
+ new Option<String>(
+ "plugin_install_lib_dir", PLUGIN_LIB_DIR.value() + File.separator + "install") {
+ @Override
+ public void setValue(final String valueString) {
+ value = valueString;
+ }
+ };
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java
new file mode 100644
index 0000000..2e8c3bb
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.persistence;
+
+public class DBConstant {
+
+ public static final String CREATE_PLUGIN_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS plugin\n"
+ + "(\n"
+ + " plugin_name TEXT PRIMARY KEY,\n"
+ + " class_name TEXT NOT NULL,\n"
+ + " jar_name TEXT NOT NULL,\n"
+ + " jar_md5 TEXT NOT NULL,\n"
+ + " create_time TEXT NOT NULL\n"
+ + ");";
+ public static final String CREATE_TASK_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS task\n"
+ + "(\n"
+ + " task_id TEXT PRIMARY KEY,\n"
+ + " task_state INT NOT NULL,\n"
+ + " source_attribute BLOB NOT NULL,\n"
+ + " processor_attribute BLOB NOT NULL,\n"
+ + " sink_attribute BLOB NOT NULL,\n"
+ + " create_time TEXT NOT NULL\n"
+ + ");";
+
+ public static final String PLUGIN_DATABASE_FILE_PATH = "ext/db/plugin.db";
+ public static final String TASK_DATABASE_FILE_PATH = "ext/db/task.db";
+
+ public static final String PLUGIN_DATABASE_URL = "jdbc:sqlite:" + PLUGIN_DATABASE_FILE_PATH;
+ public static final String TASK_DATABASE_URL = "jdbc:sqlite:" + TASK_DATABASE_FILE_PATH;
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java
similarity index 56%
copy from
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
copy to
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java
index 36adce3..b2abcb8 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java
@@ -17,6 +17,29 @@
* under the License.
*/
-package org.apache.iotdb.collector.api.v1.plugin.impl;
+package org.apache.iotdb.collector.persistence;
-public class PluginApiServiceRequestValidationHandler {}
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public abstract class Persistence {
+
+ private final String databaseUrl;
+
+ public Persistence(final String databaseUrl) {
+ this.databaseUrl = databaseUrl;
+ initDatabaseFileIfPossible();
+ initTableIfPossible();
+ }
+
+ protected abstract void initDatabaseFileIfPossible();
+
+ protected abstract void initTableIfPossible();
+
+ public abstract void tryResume();
+
+ protected Connection getConnection() throws SQLException {
+ return DriverManager.getConnection(databaseUrl);
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java
new file mode 100644
index 0000000..d1d2b20
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.persistence;
+
+import org.apache.iotdb.collector.runtime.plugin.utils.PluginFileUtils;
+import org.apache.iotdb.collector.service.RuntimeService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Objects;
+
+public class PluginPersistence extends Persistence {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PluginPersistence.class);
+
+ public PluginPersistence(String databaseUrl) {
+ super(databaseUrl);
+ }
+
+ @Override
+ protected void initDatabaseFileIfPossible() {
+ try {
+ final Path pluginDatabaseFilePath =
Paths.get(DBConstant.PLUGIN_DATABASE_FILE_PATH);
+ if (!Files.exists(pluginDatabaseFilePath)) {
+ Files.createFile(pluginDatabaseFilePath);
+ }
+ } catch (final IOException e) {
+ LOGGER.warn("Failed to create plugin database file", e);
+ }
+ }
+
+ @Override
+ protected void initTableIfPossible() {
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement =
+ connection.prepareStatement(DBConstant.CREATE_PLUGIN_TABLE_SQL);
+ statement.executeUpdate();
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to create plugin database", e);
+ }
+ }
+
+ @Override
+ public void tryResume() {
+ final String queryAllPluginSQL =
+ "SELECT plugin_name, class_name, jar_name, jar_md5 FROM plugin";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement =
connection.prepareStatement(queryAllPluginSQL);
+ final ResultSet pluginResultSet = statement.executeQuery();
+
+ while (pluginResultSet.next()) {
+ final String pluginName = pluginResultSet.getString("plugin_name");
+ final String className = pluginResultSet.getString("class_name");
+ final String jarName = pluginResultSet.getString("jar_name");
+ final String jarMd5 = pluginResultSet.getString("jar_md5");
+
+ if (!isPluginJarFileWithMD5NameExists(pluginName, jarName, jarMd5)) {
+ tryDeletePlugin(pluginName);
+ continue;
+ }
+
+ tryRecoverPlugin(pluginName, className, jarName, jarMd5);
+ }
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to resume plugin persistence message, because {}", e.getMessage());
+ }
+ }
+
+ private boolean isPluginJarFileWithMD5NameExists(
+ final String pluginName, final String jarName, final String jarMd5) {
+ final Path pluginJarFileWithMD5Path =
+ Paths.get(
+ PluginFileUtils.getPluginJarFileWithMD5FilePath(
+ pluginName,
PluginFileUtils.getPluginJarFileNameWithMD5(jarName, jarMd5)));
+
+ return Files.exists(pluginJarFileWithMD5Path);
+ }
+
+ private void tryRecoverPlugin(
+ final String pluginName, final String className, final String jarName,
final String jarMD5) {
+ final Response response =
+ RuntimeService.plugin().isPresent()
+ ? RuntimeService.plugin()
+ .get()
+ .createPlugin(pluginName, className, jarName, jarMD5, false)
+ : null;
+
+ if (Objects.isNull(response) || response.getStatus() !=
Response.Status.OK.getStatusCode()) {
+ LOGGER.warn("Failed to recover plugin message from plugin {}: {}", pluginName, response);
+ }
+ }
+
+ public void tryPersistencePlugin(
+ final String pluginName, final String className, final String jarName,
final String jarMD5) {
+ final String sql =
+ "INSERT INTO plugin(plugin_name, class_name, jar_name, jar_md5, create_time) VALUES(?,?,?,?,?)";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement = connection.prepareStatement(sql);
+ statement.setString(1, pluginName);
+ statement.setString(2, className);
+ statement.setString(3, jarName);
+ statement.setString(4, jarMD5);
+ statement.setString(5, String.valueOf(new
Timestamp(System.currentTimeMillis())));
+ statement.executeUpdate();
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to persistence plugin message, because {}", e.getMessage());
+ }
+ }
+
+ public void tryDeletePlugin(final String pluginName) {
+ final String sql = "DELETE FROM plugin WHERE plugin_name = ?";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement = connection.prepareStatement(sql);
+ statement.setString(1, pluginName);
+ statement.executeUpdate();
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to delete plugin persistence message, because {}", e.getMessage());
+ }
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java
new file mode 100644
index 0000000..bf62519
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.persistence;
+
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
+import org.apache.iotdb.collector.service.RuntimeService;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class TaskPersistence extends Persistence {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TaskPersistence.class);
+
+ public TaskPersistence(String databaseUrl) {
+ super(databaseUrl);
+ }
+
+ @Override
+ protected void initDatabaseFileIfPossible() {
+ try {
+ final Path taskDatabaseFilePath =
Paths.get(DBConstant.TASK_DATABASE_FILE_PATH);
+ if (!Files.exists(taskDatabaseFilePath)) {
+ Files.createFile(taskDatabaseFilePath);
+ }
+ } catch (final IOException e) {
+ LOGGER.warn("Failed to create task database file", e);
+ }
+ }
+
+ @Override
+ protected void initTableIfPossible() {
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement =
+ connection.prepareStatement(DBConstant.CREATE_TASK_TABLE_SQL);
+ statement.executeUpdate();
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to create task database", e);
+ }
+ }
+
+ @Override
+ public void tryResume() {
+ final String queryAllTaskSQL =
+ "SELECT task_id, task_state, source_attribute, processor_attribute, sink_attribute, create_time FROM task";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement =
connection.prepareStatement(queryAllTaskSQL);
+ final ResultSet taskResultSet = statement.executeQuery();
+
+ while (taskResultSet.next()) {
+ final String taskId = taskResultSet.getString(1);
+ final TaskStateEnum taskState =
TaskStateEnum.values()[taskResultSet.getInt(2)];
+ final byte[] sourceAttribute = taskResultSet.getBytes(3);
+ final byte[] processorAttribute = taskResultSet.getBytes(4);
+ final byte[] sinkAttribute = taskResultSet.getBytes(5);
+
+ tryRecoverTask(
+ taskId,
+ taskState,
+ deserialize(sourceAttribute),
+ deserialize(processorAttribute),
+ deserialize(sinkAttribute));
+ }
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to resume task persistence message, because {}", e.getMessage());
+ }
+ }
+
+ public void tryRecoverTask(
+ final String taskId,
+ final TaskStateEnum taskState,
+ final Map<String, String> sourceAttribute,
+ final Map<String, String> processorAttribute,
+ final Map<String, String> sinkAttribute) {
+ final Response response =
+ RuntimeService.task().isPresent()
+ ? RuntimeService.task()
+ .get()
+ .createTask(
+ taskId, taskState, sourceAttribute, processorAttribute,
sinkAttribute, false)
+ : null;
+
+ if (Objects.isNull(response) || response.getStatus() !=
Response.Status.OK.getStatusCode()) {
+ LOGGER.warn("Failed to recover task persistence message, because {}", response);
+ }
+ }
+
+ private Map<String, String> deserialize(final byte[] buffer) {
+ final Map<String, String> attribute = new HashMap<>();
+ final ByteBuffer attributeBuffer = ByteBuffer.wrap(buffer);
+
+ final int size = ReadWriteIOUtils.readInt(attributeBuffer);
+ for (int i = 0; i < size; i++) {
+ final String key = ReadWriteIOUtils.readString(attributeBuffer);
+ final String value = ReadWriteIOUtils.readString(attributeBuffer);
+
+ attribute.put(key, value);
+ }
+
+ return attribute;
+ }
+
+ public void tryPersistenceTask(
+ final String taskId,
+ final TaskStateEnum taskState,
+ final Map<String, String> sourceAttribute,
+ final Map<String, String> processorAttribute,
+ final Map<String, String> sinkAttribute) {
+ final String insertSQL =
+ "INSERT INTO task(task_id, task_state , source_attribute, processor_attribute, sink_attribute, create_time) values(?, ?, ?, ?, ?, ?)";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement =
connection.prepareStatement(insertSQL);
+
+ final byte[] sourceAttributeBuffer = serialize(sourceAttribute);
+ final byte[] processorAttributeBuffer = serialize(processorAttribute);
+ final byte[] sinkAttributeBuffer = serialize(sinkAttribute);
+
+ statement.setString(1, taskId);
+ statement.setInt(2, taskState.getTaskState());
+ statement.setBytes(3, sourceAttributeBuffer);
+ statement.setBytes(4, processorAttributeBuffer);
+ statement.setBytes(5, sinkAttributeBuffer);
+ statement.setString(6, String.valueOf(new
Timestamp(System.currentTimeMillis())));
+ statement.executeUpdate();
+ } catch (final SQLException | IOException e) {
+ LOGGER.warn("Failed to persistence task message, because {}", e.getMessage());
+ }
+ }
+
+ private byte[] serialize(final Map<String, String> attribute) throws
IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(attribute.size(), outputStream);
+ for (final Map.Entry<String, String> entry : attribute.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size())
+ .array();
+ }
+ }
+
+ public void tryDeleteTask(final String taskId) {
+ final String deleteSQL = "DELETE FROM task WHERE task_id = ?";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement =
connection.prepareStatement(deleteSQL);
+ statement.setString(1, taskId);
+ statement.executeUpdate();
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to delete task persistence message, because {}", e.getMessage());
+ }
+ }
+
+ public void tryAlterTaskState(final String taskId, final TaskStateEnum
taskState) {
+ final String alterSQL = "UPDATE task SET task_state = ? WHERE task_id = ?";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement =
connection.prepareStatement(alterSQL);
+ statement.setInt(1, taskState.getTaskState());
+ statement.setString(2, taskId);
+ statement.executeUpdate();
+ } catch (SQLException e) {
+ LOGGER.warn("Failed to alter task persistence message, because {}", e.getMessage());
+ }
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java
similarity index 62%
copy from
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
copy to
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java
index 36adce3..32ea664 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java
@@ -17,6 +17,20 @@
* under the License.
*/
-package org.apache.iotdb.collector.api.v1.plugin.impl;
+package org.apache.iotdb.collector.plugin.api.customizer;
-public class PluginApiServiceRequestValidationHandler {}
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import java.util.Map;
+
+public class CollectorParameters extends PipeParameters {
+ public CollectorParameters(final Map<String, String> attributes) {
+ super(attributes);
+ this.attributes.forEach(
+ (key, value) -> {
+ if (!"taskId".equals(key)) {
+ attributes.put(key, value.replace("_", "-"));
+ }
+ });
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java
index c8bfd93..afb8d15 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java
@@ -56,7 +56,7 @@ public class HttpPullSource extends PullSource {
@Override
public Event supply() {
- LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(100));
+ LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
final Event event = new DemoEvent(String.valueOf(new
Random().nextInt(1000)));
LOGGER.info("{} created successfully ...", event);
return event;
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java
index 233335c..fa81fc2 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java
@@ -66,7 +66,7 @@ public class HttpPushSource extends PushSource {
final Event event = new DemoEvent(String.valueOf(new
Random().nextInt(1000)));
LOGGER.info("{} created successfully ...", event);
supply(event);
- TimeUnit.SECONDS.sleep(100);
+ TimeUnit.SECONDS.sleep(2);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
index adbad7d..627ace6 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
@@ -22,24 +22,46 @@ package org.apache.iotdb.collector.runtime.plugin;
import
org.apache.iotdb.collector.runtime.plugin.constructor.ProcessorConstructor;
import org.apache.iotdb.collector.runtime.plugin.constructor.SinkConstructor;
import org.apache.iotdb.collector.runtime.plugin.constructor.SourceConstructor;
+import org.apache.iotdb.collector.runtime.plugin.load.PluginClassLoader;
+import org.apache.iotdb.collector.runtime.plugin.load.PluginClassLoaderManager;
+import org.apache.iotdb.collector.runtime.plugin.meta.PluginMeta;
import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper;
+import org.apache.iotdb.collector.runtime.plugin.utils.PluginFileUtils;
+import org.apache.iotdb.collector.service.PersistenceService;
+import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.PipeSink;
import org.apache.iotdb.pipe.api.PipeSource;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Objects;
+
public class PluginRuntime implements AutoCloseable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PluginRuntime.class);
+
private final PluginMetaKeeper metaKeeper;
private final SourceConstructor sourceConstructor;
private final ProcessorConstructor processorConstructor;
private final SinkConstructor sinkConstructor;
+ private final PluginClassLoaderManager classLoaderManager;
/** Creates the meta keeper, the three constructors that share it, and the class loader manager. */
public PluginRuntime() {
this.metaKeeper = new PluginMetaKeeper();
this.sourceConstructor = new SourceConstructor(metaKeeper);
this.processorConstructor = new ProcessorConstructor(metaKeeper);
this.sinkConstructor = new SinkConstructor(metaKeeper);
+ this.classLoaderManager = new PluginClassLoaderManager();
}
public PipeSource constructSource(final PipeParameters sourceParameters) {
@@ -66,24 +88,139 @@ public class PluginRuntime implements AutoCloseable {
return sinkConstructor.reflectPlugin(sinkParameters);
}
- public boolean createPlugin() {
- return true;
+  /**
+   * Looks up the class loader that was registered for a plugin.
+   *
+   * @param pluginName name of the plugin whose class loader is requested
+   * @return whatever the class loader manager holds for {@code pluginName}
+   * @throws IOException declared by the manager lookup
+   */
+  public PluginClassLoader getClassLoader(final String pluginName) throws IOException {
+    return classLoaderManager.getPluginClassLoader(pluginName);
  }
- public boolean alterPlugin() {
- return true;
- }
-
- public boolean startPlugin() {
- return true;
+  /**
+   * Registers a user-provided plugin: validates the request, installs the jar, loads and
+   * instantiates the plugin class once to prove it is a {@link PipePlugin}, and records the
+   * plugin in the in-memory keepers (and in the persistence layer for REST requests).
+   *
+   * @param pluginName name under which the plugin is registered
+   * @param className fully qualified name of the plugin class inside the jar
+   * @param jarName file name of the plugin jar
+   * @param jarMD5FromDB MD5 of the jar when already known; {@code null} to compute it from the
+   *     jar file
+   * @param isRestRequest {@code true} when triggered through the REST API: the jar must then be
+   *     present in the plugin lib directory and the registration is persisted
+   * @return a 200 response on success, a server-error response carrying the reason otherwise
+   */
+  public synchronized Response createPlugin(
+      final String pluginName,
+      final String className,
+      final String jarName,
+      final String jarMD5FromDB,
+      final boolean isRestRequest) {
+    // Class loader created by this call that has not been handed over to the manager yet.
+    PluginClassLoader pendingClassLoader = null;
+    try {
+      // validate whether the plugin jar file exists
+      if (isRestRequest && !PluginFileUtils.isPluginJarFileExist(jarName)) {
+        final String errorMessage =
+            String.format(
+                "Failed to register Plugin %s, because the plugin jar file %s is not found",
+                pluginName, jarName);
+        LOGGER.warn(errorMessage);
+        return Response.serverError().entity(errorMessage).build();
+      }
+
+      // validate whether the plugin has been loaded
+      final PluginMeta information = metaKeeper.getPipePluginMeta(pluginName);
+      if (Objects.nonNull(information)) {
+        // validate whether the plugin is builtin plugin
+        if (information.isBuiltin()) {
+          final String errorMessage =
+              String.format(
+                  "Failed to register Plugin %s, because the given Plugin name is the same as a built-in Plugin name.",
+                  pluginName);
+          LOGGER.warn(errorMessage);
+          return Response.serverError().entity(errorMessage).build();
+        }
+
+        // otherwise the plugin has been registered
+        final String errorMessage =
+            String.format(
+                "Failed to register Plugin %s, because the Plugin has been registered.",
+                pluginName);
+        LOGGER.warn(errorMessage);
+        return Response.serverError().entity(errorMessage).build();
+      }
+
+      // get the plugin jar md5
+      final String jarMD5 = jarMD5FromDB == null ? computeJarMd5(jarName) : jarMD5FromDB;
+
+      // Move the jar into the plugin's install directory under a file name that embeds its MD5.
+      // The directory is created when missing; if the MD5-named jar is already installed the
+      // call leaves it untouched.
+      final String pluginJarFileNameWithMD5 =
+          PluginFileUtils.getPluginJarFileNameWithMD5(jarName, jarMD5);
+      PluginFileUtils.savePluginToInstallDir(pluginName, jarName, pluginJarFileNameWithMD5);
+
+      // create the plugin class loader
+      pendingClassLoader =
+          classLoaderManager.createPluginClassLoader(
+              PluginFileUtils.getPluginInstallDirPath(pluginName));
+
+      final Class<?> pluginClass = Class.forName(className, true, pendingClassLoader);
+      @SuppressWarnings("unused") // ensure that it is a PipePlugin class
+      final PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
+
+      // from here on the manager owns the class loader
+      classLoaderManager.addPluginClassLoader(pluginName, pendingClassLoader);
+      pendingClassLoader = null;
+
+      metaKeeper.addPipePluginMeta(
+          pluginName, new PluginMeta(pluginName, className, false, jarName, jarMD5));
+      metaKeeper.addJarNameAndMd5(jarName, jarMD5);
+
+      // storage registered plugin info
+      if (isRestRequest) {
+        PersistenceService.plugin()
+            .ifPresent(
+                pluginPersistence ->
+                    pluginPersistence.tryPersistencePlugin(pluginName, className, jarName, jarMD5));
+      }
+
+      final String successMessage = String.format("Successfully register Plugin %s", pluginName);
+      LOGGER.info(successMessage);
+      return Response.ok().entity(successMessage).build();
+    } catch (final Exception e) {
+      // A class loader that never reached the manager would otherwise keep its jar handles open.
+      if (Objects.nonNull(pendingClassLoader)) {
+        try {
+          pendingClassLoader.close();
+        } catch (final IOException closeException) {
+          e.addSuppressed(closeException);
+        }
+      }
+      final String errorMessage =
+          String.format("Failed to register Plugin %s, because %s", pluginName, e);
+      // pass the exception as well so the stack trace is not lost
+      LOGGER.warn(errorMessage, e);
+      return Response.serverError().entity(errorMessage).build();
+    }
+  }
+
+  /** Computes the MD5 hex digest of a jar in the plugin lib directory, closing the file after. */
+  private static String computeJarMd5(final String jarName) throws IOException {
+    try (final InputStream jarStream =
+        Files.newInputStream(Paths.get(PluginFileUtils.getPluginJarFilePath(jarName)))) {
+      return DigestUtils.md5Hex(jarStream);
+    }
  }
- public boolean stopPlugin() {
- return true;
+  /**
+   * Deregisters a user-provided plugin: deletes its installed jar, forgets its metadata and jar
+   * reference, closes its class loader, and removes its persisted record.
+   *
+   * @param pluginName name of the plugin to deregister
+   * @return a 200 response on success; 404 when no such plugin is registered; a server-error
+   *     response when the plugin is built in or the installed files cannot be removed
+   */
+  public synchronized Response dropPlugin(final String pluginName) {
+    try {
+      final PluginMeta information = metaKeeper.getPipePluginMeta(pluginName);
+      // An unknown name used to fall through and fail with a NullPointerException below.
+      if (Objects.isNull(information)) {
+        final String errorMessage =
+            String.format(
+                "Failed to deregister Plugin %s, because the Plugin has not been registered.",
+                pluginName);
+        LOGGER.warn(errorMessage);
+        return Response.status(Response.Status.NOT_FOUND).entity(errorMessage).build();
+      }
+      if (information.isBuiltin()) {
+        final String errorMessage =
+            String.format("Failed to deregister builtin Plugin %s.", pluginName);
+        LOGGER.warn(errorMessage);
+        return Response.serverError().entity(errorMessage).build();
+      }
+
+      // delete the installed jar (stored under its MD5-suffixed name) and its directory
+      final String installedFileName =
+          PluginFileUtils.getPluginJarFileNameWithMD5(
+              information.getJarName(), information.getJarMD5());
+      PluginFileUtils.removePluginFileUnderLibRoot(information.getPluginName(), installedFileName);
+
+      // remove anyway; the jar bookkeeping is keyed by jar name, not by plugin name
+      metaKeeper.removeJarNameAndMd5IfPossible(information.getJarName());
+      metaKeeper.removePipePluginMeta(pluginName);
+      classLoaderManager.removePluginClassLoader(pluginName);
+
+      // remove plugin info from sqlite
+      PersistenceService.plugin()
+          .ifPresent(pluginPersistence -> pluginPersistence.tryDeletePlugin(pluginName));
+
+      final String successMessage = String.format("Successfully deregister Plugin %s", pluginName);
+      LOGGER.info(successMessage);
+      return Response.ok().entity(successMessage).build();
+    } catch (final IOException e) {
+      final String errorMessage =
+          String.format(
+              "Failed to deregister Plugin %s, because %s", pluginName, e.getMessage());
+      LOGGER.warn(errorMessage, e);
+      return Response.serverError().entity(errorMessage).build();
+    }
  }
- public boolean dropPlugin() {
- return true;
+  /**
+   * Lists every plugin known to the meta keeper.
+   *
+   * @return a 200 response whose entity is the keeper's collection of plugin metadata
+   */
+  public Response showPlugin() {
+    return Response.ok().entity(metaKeeper.getAllPipePluginMeta()).build();
  }
@Override
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java
index c64ef39..72bccb9 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.collector.runtime.plugin.constructor;
import org.apache.iotdb.collector.runtime.plugin.meta.PluginMeta;
import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper;
+import org.apache.iotdb.collector.service.RuntimeService;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -28,7 +29,6 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
@@ -81,13 +81,14 @@ public abstract class PluginConstructor {
final Class<?> pluginClass =
information.isBuiltin()
?
pluginMetaKeeper.getBuiltinPluginClass(information.getPluginName())
- : Class.forName(information.getClassName()); // TODO
+ : Class.forName(
+ information.getClassName(),
+ true,
+ RuntimeService.plugin().isPresent()
+ ?
RuntimeService.plugin().get().getClassLoader(pluginName)
+ : null);
return (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
- } catch (InstantiationException
- | InvocationTargetException
- | NoSuchMethodException
- | IllegalAccessException
- | ClassNotFoundException e) {
+ } catch (final Exception e) {
String errorMessage =
String.format(
"Failed to reflect PipePlugin %s(%s) instance, because %s",
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java
index 066cf08..f5dd6be 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java
@@ -38,7 +38,7 @@ public class SinkConstructor extends PluginConstructor {
@Override
public final PipeSink reflectPlugin(PipeParameters sinkParameters) {
- if (sinkParameters.hasAttribute("sink")) {
+ if (!sinkParameters.hasAttribute("sink")) {
throw new IllegalArgumentException("sink attribute is required");
}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java
index f4c2cd0..2899862 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java
@@ -42,7 +42,7 @@ public class SourceConstructor extends PluginConstructor {
@Override
public final PipeSource reflectPlugin(PipeParameters sourceParameters) {
- if (sourceParameters.hasAttribute("source")) {
+ if (!sourceParameters.hasAttribute("source")) {
throw new IllegalArgumentException("source attribute is required");
}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java
new file mode 100644
index 0000000..7e93a1b
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.plugin.load;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link URLClassLoader} whose search path is every non-directory file found beneath a plugin's
+ * library root. The loader can be flagged as deprecated, which closes it.
+ */
+@ThreadSafe
+public class PluginClassLoader extends URLClassLoader {
+
+  private final String libRoot;
+  private volatile boolean deprecated;
+
+  /**
+   * @param libRoot directory that is walked recursively to collect the loader's URLs
+   * @throws IOException if walking {@code libRoot} fails
+   */
+  public PluginClassLoader(final String libRoot) throws IOException {
+    super(new URL[0]);
+    this.libRoot = libRoot;
+    this.deprecated = false;
+    addUrls();
+  }
+
+  /** Adds each non-directory entry under {@code libRoot} to this loader's URL search path. */
+  private void addUrls() throws IOException {
+    try (final Stream<Path> walkedEntries = Files.walk(new File(libRoot).toPath())) {
+      for (final Path file :
+          walkedEntries.filter(entry -> !Files.isDirectory(entry)).collect(Collectors.toList())) {
+        super.addURL(file.toUri().toURL());
+      }
+    }
+  }
+
+  /** Flags this loader as deprecated and closes it. */
+  public synchronized void markAsDeprecated() throws IOException {
+    deprecated = true;
+    closeIfPossible();
+  }
+
+  /** Closes this loader, but only once it has been flagged as deprecated. */
+  public void closeIfPossible() throws IOException {
+    if (!deprecated) {
+      return;
+    }
+    close();
+  }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java
new file mode 100644
index 0000000..c60a868
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.plugin.load;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry of plugin class loaders keyed by plugin name. Names are upper-cased before they are
+ * used as keys, so lookups, registrations and removals all treat plugin names the same way.
+ */
+@NotThreadSafe
+public class PluginClassLoaderManager {
+
+  private final Map<String, PluginClassLoader> pluginNameToClassLoaderMap;
+
+  public PluginClassLoaderManager() {
+    this.pluginNameToClassLoaderMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Unregisters the class loader of a plugin and marks it as deprecated, which closes it.
+   *
+   * @param pluginName name of the plugin; does nothing when no class loader is registered for it
+   * @throws IOException if closing the class loader fails
+   */
+  public void removePluginClassLoader(final String pluginName) throws IOException {
+    // Must use the same key form as addPluginClassLoader(); with the raw name the entry was
+    // never found, so the loader stayed registered and open.
+    final PluginClassLoader classLoader = pluginNameToClassLoaderMap.remove(toKey(pluginName));
+    if (classLoader != null) {
+      classLoader.markAsDeprecated();
+    }
+  }
+
+  /**
+   * @param pluginName name of the plugin
+   * @return the registered class loader, or {@code null} when none is registered
+   */
+  public PluginClassLoader getPluginClassLoader(final String pluginName) throws IOException {
+    return pluginNameToClassLoaderMap.get(toKey(pluginName));
+  }
+
+  /** Registers {@code classLoader} for {@code pluginName}, replacing any previous entry. */
+  public void addPluginClassLoader(final String pluginName, final PluginClassLoader classLoader) {
+    pluginNameToClassLoaderMap.put(toKey(pluginName), classLoader);
+  }
+
+  /**
+   * Creates a class loader over {@code pluginDirPath}; it is not registered until
+   * {@link #addPluginClassLoader} is called.
+   */
+  public PluginClassLoader createPluginClassLoader(final String pluginDirPath) throws IOException {
+    return new PluginClassLoader(pluginDirPath);
+  }
+
+  /** Single place that turns a plugin name into its map key. */
+  private static String toKey(final String pluginName) {
+    return pluginName.toUpperCase();
+  }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java
index 4d9ddfd..b6b1f90 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java
@@ -19,30 +19,32 @@
package org.apache.iotdb.collector.runtime.plugin.meta;
+import org.apache.iotdb.collector.plugin.builtin.BuiltinPlugin;
+
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
public class PluginMetaKeeper {
- protected final Map<String, PluginMeta> pipePluginNameToMetaMap = new
ConcurrentHashMap<>();
- protected final Map<String, Class<?>> builtinPipePluginNameToClassMap = new
ConcurrentHashMap<>();
+ private final Map<String, PluginMeta> pipePluginNameToMetaMap = new
ConcurrentHashMap<>();
+ private final Map<String, Class<?>> builtinPipePluginNameToClassMap = new
ConcurrentHashMap<>();
+ private final Map<String, String> jarNameToMd5Map = new
ConcurrentHashMap<>();
+ private final Map<String, Integer> jarNameToReferenceCountMap = new
ConcurrentHashMap<>();
public PluginMetaKeeper() {
loadBuiltinPlugins();
}
/** Registers the meta and the class of every BuiltinPlugin enum constant. */
private void loadBuiltinPlugins() {
- // for (final BuiltinPipePlugin builtinPipePlugin :
BuiltinPipePlugin.values()) {
- // final String pipePluginName =
builtinPipePlugin.getPipePluginName();
- // final Class<?> pipePluginClass =
builtinPipePlugin.getPipePluginClass();
- // final String className = builtinPipePlugin.getClassName();
- //
- // addPipePluginMeta(pipePluginName, new PluginMeta(pipePluginName,
className));
- // addBuiltinPluginClass(pipePluginName, pipePluginClass);
- // addPipePluginVisibility(
- // pipePluginName,
VisibilityUtils.calculateFromPluginClass(pipePluginClass));
- // }
+ for (final BuiltinPlugin builtinPlugin : BuiltinPlugin.values()) {
+ final String pluginName = builtinPlugin.getPluginName();
+ final Class<?> pluginClass = builtinPlugin.getPluginClass();
+ final String className = builtinPlugin.getClassName();
+
+ addPipePluginMeta(pluginName, new PluginMeta(pluginName, className));
+ addBuiltinPluginClass(pluginName, pluginClass);
+ }
}
public void addPipePluginMeta(String pluginName, PluginMeta pluginMeta) {
@@ -82,6 +84,35 @@ public class PluginMetaKeeper {
return null;
}
+  /** Tells whether an MD5 is currently recorded for {@code jarName}. */
+  public boolean containsJar(final String jarName) {
+    return jarNameToMd5Map.containsKey(jarName);
+  }
+
+  /**
+   * Tells whether {@code jarName} is recorded and its recorded MD5 equals {@code md5}.
+   *
+   * @return {@code false} when the jar is unknown or the digests differ
+   */
+  public boolean jarNameExistsAndMatchesMd5(final String jarName, final String md5) {
+    // One lookup instead of containsKey() followed by get(), so the answer is based on a single
+    // view of the map.
+    final String recordedMd5 = jarNameToMd5Map.get(jarName);
+    return recordedMd5 != null && recordedMd5.equals(md5);
+  }
+
+  /**
+   * Adds one reference to {@code jarName}. The MD5 is recorded only when this is the first
+   * reference; later calls just bump the counter.
+   */
+  public void addJarNameAndMd5(final String jarName, final String md5) {
+    // merge() creates the counter at 1 or increments it in one atomic step
+    final int referenceCount = jarNameToReferenceCountMap.merge(jarName, 1, Integer::sum);
+    if (referenceCount == 1) {
+      jarNameToMd5Map.put(jarName, md5);
+    }
+  }
+
+  /**
+   * Drops one reference to {@code jarName}; when the last reference goes away the counter and
+   * the recorded MD5 are removed. Does nothing for a jar that is not tracked.
+   */
+  public void removeJarNameAndMd5IfPossible(final String jarName) {
+    // Returning null from the remapping function removes the counter entry.
+    final Integer remaining =
+        jarNameToReferenceCountMap.computeIfPresent(
+            jarName, (name, count) -> count == 1 ? null : count - 1);
+    if (remaining == null) {
+      // Either the last reference was just released or the jar was never counted; in both cases
+      // no MD5 should stay behind.
+      jarNameToMd5Map.remove(jarName);
+    }
+  }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java
new file mode 100644
index 0000000..ebc693d
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.plugin.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+
+import static
org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_INSTALL_LIB_DIR;
+import static
org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_LIB_DIR;
+
+/**
+ * Path and file helpers for plugin jars. Jars start out in the plugin lib directory and are
+ * moved into a per-plugin directory below the install directory under an MD5-suffixed name.
+ */
+public class PluginFileUtils {
+
+  /**
+   * Moves {@code jarName} from the plugin lib directory into the install directory of
+   * {@code pluginName}, naming it {@code jarNameWithMD5}. The install directory is created when
+   * missing; when the target jar already exists nothing is moved.
+   *
+   * @throws IOException if the directory cannot be created or the jar cannot be moved
+   */
+  public static void savePluginToInstallDir(
+      final String pluginName, final String jarName, final String jarNameWithMD5)
+      throws IOException {
+    final Path installDir = Paths.get(getPluginInstallDirPath(pluginName));
+    final Path installedJar = Paths.get(getPluginJarFileWithMD5FilePath(pluginName, jarNameWithMD5));
+
+    if (!Files.exists(installDir)) {
+      FileUtils.forceMkdir(installDir.toFile());
+    }
+
+    if (!Files.exists(installedJar)) {
+      final File uploadedJar = new File(getPluginJarFilePath(jarName));
+      FileUtils.moveFile(uploadedJar, installedJar.toFile(), StandardCopyOption.REPLACE_EXISTING);
+    }
+  }
+
+  /** Tells whether {@code jarName} is present in the plugin lib directory. */
+  public static boolean isPluginJarFileExist(final String jarName) {
+    final Path uploadedJar = Paths.get(getPluginJarFilePath(jarName));
+    return Files.exists(uploadedJar);
+  }
+
+  /** Path of {@code jarName} inside the plugin lib directory. */
+  public static String getPluginJarFilePath(final String jarName) {
+    return PLUGIN_LIB_DIR.value() + File.separator + jarName;
+  }
+
+  /** Path of the MD5-suffixed jar inside the install directory of {@code pluginName}. */
+  public static String getPluginJarFileWithMD5FilePath(
+      final String pluginName, final String jarNameWithMD5) {
+    return getPluginInstallDirPath(pluginName) + File.separator + jarNameWithMD5;
+  }
+
+  /** Install directory of {@code pluginName}. */
+  public static String getPluginInstallDirPath(final String pluginName) {
+    return PLUGIN_INSTALL_LIB_DIR.value() + File.separator + pluginName;
+  }
+
+  /** Builds {@code <base>-<md5>.<extension>} from a jar file name and its MD5. */
+  public static String getPluginJarFileNameWithMD5(final String jarName, final String jarMD5) {
+    final String baseName = FilenameUtils.getBaseName(jarName);
+    final String extension = FilenameUtils.getExtension(jarName);
+    return baseName + "-" + jarMD5 + "." + extension;
+  }
+
+  /**
+   * Deletes {@code fileName} from the install directory of {@code pluginName}, then the
+   * directory itself.
+   *
+   * @throws IOException if either deletion fails
+   */
+  public static void removePluginFileUnderLibRoot(final String pluginName, final String fileName)
+      throws IOException {
+    final Path installedFile = Paths.get(getPluginInstallFilePath(pluginName, fileName));
+    final Path installDir = installedFile.getParent();
+
+    Files.deleteIfExists(installedFile);
+    // NOTE(review): deleteIfExists() throws DirectoryNotEmptyException if other files remain in
+    // the directory - confirm that a plugin directory only ever holds one jar.
+    Files.deleteIfExists(installDir);
+  }
+
+  /** Path of {@code fileName} inside the install directory of {@code pluginName}. */
+  public static String getPluginInstallFilePath(final String pluginName, final String fileName) {
+    return getPluginInstallDirPath(pluginName) + File.separator + fileName;
+  }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java
index ac270ce..2dbcc71 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java
@@ -19,22 +19,18 @@
package org.apache.iotdb.collector.runtime.task;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.collector.plugin.api.customizer.CollectorParameters;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.LockSupport;
public abstract class Task {
protected final String taskId;
- protected final PipeParameters parameters;
+ protected final CollectorParameters parameters;
protected final int parallelism;
- private static final long CHECK_RUNNING_INTERVAL_NANOS = 100_000_000L;
- protected final AtomicBoolean isRunning = new AtomicBoolean(false);
- protected final AtomicBoolean isDropped = new AtomicBoolean(false);
+ protected final TaskDispatch dispatch;
protected Task(
final String taskId,
@@ -42,49 +38,36 @@ public abstract class Task {
final String parallelismKey,
final int parallelismValue) {
this.taskId = taskId;
- this.parameters = new PipeParameters(attributes);
+ this.parameters = new CollectorParameters(attributes);
this.parallelism = parameters.getIntOrDefault(parallelismKey,
parallelismValue);
- }
-
- public void resume() {
- isRunning.set(true);
- }
-
- public void pause() {
- isRunning.set(false);
- }
- protected void waitUntilRunningOrDropped() {
- while (!isRunning.get() && !isDropped.get()) {
- LockSupport.parkNanos(CHECK_RUNNING_INTERVAL_NANOS);
- }
+ this.dispatch = new TaskDispatch();
}
/** Flags the task's dispatch as running, then runs the subclass hook createInternal(). */
public final synchronized void create() throws Exception {
- resume();
+ dispatch.resume();
createInternal();
}
public abstract void createInternal() throws Exception;
/** Flags the task's dispatch as running, then runs the subclass hook startInternal(). */
public final synchronized void start() throws Exception {
- resume();
+ dispatch.resume();
startInternal();
}
public abstract void startInternal() throws Exception;
/** Flags the task's dispatch as not running, then runs the subclass hook stopInternal(). */
public final synchronized void stop() throws Exception {
- pause();
+ dispatch.pause();
stopInternal();
}
public abstract void stopInternal() throws Exception;
/** Marks the task's dispatch as removed (paused and dropped), then runs dropInternal(). */
public final synchronized void drop() throws Exception {
- pause();
- isDropped.set(true);
+ dispatch.remove();
dropInternal();
}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java
similarity index 50%
copy from
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
copy to
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java
index 36adce3..9a904c0 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java
@@ -17,6 +17,37 @@
* under the License.
*/
-package org.apache.iotdb.collector.api.v1.plugin.impl;
+package org.apache.iotdb.collector.runtime.task;
-public class PluginApiServiceRequestValidationHandler {}
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * Run/pause/drop switch of a task. Worker code can block in
+ * {@link #waitUntilRunningOrDropped()} while the task is paused and is released as soon as the
+ * task is resumed or removed.
+ */
+public class TaskDispatch {
+
+  private static final long CHECK_RUNNING_INTERVAL_NANOS = 100_000_000L;
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private final AtomicBoolean dropped = new AtomicBoolean(false);
+
+  /** Switches the task to running. */
+  public void resume() {
+    running.set(true);
+  }
+
+  /** Switches the task to not running. */
+  public void pause() {
+    running.set(false);
+  }
+
+  /** Pauses the task and flags it as dropped. */
+  public void remove() {
+    pause();
+    dropped.set(true);
+  }
+
+  /** @return {@code true} while the task is switched to running */
+  public boolean isRunning() {
+    return running.get();
+  }
+
+  /**
+   * Parks the calling thread in intervals of {@code CHECK_RUNNING_INTERVAL_NANOS} until the task
+   * is either running or dropped.
+   */
+  public void waitUntilRunningOrDropped() {
+    // NOTE(review): parkNanos() returns at once for an interrupted thread, so an interrupted
+    // caller would poll without pausing - confirm callers never wait here while interrupted.
+    while (!(running.get() || dropped.get())) {
+      LockSupport.parkNanos(CHECK_RUNNING_INTERVAL_NANOS);
+    }
+  }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
index ec3a4f1..f96a40f 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.collector.runtime.task;
import org.apache.iotdb.collector.runtime.task.processor.ProcessorTask;
import org.apache.iotdb.collector.runtime.task.sink.SinkTask;
import org.apache.iotdb.collector.runtime.task.source.SourceTask;
+import org.apache.iotdb.collector.service.PersistenceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
public class TaskRuntime implements AutoCloseable {
@@ -39,9 +41,11 @@ public class TaskRuntime implements AutoCloseable {
public synchronized Response createTask(
final String taskId,
+ final TaskStateEnum taskState,
final Map<String, String> sourceAttribute,
final Map<String, String> processorAttribute,
- final Map<String, String> sinkAttribute) {
+ final Map<String, String> sinkAttribute,
+ final boolean isRestRequest) {
try {
if (tasks.containsKey(taskId)) {
return Response.status(Response.Status.CONFLICT)
@@ -53,14 +57,28 @@ public class TaskRuntime implements AutoCloseable {
final ProcessorTask processorTask =
new ProcessorTask(taskId, processorAttribute,
sinkTask.makeProducer());
final SourceTask sourceTask =
- SourceTask.construct(taskId, sourceAttribute,
processorTask.makeProducer());
+ SourceTask.construct(taskId, sourceAttribute,
processorTask.makeProducer(), taskState);
final TaskCombiner taskCombiner = new TaskCombiner(sourceTask,
processorTask, sinkTask);
- tasks.put(taskId, taskCombiner);
taskCombiner.create();
+ tasks.put(taskId, taskCombiner);
+
+ // storage task info to sqlite
+ if (isRestRequest) {
+ PersistenceService.task()
+ .ifPresent(
+ taskPersistence ->
+ taskPersistence.tryPersistenceTask(
+ taskId,
+ TaskStateEnum.RUNNING,
+ sourceAttribute,
+ processorAttribute,
+ sinkAttribute));
+ }
+
LOGGER.info("Successfully created task {}", taskId);
- return Response.status(Response.Status.CREATED)
+ return Response.status(Response.Status.OK)
.entity(String.format("Successfully created task %s", taskId))
.build();
} catch (final Exception e) {
@@ -81,6 +99,10 @@ public class TaskRuntime implements AutoCloseable {
try {
tasks.get(taskId).start();
+ PersistenceService.task()
+ .ifPresent(
+ taskPersistence -> taskPersistence.tryAlterTaskState(taskId,
TaskStateEnum.RUNNING));
+
LOGGER.info("Task {} start successfully", taskId);
return Response.status(Response.Status.OK)
.entity(String.format("task %s start successfully", taskId))
@@ -103,6 +125,10 @@ public class TaskRuntime implements AutoCloseable {
try {
tasks.get(taskId).stop();
+ PersistenceService.task()
+ .ifPresent(
+ taskPersistence -> taskPersistence.tryAlterTaskState(taskId,
TaskStateEnum.STOPPED));
+
LOGGER.info("Task {} stop successfully", taskId);
return Response.status(Response.Status.OK)
.entity(String.format("task %s stop successfully", taskId))
@@ -116,14 +142,19 @@ public class TaskRuntime implements AutoCloseable {
}
public synchronized Response dropTask(final String taskId) {
- if (!tasks.containsKey(taskId)) {
+ if (Objects.isNull(tasks.get(taskId)) || !tasks.containsKey(taskId)) {
return Response.status(Response.Status.NOT_FOUND)
.entity(String.format("task %s not found", taskId))
.build();
}
try {
- tasks.remove(taskId).drop();
+ final TaskCombiner task = tasks.get(taskId);
+ task.drop();
+ tasks.remove(taskId);
+
+ // remove task info from sqlite
+ PersistenceService.task().ifPresent(taskPersistence ->
taskPersistence.tryDeleteTask(taskId));
LOGGER.info("Task {} drop successfully", taskId);
return Response.status(Response.Status.OK)
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java
similarity index 74%
copy from
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
copy to
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java
index 36adce3..159016d 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java
@@ -16,7 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.collector.runtime.task;
-package org.apache.iotdb.collector.api.v1.plugin.impl;
+public enum TaskStateEnum {
+ RUNNING(0),
+ STOPPED(1);
-public class PluginApiServiceRequestValidationHandler {}
+ private final int taskState;
+
+ TaskStateEnum(final int taskState) {
+ this.taskState = taskState;
+ }
+
+ public int getTaskState() {
+ return taskState;
+ }
+}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java
index c153506..9990871 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.collector.runtime.task.processor;
+import org.apache.iotdb.collector.runtime.task.TaskDispatch;
import org.apache.iotdb.collector.runtime.task.event.EventContainer;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -33,6 +34,8 @@ class ProcessorConsumer implements
WorkHandler<EventContainer> {
private final PipeProcessor processor;
private final EventCollector eventCollector;
+ private TaskDispatch dispatch;
+
ProcessorConsumer(final PipeProcessor processor, final EventCollector
eventCollector) {
this.processor = processor;
this.eventCollector = eventCollector;
@@ -44,6 +47,8 @@ class ProcessorConsumer implements
WorkHandler<EventContainer> {
@Override
public void onEvent(final EventContainer eventContainer) throws Exception {
+ dispatch.waitUntilRunningOrDropped();
+
// TODO: retry strategy
final Event event = eventContainer.getEvent();
if (event instanceof TabletInsertionEvent) {
@@ -54,4 +59,8 @@ class ProcessorConsumer implements
WorkHandler<EventContainer> {
processor.process(event, eventCollector);
}
}
+
+ public void setDispatch(final TaskDispatch dispatch) {
+ this.dispatch = dispatch;
+ }
}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java
index e671ea0..37fbf4e 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java
@@ -99,6 +99,7 @@ public class ProcessorTask extends Task {
for (int i = 0; i < parallelism; i++) {
processorConsumers[i] =
new ProcessorConsumer(pluginRuntime.constructProcessor(parameters),
sinkProducer);
+ processorConsumers[i].setDispatch(dispatch);
try {
processorConsumers[i].consumer().validate(new
PipeParameterValidator(parameters));
processorConsumers[i]
@@ -118,16 +119,18 @@ public class ProcessorTask extends Task {
disruptor.handleEventsWithWorkerPool(processorConsumers);
disruptor.setDefaultExceptionHandler(new ProcessorExceptionHandler());
+
+ disruptor.start();
}
@Override
public void startInternal() {
- disruptor.start();
+ // do nothing
}
@Override
public void stopInternal() {
- disruptor.halt();
+ // do nothing
}
@Override
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java
index 2fe13f0..c17691e 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.collector.runtime.task.sink;
+import org.apache.iotdb.collector.runtime.task.TaskDispatch;
import org.apache.iotdb.collector.runtime.task.event.EventContainer;
import org.apache.iotdb.pipe.api.PipeSink;
import org.apache.iotdb.pipe.api.event.Event;
@@ -31,6 +32,8 @@ class SinkConsumer implements WorkHandler<EventContainer> {
private final PipeSink sink;
+ private TaskDispatch dispatch;
+
SinkConsumer(final PipeSink sink) {
this.sink = sink;
}
@@ -41,6 +44,8 @@ class SinkConsumer implements WorkHandler<EventContainer> {
@Override
public void onEvent(EventContainer eventContainer) throws Exception {
+ dispatch.waitUntilRunningOrDropped();
+
// TODO: retry strategy
final Event event = eventContainer.getEvent();
if (event instanceof TabletInsertionEvent) {
@@ -51,4 +56,8 @@ class SinkConsumer implements WorkHandler<EventContainer> {
sink.transfer(event);
}
}
+
+ public void setDispatch(final TaskDispatch dispatch) {
+ this.dispatch = dispatch;
+ }
}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java
index e3e66db..d540b92 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java
@@ -89,6 +89,7 @@ public class SinkTask extends Task {
consumers = new SinkConsumer[parallelism];
for (int i = 0; i < parallelism; i++) {
consumers[i] = new SinkConsumer(pluginRuntime.constructSink(parameters));
+ consumers[i].setDispatch(dispatch);
try {
consumers[i].consumer().validate(new
PipeParameterValidator(parameters));
consumers[i]
@@ -109,16 +110,18 @@ public class SinkTask extends Task {
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.setDefaultExceptionHandler(new SinkExceptionHandler());
+
+ disruptor.start();
}
@Override
public void startInternal() {
- disruptor.start();
+ // do nothing
}
@Override
public void stopInternal() {
- disruptor.halt();
+ // do nothing
}
@Override
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java
index ca0c8c4..2cf5f67 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.collector.runtime.task.source;
import org.apache.iotdb.collector.runtime.plugin.PluginRuntime;
import org.apache.iotdb.collector.runtime.task.Task;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
import org.apache.iotdb.collector.runtime.task.event.EventCollector;
import org.apache.iotdb.collector.runtime.task.source.pull.PullSourceTask;
import org.apache.iotdb.collector.runtime.task.source.push.PushSourceTask;
@@ -34,20 +35,24 @@ import static
org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_SOURCE_P
public abstract class SourceTask extends Task {
protected final EventCollector processorProducer;
+ protected TaskStateEnum taskState;
protected SourceTask(
final String taskId,
final Map<String, String> attributes,
- final EventCollector processorProducer) {
+ final EventCollector processorProducer,
+ final TaskStateEnum taskState) {
super(
taskId, attributes, TASK_SOURCE_PARALLELISM_NUM.key(),
TASK_SOURCE_PARALLELISM_NUM.value());
this.processorProducer = processorProducer;
+ this.taskState = taskState;
}
public static SourceTask construct(
final String taskId,
final Map<String, String> attributes,
- final EventCollector processorProducer)
+ final EventCollector processorProducer,
+ final TaskStateEnum taskState)
throws Exception {
final PluginRuntime pluginRuntime =
RuntimeService.plugin().isPresent() ? RuntimeService.plugin().get() :
null;
@@ -57,10 +62,10 @@ public abstract class SourceTask extends Task {
final PipeParameters parameters = new PipeParameters(attributes);
if (pluginRuntime.isPullSource(parameters)) {
- return new PullSourceTask(taskId, attributes, processorProducer);
+ return new PullSourceTask(taskId, attributes, processorProducer,
taskState);
}
if (pluginRuntime.isPushSource(parameters)) {
- return new PushSourceTask(taskId, attributes, processorProducer);
+ return new PushSourceTask(taskId, attributes, processorProducer,
taskState);
}
throw new IllegalArgumentException("Unsupported source type");
}
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java
index b47054a..d426526 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.collector.runtime.task.source.pull;
import org.apache.iotdb.collector.plugin.api.PullSource;
import
org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration;
import org.apache.iotdb.collector.runtime.plugin.PluginRuntime;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
import org.apache.iotdb.collector.runtime.task.event.EventCollector;
import org.apache.iotdb.collector.runtime.task.source.SourceTask;
import org.apache.iotdb.collector.service.RuntimeService;
@@ -50,8 +51,9 @@ public class PullSourceTask extends SourceTask {
public PullSourceTask(
final String taskId,
final Map<String, String> attributes,
- final EventCollector processorProducer) {
- super(taskId, attributes, processorProducer);
+ final EventCollector processorProducer,
+ final TaskStateEnum taskState) {
+ super(taskId, attributes, processorProducer, taskState);
}
@Override
@@ -101,14 +103,14 @@ public class PullSourceTask extends SourceTask {
.get(taskId)
.submit(
() -> {
- while (!isDropped.get()) {
+ while (dispatch.isRunning() &&
TaskStateEnum.RUNNING.equals(taskState)) {
try {
consumers[finalI].onScheduler();
} catch (final Exception e) {
LOGGER.warn("Failed to pull source", e);
}
- waitUntilRunningOrDropped();
+ dispatch.waitUntilRunningOrDropped();
}
});
}
@@ -116,12 +118,12 @@ public class PullSourceTask extends SourceTask {
@Override
public void startInternal() {
- // do nothing
+ this.taskState = TaskStateEnum.RUNNING;
}
@Override
public void stopInternal() {
- // do nothing
+ this.taskState = TaskStateEnum.STOPPED;
}
@Override
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java
index 938f29f..7419ae7 100644
---
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.collector.runtime.task.source.push;
import org.apache.iotdb.collector.plugin.api.PushSource;
import
org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration;
import org.apache.iotdb.collector.runtime.plugin.PluginRuntime;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
import org.apache.iotdb.collector.runtime.task.event.EventCollector;
import org.apache.iotdb.collector.runtime.task.source.SourceTask;
import org.apache.iotdb.collector.service.RuntimeService;
@@ -41,8 +42,9 @@ public class PushSourceTask extends SourceTask {
public PushSourceTask(
final String taskId,
final Map<String, String> sourceParams,
- final EventCollector processorProducer) {
- super(taskId, sourceParams, processorProducer);
+ final EventCollector processorProducer,
+ final TaskStateEnum taskState) {
+ super(taskId, sourceParams, processorProducer, taskState);
}
@Override
@@ -64,7 +66,9 @@ public class PushSourceTask extends SourceTask {
pushSources[i].customize(
parameters,
new CollectorSourceRuntimeConfiguration(taskId, creationTime,
parallelism, i));
- pushSources[i].start();
+ if (TaskStateEnum.RUNNING.equals(taskState)) {
+ pushSources[i].start();
+ }
} catch (final Exception e) {
try {
pushSources[i].close();
@@ -78,12 +82,42 @@ public class PushSourceTask extends SourceTask {
@Override
public void startInternal() {
- // do nothing
+  // Nothing to do when the task is already running. The constant-first comparison is null-safe.
+  if (TaskStateEnum.RUNNING.equals(this.taskState)) {
+    return;
+  }
+
+  if (pushSources != null) {
+    for (int i = 0; i < parallelism; i++) {
+      try {
+        pushSources[i].start();
+      } catch (final Exception e) {
+        // Abort on the first failure; the recorded state is left unchanged.
+        LOGGER.warn("Failed to restart push source", e);
+        return;
+      }
+    }
+  }
+
+  // Reached only when every source started (or there are no sources).
+  this.taskState = TaskStateEnum.RUNNING;
}
@Override
public void stopInternal() {
- // do nothing
+  // Nothing to do when the task is already stopped. The constant-first comparison is null-safe.
+  if (TaskStateEnum.STOPPED.equals(this.taskState)) {
+    return;
+  }
+
+  if (pushSources != null) {
+    for (int i = 0; i < parallelism; i++) {
+      try {
+        pushSources[i].close();
+      } catch (final Exception e) {
+        // Abort on the first failure; the recorded state is left unchanged.
+        LOGGER.warn("Failed to stop source", e);
+        return;
+      }
+    }
+  }
+
+  // Record the new state even when there are no sources, mirroring startInternal.
+  this.taskState = TaskStateEnum.STOPPED;
}
@Override
diff --git
a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java
new file mode 100644
index 0000000..a5eb42e
--- /dev/null
+++
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.service;
+
+import org.apache.iotdb.collector.persistence.DBConstant;
+import org.apache.iotdb.collector.persistence.PluginPersistence;
+import org.apache.iotdb.collector.persistence.TaskPersistence;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_INSTALL_LIB_DIR;
+
+/**
+ * Service that owns the persistence stores for plugin and task metadata.
+ *
+ * <p>{@link #start()} makes sure the plugin install directory exists, creates both stores and asks
+ * each of them to resume; {@link #stop()} clears the references again. The static accessors return
+ * an empty {@link Optional} whenever no store is currently set.
+ */
+public class PersistenceService implements IService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PersistenceService.class);
+
+  private static final AtomicReference<PluginPersistence> PLUGIN = new AtomicReference<>();
+  private static final AtomicReference<TaskPersistence> TASK = new AtomicReference<>();
+
+  /** Creates the plugin install directory and both persistence stores, then resumes them. */
+  @Override
+  public void start() {
+    initPluginDir();
+
+    // Publish both stores before resuming either of them.
+    PLUGIN.set(new PluginPersistence(DBConstant.PLUGIN_DATABASE_URL));
+    TASK.set(new TaskPersistence(DBConstant.TASK_DATABASE_URL));
+
+    PLUGIN.get().tryResume();
+    TASK.get().tryResume();
+  }
+
+  /** Creates the plugin install directory if it is missing; a failure is only logged. */
+  private void initPluginDir() {
+    final Path pluginInstallPath = Paths.get(PLUGIN_INSTALL_LIB_DIR.value());
+    try {
+      if (!Files.exists(pluginInstallPath)) {
+        FileUtils.forceMkdir(pluginInstallPath.toFile());
+      }
+    } catch (final IOException e) {
+      LOGGER.warn("Failed to create plugin install directory", e);
+    }
+  }
+
+  /**
+   * Returns the plugin persistence store.
+   *
+   * @return the store, or an empty {@link Optional} when the service has not been started or has
+   *     been stopped
+   */
+  public static Optional<PluginPersistence> plugin() {
+    // ofNullable, not of: the reference is null before start() and after stop().
+    return Optional.ofNullable(PLUGIN.get());
+  }
+
+  /**
+   * Returns the task persistence store.
+   *
+   * @return the store, or an empty {@link Optional} when the service has not been started or has
+   *     been stopped
+   */
+  public static Optional<TaskPersistence> task() {
+    // ofNullable, not of: the reference is null before start() and after stop().
+    return Optional.ofNullable(TASK.get());
+  }
+
+  /** Clears both store references so the accessors report them as absent. */
+  @Override
+  public void stop() {
+    PLUGIN.set(null);
+    TASK.set(null);
+  }
+
+  @Override
+  public String name() {
+    return "PersistenceService";
+  }
+}
diff --git
a/iotdb-collector/collector-core/src/main/resources/application.properties
b/iotdb-collector/collector-core/src/main/resources/application.properties
index 244ed55..925ef65 100644
--- a/iotdb-collector/collector-core/src/main/resources/application.properties
+++ b/iotdb-collector/collector-core/src/main/resources/application.properties
@@ -54,3 +54,17 @@ task_processor_ring_buffer_size=1024
# Effective mode: on every start
# Data type: int
task_sink_ring_buffer_size=1024
+
+####################
+### Plugin Configuration
+####################
+
+# The location of plugin jar files
+# Effective mode: on every start
+# Data type: string
+plugin_lib_dir=ext/plugin
+
+# The installation location of plugin jar files
+# Effective mode: on every start
+# Data type: string
+plugin_install_lib_dir=ext/plugin/install
\ No newline at end of file
diff --git a/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml
b/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml
index 25ea5b5..eec0e5f 100644
--- a/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml
+++ b/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml
@@ -41,50 +41,21 @@ paths:
"200":
$ref: '#/components/responses/SuccessExecutionStatus'
- /plugin/v1/alter:
- post:
- operationId: alterPlugin
- requestBody:
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/AlterPluginRequest'
- responses:
- "200":
- $ref: '#/components/responses/SuccessExecutionStatus'
-
- /plugin/v1/start:
- post:
- operationId: startPlugin
- requestBody:
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/StartPluginRequest'
- responses:
- "200":
- $ref: '#/components/responses/SuccessExecutionStatus'
-
- /plugin/v1/stop:
+ /plugin/v1/drop:
post:
- operationId: stopPlugin
+ operationId: dropPlugin
requestBody:
content:
application/json:
schema:
- $ref: '#/components/schemas/StopPluginRequest'
+ $ref: '#/components/schemas/DropPluginRequest'
responses:
"200":
$ref: '#/components/responses/SuccessExecutionStatus'
- /plugin/v1/drop:
+ /plugin/v1/show:
post:
- operationId: dropPlugin
- requestBody:
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/DropPluginRequest'
+ operationId: showPlugin
responses:
"200":
$ref: '#/components/responses/SuccessExecutionStatus'
@@ -93,51 +64,16 @@ components:
schemas:
CreatePluginRequest:
properties:
- sourceAttribute:
- type: object
- additionalProperties:
- type: string
- processorAttribute:
- type: object
- additionalProperties:
- type: string
- sinkAttribute:
- type: object
- additionalProperties:
- type: string
- taskId:
- type: string
-
- AlterPluginRequest:
- properties:
- sourceAttribute:
- type: object
- additionalProperties:
- type: string
- processorAttribute:
- type: object
- additionalProperties:
- type: string
- sinkAttribute:
- type: object
- additionalProperties:
- type: string
- taskId:
+ pluginName:
type: string
-
- StartPluginRequest:
- properties:
- taskId:
+ className:
type: string
-
- StopPluginRequest:
- properties:
- taskId:
+ jarName:
type: string
DropPluginRequest:
properties:
- taskId:
+ pluginName:
type: string
ExecutionStatus:
diff --git a/pom.xml b/pom.xml
index e19571c..41af70a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -171,6 +171,7 @@
<spring-boot.version>2.7.18</spring-boot.version>
<!-- This is the last version to support the javax namespace -->
<spring.version>5.3.39</spring.version>
+ <sqlite.version>3.49.1.0</sqlite.version>
<!-- This was the last version to support Java 8 -->
<swagger.version>1.6.14</swagger.version>
<thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
@@ -862,6 +863,11 @@
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.xerial</groupId>
+ <artifactId>sqlite-jdbc</artifactId>
+ <version>${sqlite.version}</version>
+ </dependency>
<!-- Conflict:
json-smart (pulls in 9.3),
cglib (pulls in 7.1)