This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 66634eab443 Pipe Plugin: Pipe Task Safe Deletion, Plugin Directory
Structure Reorganization, and Isolated Pipe Plugin ClassLoader Implementation
(#12868)
66634eab443 is described below
commit 66634eab443fcd0210e97a3466e6fc0e180074f8
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Jul 11 16:59:14 2024 +0800
Pipe Plugin: Pipe Task Safe Deletion, Plugin Directory Structure
Reorganization, and Isolated Pipe Plugin ClassLoader Implementation (#12868)
Currently, IOTDB allows PipePlugins that are being used by Pipes to be
deleted. When users drop a PipePlugin and then create a new PipePlugin with the
same fully qualified class name, IOTDB is unable to properly load the newly
created PipePlugin. The reason is that after IOTDB drops a PipePlugin, the
corresponding jar file is not deleted, and the class loader behavior in loading
the same fully qualified class name in the Install directory is random, as
there is no unified standard, wh [...]
Solutions:
1. **New Directory Structure**: Adopt a new directory structure
`install->pluginName->jar`. The advantage of using this new directory is that
the class loader does not need to load all jars. However, using this form will
also not support the case where different jars reference each other. For
example, if plugin A references some utility classes in the jar of plugin B,
this situation will not be allowed.
2. **Improve Drop Plugin Process**: For the process of dropping a plugin,
it is necessary to verify whether it can be deleted during the Lock process.
Therefore, it is possible to check in the Lock step whether there is a Pipe
using this plugin. If there is a Pipe using this plugin, the check fails; if
not, continue with the following process.
3. **ClassLoader for Each Create Plugin**: Create a new ClassLoader each
time a plugin is created. Each ClassLoader can only load a class with the same
fully qualified name, so using the same class loader may lead to exceptions.
However, the current strategy is to save the plugin's bytecode object class,
and we can implement on-demand loading, that is, only load into memory when it
is necessary to run the corresponding bytecode. Each plugin saves a
ClassLoader, and the one needed is l [...]
------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../persistence/pipe/PipePluginInfo.java | 32 ++++++++----
.../confignode/persistence/pipe/PipeTaskInfo.java | 58 ++++++++++++++++++++++
.../impl/pipe/plugin/DropPipePluginProcedure.java | 28 +++++++++--
.../pipe/agent/plugin/PipeDataNodePluginAgent.java | 35 +++++++------
.../db/pipe/agent/runtime/PipeAgentLauncher.java | 8 ++-
.../agent/plugin/PipeDataNodePluginAgentTest.java | 39 ++++++++++-----
.../commons/executable/ExecutableManager.java | 3 ++
.../pipe/agent/plugin/PipePluginConstructor.java | 14 ++++--
.../pipe/plugin/meta/PipePluginMetaKeeper.java | 35 ++++++-------
.../service/PipePluginClassLoaderManager.java | 42 ++++++++++------
.../service/PipePluginExecutableManager.java | 43 +++++++++++++++-
.../pipe/plugin/meta/PipePluginMetaTest.java | 2 +-
12 files changed, 250 insertions(+), 89 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index d3c46c2957d..c25d92c9b17 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -100,13 +100,6 @@ public class PipePluginInfo implements SnapshotProcessor {
"Failed to create PipePlugin [%s], the same name PipePlugin has
been created",
pluginName));
}
-
- if (pipePluginMetaKeeper.jarNameExistsAndMatchesMd5(jarName, jarMD5)) {
- throw new PipeException(
- String.format(
- "Failed to create PipePlugin [%s], the same name Jar [%s] but
different MD5 [%s] has existed",
- pluginName, jarName, jarMD5));
- }
}
public void validateBeforeDroppingPipePlugin(final String pluginName) {
@@ -187,8 +180,9 @@ public class PipePluginInfo implements SnapshotProcessor {
pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
if (createPipePluginPlan.getJarFile() != null) {
- pipePluginExecutableManager.saveToInstallDir(
+ pipePluginExecutableManager.savePluginToInstallDir(
ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
+ createPipePluginPlan.getPipePluginMeta().getPluginName(),
pipePluginMeta.getJarName());
}
@@ -208,9 +202,21 @@ public class PipePluginInfo implements SnapshotProcessor {
final String pluginName = dropPipePluginPlan.getPluginName();
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
- pipePluginMetaKeeper.removeJarNameAndMd5IfPossible(
- pipePluginMetaKeeper.getPipePluginMeta(pluginName).getJarName());
+ String jarName =
pipePluginMetaKeeper.getPipePluginMeta(pluginName).getJarName();
+ pipePluginMetaKeeper.removeJarNameAndMd5IfPossible(jarName);
pipePluginMetaKeeper.removePipePluginMeta(pluginName);
+
+ try {
+ pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName,
jarName);
+ } catch (IOException e) {
+ final String errorMessage =
+ String.format(
+ "Failed to execute dropPipePlugin(%s) on config nodes, because
of %s",
+ pluginName, e);
+ LOGGER.warn(errorMessage, e);
+ return new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(errorMessage);
+ }
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
@@ -226,10 +232,14 @@ public class PipePluginInfo implements SnapshotProcessor {
try {
final List<ByteBuffer> jarList = new ArrayList<>();
for (final String jarName : getPipePluginJarPlan.getJarNames()) {
+ String pluginName =
pipePluginMetaKeeper.getPluginNameByJarName(jarName);
+ if (pluginName == null) {
+ throw new PipeException(String.format("%s does not exist", jarName));
+ }
jarList.add(
ExecutableManager.transferToBytebuffer(
PipePluginExecutableManager.getInstance()
- .getFileStringUnderInstallByName(jarName)));
+ .getPluginInstallPath(pluginName, jarName)));
}
return new JarResp(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList);
} catch (final Exception e) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index f83a7eac35f..ee864076a5b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -24,6 +24,10 @@ import
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -61,6 +65,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -71,6 +76,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import static
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;
+
public class PipeTaskInfo implements SnapshotProcessor {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskInfo.class);
@@ -344,6 +351,57 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
+ public void validatePipePluginUsageByPipe(String pluginName) {
+ acquireReadLock();
+ try {
+ validatePipePluginUsageByPipeInternal(pluginName);
+ } finally {
+ releaseReadLock();
+ }
+ }
+
+ private void validatePipePluginUsageByPipeInternal(String pluginName) {
+ Iterable<PipeMeta> pipeMetas = getPipeMetaList();
+ for (PipeMeta pipeMeta : pipeMetas) {
+ PipeParameters extractorParameters =
pipeMeta.getStaticMeta().getExtractorParameters();
+ final String extractorPluginName =
+ extractorParameters.getStringOrDefault(
+ Arrays.asList(PipeExtractorConstant.EXTRACTOR_KEY,
PipeExtractorConstant.SOURCE_KEY),
+ BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName());
+ if (pluginName.equals(extractorPluginName)) {
+ String exceptionMessage =
+ String.format(
+ "PipePlugin '%s' is already used by Pipe '%s' as a source.",
+ pluginName, pipeMeta.getStaticMeta().getPipeName());
+ throw new PipeException(exceptionMessage);
+ }
+
+ PipeParameters processorParameters =
pipeMeta.getStaticMeta().getProcessorParameters();
+ final String processorPluginName =
+ processorParameters.getString(PipeProcessorConstant.PROCESSOR_KEY);
+ if (pluginName.equals(processorPluginName)) {
+ String exceptionMessage =
+ String.format(
+ "PipePlugin '%s' is already used by Pipe '%s' as a processor.",
+ pluginName, pipeMeta.getStaticMeta().getPipeName());
+ throw new PipeException(exceptionMessage);
+ }
+
+ PipeParameters connectorParameters =
pipeMeta.getStaticMeta().getConnectorParameters();
+ final String connectorPluginName =
+ connectorParameters.getStringOrDefault(
+ Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY),
+ IOTDB_THRIFT_CONNECTOR.getPipePluginName());
+ if (pluginName.equals(connectorPluginName)) {
+ String exceptionMessage =
+ String.format(
+ "PipePlugin '%s' is already used by Pipe '%s' as a sink.",
+ pluginName, pipeMeta.getStaticMeta().getPipeName());
+ throw new PipeException(exceptionMessage);
+ }
+ }
+ }
+
/////////////////////////////// Pipe Task Management
///////////////////////////////
public TSStatus createPipe(final CreatePipePlanV2 plan) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index a69d2dc096b..ed4925316ae 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import
org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator;
+import
org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
@@ -44,6 +46,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
/**
* This class extends {@link AbstractNodeProcedure} to make sure that when a
{@link
@@ -104,18 +107,34 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
private Flow executeFromLock(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipePluginProcedure: executeFromLock({})", pluginName);
+
+ final PipeTaskCoordinator pipeTaskCoordinator =
+ env.getConfigManager().getPipeManager().getPipeTaskCoordinator();
final PipePluginCoordinator pipePluginCoordinator =
env.getConfigManager().getPipeManager().getPipePluginCoordinator();
+ final AtomicReference<PipeTaskInfo> pipeTaskInfo =
pipeTaskCoordinator.tryLock();
+ if (pipeTaskInfo == null) {
+ String exceptionMessage =
+ String.format(
+ "ProcedureId %d failed to acquire pipe lock due to high
competition with other pipe operations. "
+ + "The PipeTaskInfo is frequently accessed by other
operations.",
+ getProcId());
+ LOGGER.warn(exceptionMessage);
+ setFailure(new ProcedureException(exceptionMessage));
+ return Flow.NO_MORE_STATE;
+ }
pipePluginCoordinator.lock();
try {
pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
+ pipeTaskInfo.get().validatePipePluginUsageByPipe(pluginName);
} catch (PipeException e) {
// if the pipe plugin is a built-in plugin, we should not drop it
LOGGER.warn(e.getMessage());
- setFailure(new ProcedureException(e.getMessage()));
pipePluginCoordinator.unlock();
+ pipeTaskCoordinator.unlock();
+ setFailure(new ProcedureException(e.getMessage()));
return Flow.NO_MORE_STATE;
}
@@ -124,7 +143,6 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
} catch (ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due
to: ", e);
}
-
setNextState(DropPipePluginState.DROP_ON_DATA_NODES);
return Flow.HAS_MORE_STATE;
}
@@ -132,8 +150,7 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipePluginProcedure: executeFromDropOnDataNodes({})",
pluginName);
- if
(RpcUtils.squashResponseStatusList(env.dropPipePluginOnDataNodes(pluginName,
false))
- .getCode()
+ if
(RpcUtils.squashResponseStatusList(env.dropPipePluginOnDataNodes(pluginName,
true)).getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
setNextState(DropPipePluginState.DROP_ON_CONFIG_NODES);
return Flow.HAS_MORE_STATE;
@@ -160,7 +177,7 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
LOGGER.info("DropPipePluginProcedure: executeFromUnlock({})", pluginName);
env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
-
+ env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
return Flow.NO_MORE_STATE;
}
@@ -184,6 +201,7 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
LOGGER.info("DropPipePluginProcedure: rollbackFromLock({})", pluginName);
env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
+ env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
}
private void rollbackFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
index 12dfeb47430..c93bb5a5882 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
@@ -77,7 +77,7 @@ public class PipeDataNodePluginAgent {
// register process from here
checkIfRegistered(pipePluginMeta);
- saveJarFileIfNeeded(pipePluginMeta.getJarName(), jarFile);
+ saveJarFileIfNeeded(pipePluginMeta.getPluginName(),
pipePluginMeta.getJarName(), jarFile);
doRegister(pipePluginMeta);
} finally {
lock.unlock();
@@ -102,7 +102,8 @@ public class PipeDataNodePluginAgent {
}
if (PipePluginExecutableManager.getInstance()
- .hasFileUnderInstallDir(pipePluginMeta.getJarName())
+ .hasPluginFileUnderInstallDir(
+ pipePluginMeta.getPluginName(), pipePluginMeta.getJarName())
&&
!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
String errMsg =
String.format(
@@ -118,9 +119,11 @@ public class PipeDataNodePluginAgent {
// we allow users to register the same pipe plugin multiple times without
any error
}
- private void saveJarFileIfNeeded(String jarName, ByteBuffer byteBuffer)
throws IOException {
+ private void saveJarFileIfNeeded(String pluginName, String jarName,
ByteBuffer byteBuffer)
+ throws IOException {
if (byteBuffer != null) {
- PipePluginExecutableManager.getInstance().saveToInstallDir(byteBuffer,
jarName);
+ PipePluginExecutableManager.getInstance()
+ .savePluginToInstallDir(byteBuffer, pluginName, jarName);
}
}
@@ -136,17 +139,19 @@ public class PipeDataNodePluginAgent {
final String className = pipePluginMeta.getClassName();
try {
- final PipePluginClassLoader currentActiveClassLoader =
-
PipePluginClassLoaderManager.getInstance().updateAndGetActiveClassLoader();
- updateAllRegisteredClasses(currentActiveClassLoader);
+ PipePluginClassLoaderManager classLoaderManager =
PipePluginClassLoaderManager.getInstance();
+ String pluginDirPath =
+
PipePluginExecutableManager.getInstance().getPluginsDirPath(pluginName);
+ final PipePluginClassLoader pipePluginClassLoader =
+ classLoaderManager.createPipePluginClassLoader(pluginDirPath);
- final Class<?> pluginClass = Class.forName(className, true,
currentActiveClassLoader);
+ final Class<?> pluginClass = Class.forName(className, true,
pipePluginClassLoader);
@SuppressWarnings("unused") // ensure that it is a PipePlugin class
final PipePlugin ignored = (PipePlugin)
pluginClass.getDeclaredConstructor().newInstance();
pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
- pipePluginMetaKeeper.addPluginAndClass(pluginName, pluginClass);
+ classLoaderManager.addPluginAndClassLoader(pluginName,
pipePluginClassLoader);
} catch (IOException
| InstantiationException
| InvocationTargetException
@@ -164,13 +169,6 @@ public class PipeDataNodePluginAgent {
}
}
- private void updateAllRegisteredClasses(PipePluginClassLoader
activeClassLoader)
- throws ClassNotFoundException {
- for (PipePluginMeta information :
pipePluginMetaKeeper.getAllPipePluginMeta()) {
- pipePluginMetaKeeper.updatePluginClass(information, activeClassLoader);
- }
- }
-
public void deregister(String pluginName, boolean needToDeleteJar) throws
PipeException {
lock.lock();
try {
@@ -185,11 +183,12 @@ public class PipeDataNodePluginAgent {
// remove anyway
pipePluginMetaKeeper.removePipePluginMeta(pluginName);
- pipePluginMetaKeeper.removePluginClass(pluginName);
+
PipePluginClassLoaderManager.getInstance().removePluginClassLoader(pluginName);
// if it is needed to delete jar file of the pipe plugin, delete both
jar file and md5
if (information != null && needToDeleteJar) {
-
PipePluginExecutableManager.getInstance().removeFileUnderLibRoot(information.getJarName());
+ PipePluginExecutableManager.getInstance()
+ .removePluginFileUnderLibRoot(information.getPluginName(),
information.getJarName());
PipePluginExecutableManager.getInstance()
.removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 7138369639a..4e92f94bf51 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -114,7 +114,8 @@ class PipeAgentLauncher {
}
// If jar does not exist, add current pipePluginMeta to list
if (!PipePluginExecutableManager.getInstance()
- .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
+ .hasPluginFileUnderInstallDir(
+ pipePluginMeta.getPluginName(), pipePluginMeta.getJarName())) {
pipePluginMetaList.add(pipePluginMeta);
} else {
try {
@@ -144,7 +145,10 @@ class PipeAgentLauncher {
final List<ByteBuffer> jarList = resp.getJarList();
for (int i = 0; i < pipePluginMetaList.size(); i++) {
PipePluginExecutableManager.getInstance()
- .saveToInstallDir(jarList.get(i),
pipePluginMetaList.get(i).getJarName());
+ .savePluginToInstallDir(
+ jarList.get(i),
+ pipePluginMetaList.get(i).getPluginName(),
+ pipePluginMetaList.get(i).getJarName());
}
} catch (IOException | TException | ClientManagerException e) {
throw new StartupException(e);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
index bef7f6bdee4..8af0dff5511 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
+import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -40,13 +41,25 @@ import java.nio.file.Paths;
import java.util.HashMap;
public class PipeDataNodePluginAgentTest {
- private static final String TMP_DIR = "PipePluginAgentTest_libroot";
+ private static final String TMP_LIB_ROOT_DIR = "PipePluginAgentTest_libroot";
+ private static final String TMP_TEMP_LIB_ROOT_DIR =
"PipePluginAgentTest_temporarylibroot";
+ private static final PipePluginMeta PIPE_PLUGIN_META =
+ new PipePluginMeta(
+ "PLUGIN-NAME",
+
"org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor",
+ false,
+ "IoTDBDataRegionExtractor.jar",
+ "md5");
@Before
public void before() {
try {
- Files.createDirectory(Paths.get(TMP_DIR));
- PipePluginClassLoaderManager.setupAndGetInstance(TMP_DIR);
+ PipePluginExecutableManager.setupAndGetInstance(TMP_LIB_ROOT_DIR,
TMP_TEMP_LIB_ROOT_DIR);
+ PipePluginClassLoaderManager.setupAndGetInstance(TMP_LIB_ROOT_DIR);
+ String pluginPath =
+ PipePluginExecutableManager.getInstance()
+ .getPluginsDirPath(PIPE_PLUGIN_META.getPluginName());
+ Files.createDirectories(Paths.get(pluginPath));
} catch (IOException e) {
Assert.fail();
}
@@ -55,7 +68,13 @@ public class PipeDataNodePluginAgentTest {
@After
public void after() {
try {
- Files.deleteIfExists(Paths.get(TMP_DIR));
+ String pluginPath =
+ PipePluginExecutableManager.getInstance()
+ .getPluginsDirPath(PIPE_PLUGIN_META.getPluginName());
+ Files.deleteIfExists(Paths.get(pluginPath));
+
Files.deleteIfExists(Paths.get(PipePluginExecutableManager.getInstance().getInstallDir()));
+ Files.deleteIfExists(Paths.get(TMP_TEMP_LIB_ROOT_DIR));
+ Files.deleteIfExists(Paths.get(TMP_LIB_ROOT_DIR));
} catch (IOException e) {
Assert.fail();
}
@@ -64,16 +83,10 @@ public class PipeDataNodePluginAgentTest {
@Test
public void testPipePluginAgent() {
PipeDataNodePluginAgent agent = new PipeDataNodePluginAgent();
+
try {
- agent.register(
- new PipePluginMeta(
- "plugin-name",
-
"org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor",
- false,
- "jar",
- "md5"),
- null);
- agent.deregister("plugin-name", false);
+ agent.register(PIPE_PLUGIN_META, null);
+ agent.deregister(PIPE_PLUGIN_META.getPluginName(), true);
} catch (Exception e) {
Assert.fail();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 8dc39bd37f0..435ee489234 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -232,6 +232,9 @@ public class ExecutableManager {
try {
Path path = Paths.get(destination);
if (!Files.exists(path)) {
+ if (!Files.exists(path.getParent())) {
+ Files.createDirectories(path.getParent());
+ }
Files.createFile(path);
}
// FileOutPutStream is not in append mode by default, so the file will
be overridden if it
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginConstructor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginConstructor.java
index 80cde27b655..e5253ad705b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginConstructor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginConstructor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper;
+import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -82,12 +83,19 @@ public abstract class PipePluginConstructor {
}
try {
- return (PipePlugin)
-
pluginMetaKeeper.getPluginClass(pluginName).getDeclaredConstructor().newInstance();
+ final Class<?> pluginClass =
+ information.isBuiltin()
+ ?
pluginMetaKeeper.getBuiltinPluginClass(information.getPluginName())
+ : Class.forName(
+ information.getClassName(),
+ true,
+
PipePluginClassLoaderManager.getInstance().getPluginClassLoader(pluginName));
+ return (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
} catch (InstantiationException
| InvocationTargetException
| NoSuchMethodException
- | IllegalAccessException e) {
+ | IllegalAccessException
+ | ClassNotFoundException e) {
String errorMessage =
String.format(
"Failed to reflect PipePlugin %s(%s) instance, because %s",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
index 63c6995b151..1354fe6d243 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.commons.pipe.plugin.meta;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
-import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -34,21 +33,20 @@ import java.util.concurrent.ConcurrentHashMap;
public abstract class PipePluginMetaKeeper {
protected final Map<String, PipePluginMeta> pipePluginNameToMetaMap = new
ConcurrentHashMap<>();
- protected final Map<String, Class<?>> pipePluginNameToClassMap;
+ protected final Map<String, Class<?>> builtinPipePluginNameToClassMap;
public PipePluginMetaKeeper() {
- pipePluginNameToClassMap = new ConcurrentHashMap<>();
-
- loadBuiltInPlugins();
+ builtinPipePluginNameToClassMap = new ConcurrentHashMap<>();
+ loadBuiltinPlugins();
}
- protected void loadBuiltInPlugins() {
+ protected void loadBuiltinPlugins() {
for (final BuiltinPipePlugin builtinPipePlugin :
BuiltinPipePlugin.values()) {
addPipePluginMeta(
builtinPipePlugin.getPipePluginName(),
new PipePluginMeta(
builtinPipePlugin.getPipePluginName(),
builtinPipePlugin.getClassName()));
- addPluginAndClass(
+ addBuiltinPluginClass(
builtinPipePlugin.getPipePluginName(),
builtinPipePlugin.getPipePluginClass());
}
}
@@ -73,22 +71,21 @@ public abstract class PipePluginMetaKeeper {
return pipePluginNameToMetaMap.containsKey(pluginName.toUpperCase());
}
- public void addPluginAndClass(String pluginName, Class<?> clazz) {
- pipePluginNameToClassMap.put(pluginName.toUpperCase(), clazz);
- }
-
- public Class<?> getPluginClass(String pluginName) {
- return pipePluginNameToClassMap.get(pluginName.toUpperCase());
+ private void addBuiltinPluginClass(String pluginName, Class<?>
builtinPipePluginClass) {
+ builtinPipePluginNameToClassMap.put(pluginName.toUpperCase(),
builtinPipePluginClass);
}
- public void removePluginClass(String pluginName) {
- pipePluginNameToClassMap.remove(pluginName.toUpperCase());
+ public Class<?> getBuiltinPluginClass(String pluginName) {
+ return builtinPipePluginNameToClassMap.get(pluginName.toUpperCase());
}
- public void updatePluginClass(PipePluginMeta pipePluginMeta,
PipePluginClassLoader classLoader)
- throws ClassNotFoundException {
- final Class<?> functionClass =
Class.forName(pipePluginMeta.getClassName(), true, classLoader);
- pipePluginNameToClassMap.put(pipePluginMeta.getPluginName().toUpperCase(),
functionClass);
+ public String getPluginNameByJarName(String jarName) {
+ for (Map.Entry<String, PipePluginMeta> entry :
pipePluginNameToMetaMap.entrySet()) {
+ if (entry.getValue().getJarName().equals(jarName)) {
+ return entry.getKey();
+ }
+ }
+ return null;
}
protected void processTakeSnapshot(OutputStream outputStream) throws
IOException {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
index 98ccdd8eb92..ccc10e624ca 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
@@ -23,10 +23,13 @@ import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.pipe.api.PipePlugin;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
@NotThreadSafe
public class PipePluginClassLoaderManager implements IService {
@@ -34,29 +37,39 @@ public class PipePluginClassLoaderManager implements
IService {
private final String libRoot;
/**
- * activeClassLoader is used to load all classes under libRoot. libRoot may
be updated before the
- * user executes CREATE PIPEPLUGIN or after the user executes DROP
PIPEPLUGIN. Therefore, we need
- * to continuously maintain the activeClassLoader so that the classes it
loads are always
- * up-to-date.
+ * Each {@link PipePlugin} is equipped with a dedicated {@link
PipePluginClassLoader}. When a
+ * {@link PipePlugin} is created, the corresponding {@link
PipePluginClassLoader} is generated and
+ * used to load the {@link PipePlugin}. When the {@link PipePlugin} is
deleted, its associated
+ * {@link PipePluginClassLoader} is also removed. The lifecycle of the {@link
+ * PipePluginClassLoader} is strictly consistent with the lifecycle of the
{@link PipePlugin} it
+ * serves.
*/
- private volatile PipePluginClassLoader activeClassLoader;
+ private final Map<String, PipePluginClassLoader>
pipePluginNameToClassLoaderMap;
private PipePluginClassLoaderManager(String libRoot) throws IOException {
this.libRoot = libRoot;
- activeClassLoader = new PipePluginClassLoader(libRoot);
+ pipePluginNameToClassLoaderMap = new ConcurrentHashMap<>();
}
- public PipePluginClassLoader updateAndGetActiveClassLoader() throws
IOException {
- PipePluginClassLoader deprecatedClassLoader = activeClassLoader;
- activeClassLoader = new PipePluginClassLoader(libRoot);
- if (deprecatedClassLoader != null) {
- deprecatedClassLoader.markAsDeprecated();
+ public void removePluginClassLoader(String pluginName) throws IOException {
+ PipePluginClassLoader classLoader =
+ pipePluginNameToClassLoaderMap.remove(pluginName.toUpperCase());
+ if (classLoader != null) {
+ classLoader.markAsDeprecated();
}
- return activeClassLoader;
}
- public PipePluginClassLoader getActiveClassLoader() {
- return activeClassLoader;
+ public PipePluginClassLoader getPluginClassLoader(String pluginName) {
+ return pipePluginNameToClassLoaderMap.get(pluginName.toUpperCase());
+ }
+
+ public void addPluginAndClassLoader(String pluginName, PipePluginClassLoader
classLoader) {
+ pipePluginNameToClassLoaderMap.put(pluginName.toUpperCase(), classLoader);
+ }
+
+ public PipePluginClassLoader createPipePluginClassLoader(String
pluginDirPath)
+ throws IOException {
+ return new PipePluginClassLoader(pluginDirPath);
}
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -67,7 +80,6 @@ public class PipePluginClassLoaderManager implements IService
{
public void start() throws StartupException {
try {
SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
- activeClassLoader = new PipePluginClassLoader(libRoot);
} catch (IOException e) {
throw new StartupException(this.getID().getName(), e.getMessage());
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
index 68756e2a0de..8d403369eb3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
@@ -30,7 +30,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
public class PipePluginExecutableManager extends ExecutableManager {
@@ -45,7 +47,7 @@ public class PipePluginExecutableManager extends
ExecutableManager {
final String pluginName = pipePluginMeta.getPluginName();
final String md5FilePath = pluginName + ".txt";
- if (hasFileUnderInstallDir(md5FilePath)) {
+ if (hasFileUnderTemporaryRoot(md5FilePath)) {
try {
return
readTextFromFileUnderTemporaryRoot(md5FilePath).equals(pipePluginMeta.getJarMD5());
} catch (IOException e) {
@@ -58,7 +60,7 @@ public class PipePluginExecutableManager extends
ExecutableManager {
final String md5 =
DigestUtils.md5Hex(
Files.newInputStream(
- Paths.get(getInstallDir() + File.separator +
pipePluginMeta.getJarName())));
+ Paths.get(getPluginInstallPath(pluginName,
pipePluginMeta.getJarName()))));
// Save the md5 in a txt under trigger temporary lib
saveTextAsFileUnderTemporaryRoot(md5, md5FilePath);
return md5.equals(pipePluginMeta.getJarMD5());
@@ -91,4 +93,41 @@ public class PipePluginExecutableManager extends
ExecutableManager {
public static PipePluginExecutableManager getInstance() {
return instance;
}
+
+ public boolean hasPluginFileUnderInstallDir(String pluginName, String
fileName) {
+ return Files.exists(Paths.get(getPluginInstallPath(pluginName, fileName)));
+ }
+
+ public String getPluginsDirPath(String pluginName) {
+ return this.libRoot + File.separator + INSTALL_DIR + File.separator +
pluginName;
+ }
+
+ public void removePluginFileUnderLibRoot(String pluginName, String fileName)
throws IOException {
+ String pluginPath = getPluginInstallPath(pluginName, fileName);
+ Path path = Paths.get(pluginPath);
+ Files.deleteIfExists(path);
+ Files.deleteIfExists(path.getParent());
+ }
+
+ public String getPluginInstallPath(String pluginName, String fileName) {
+ return this.libRoot
+ + File.separator
+ + INSTALL_DIR
+ + File.separator
+ + pluginName
+ + File.separator
+ + fileName;
+ }
+
+ /**
+ * @param byteBuffer file
+ * @param pluginName
+ * @param fileName Absolute Path will be libRoot + File_Separator +
INSTALL_DIR + File_Separator +
+ * pluginName + File_Separator + fileName
+ */
+ public void savePluginToInstallDir(ByteBuffer byteBuffer, String pluginName,
String fileName)
+ throws IOException {
+ String destination = getPluginInstallPath(pluginName, fileName);
+ saveToDir(byteBuffer, destination);
+ }
}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaTest.java
index f162e627f12..9ce3305751d 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaTest.java
@@ -54,6 +54,6 @@ public class PipePluginMetaTest {
DataNodePipePluginMetaKeeper keeper = new DataNodePipePluginMetaKeeper();
Assert.assertEquals(
BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginClass(),
-
keeper.getPluginClass(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()));
+
keeper.getBuiltinPluginClass(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()));
}
}