This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 66634eab443 Pipe Plugin: Pipe Task Safe Deletion, Plugin Directory 
Structure Reorganization, and Isolated Pipe Plugin ClassLoader Implementation 
(#12868)
66634eab443 is described below

commit 66634eab443fcd0210e97a3466e6fc0e180074f8
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Jul 11 16:59:14 2024 +0800

    Pipe Plugin: Pipe Task Safe Deletion, Plugin Directory Structure 
Reorganization, and Isolated Pipe Plugin ClassLoader Implementation (#12868)
    
    Currently, IOTDB allows PipePlugins that are being used by Pipes to be 
deleted. When users drop a PipePlugin and then create a new PipePlugin with the 
same fully qualified class name, IOTDB is unable to properly load the newly 
created PipePlugin. The reason is that after IOTDB drops a PipePlugin, the 
corresponding jar file is not deleted, and the class loader behavior in loading 
the same fully qualified class name in the Install directory is random, as 
there is no unified standard, wh [...]
    
    Solutions:
    
    1. **New Directory Structure**: Adopt a new directory structure 
`install->pluginName->jar`. The advantage of using this new directory is that 
the class loader does not need to load all jars. However, using this form will 
also not support the case where different jars reference each other. For 
example, if plugin A references some utility classes in the jar of plugin B, 
this situation will not be allowed.
    
    2. **Improve Drop Plugin Process**: For the process of dropping a plugin, 
it is necessary to verify whether it can be deleted during the Lock process. 
Therefore, it is possible to check in the Lock step whether there is a Pipe 
using this plugin. If there is a Pipe using this plugin, the check fails; if 
not, continue with the following process.
    
    3. **ClassLoader for Each Create Plugin**: Create a new ClassLoader each 
time a plugin is created. Each ClassLoader can only load a class with the same 
fully qualified name, so using the same class loader may lead to exceptions. 
However, the current strategy is to save the plugin's bytecode object class, 
and we can implement on-demand loading, that is, only load into memory when it 
is necessary to run the corresponding bytecode. Each plugin saves a 
ClassLoader, and the one needed is l [...]
    
    ------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../persistence/pipe/PipePluginInfo.java           | 32 ++++++++----
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 58 ++++++++++++++++++++++
 .../impl/pipe/plugin/DropPipePluginProcedure.java  | 28 +++++++++--
 .../pipe/agent/plugin/PipeDataNodePluginAgent.java | 35 +++++++------
 .../db/pipe/agent/runtime/PipeAgentLauncher.java   |  8 ++-
 .../agent/plugin/PipeDataNodePluginAgentTest.java  | 39 ++++++++++-----
 .../commons/executable/ExecutableManager.java      |  3 ++
 .../pipe/agent/plugin/PipePluginConstructor.java   | 14 ++++--
 .../pipe/plugin/meta/PipePluginMetaKeeper.java     | 35 ++++++-------
 .../service/PipePluginClassLoaderManager.java      | 42 ++++++++++------
 .../service/PipePluginExecutableManager.java       | 43 +++++++++++++++-
 .../pipe/plugin/meta/PipePluginMetaTest.java       |  2 +-
 12 files changed, 250 insertions(+), 89 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index d3c46c2957d..c25d92c9b17 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -100,13 +100,6 @@ public class PipePluginInfo implements SnapshotProcessor {
               "Failed to create PipePlugin [%s], the same name PipePlugin has 
been created",
               pluginName));
     }
-
-    if (pipePluginMetaKeeper.jarNameExistsAndMatchesMd5(jarName, jarMD5)) {
-      throw new PipeException(
-          String.format(
-              "Failed to create PipePlugin [%s], the same name Jar [%s] but 
different MD5 [%s] has existed",
-              pluginName, jarName, jarMD5));
-    }
   }
 
   public void validateBeforeDroppingPipePlugin(final String pluginName) {
@@ -187,8 +180,9 @@ public class PipePluginInfo implements SnapshotProcessor {
           pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
 
       if (createPipePluginPlan.getJarFile() != null) {
-        pipePluginExecutableManager.saveToInstallDir(
+        pipePluginExecutableManager.savePluginToInstallDir(
             ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
+            createPipePluginPlan.getPipePluginMeta().getPluginName(),
             pipePluginMeta.getJarName());
       }
 
@@ -208,9 +202,21 @@ public class PipePluginInfo implements SnapshotProcessor {
     final String pluginName = dropPipePluginPlan.getPluginName();
 
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
-      pipePluginMetaKeeper.removeJarNameAndMd5IfPossible(
-          pipePluginMetaKeeper.getPipePluginMeta(pluginName).getJarName());
+      String jarName = 
pipePluginMetaKeeper.getPipePluginMeta(pluginName).getJarName();
+      pipePluginMetaKeeper.removeJarNameAndMd5IfPossible(jarName);
       pipePluginMetaKeeper.removePipePluginMeta(pluginName);
+
+      try {
+        pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName, 
jarName);
+      } catch (IOException e) {
+        final String errorMessage =
+            String.format(
+                "Failed to execute dropPipePlugin(%s) on config nodes, because 
of %s",
+                pluginName, e);
+        LOGGER.warn(errorMessage, e);
+        return new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage(errorMessage);
+      }
     }
 
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
@@ -226,10 +232,14 @@ public class PipePluginInfo implements SnapshotProcessor {
     try {
       final List<ByteBuffer> jarList = new ArrayList<>();
       for (final String jarName : getPipePluginJarPlan.getJarNames()) {
+        String pluginName = 
pipePluginMetaKeeper.getPluginNameByJarName(jarName);
+        if (pluginName == null) {
+          throw new PipeException(String.format("%s does not exist", jarName));
+        }
         jarList.add(
             ExecutableManager.transferToBytebuffer(
                 PipePluginExecutableManager.getInstance()
-                    .getFileStringUnderInstallByName(jarName)));
+                    .getPluginInstallPath(pluginName, jarName)));
       }
       return new JarResp(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList);
     } catch (final Exception e) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index f83a7eac35f..ee864076a5b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -24,6 +24,10 @@ import 
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -61,6 +65,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -71,6 +76,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;
+
 public class PipeTaskInfo implements SnapshotProcessor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskInfo.class);
@@ -344,6 +351,57 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
+  public void validatePipePluginUsageByPipe(String pluginName) {
+    acquireReadLock();
+    try {
+      validatePipePluginUsageByPipeInternal(pluginName);
+    } finally {
+      releaseReadLock();
+    }
+  }
+
+  private void validatePipePluginUsageByPipeInternal(String pluginName) {
+    Iterable<PipeMeta> pipeMetas = getPipeMetaList();
+    for (PipeMeta pipeMeta : pipeMetas) {
+      PipeParameters extractorParameters = 
pipeMeta.getStaticMeta().getExtractorParameters();
+      final String extractorPluginName =
+          extractorParameters.getStringOrDefault(
+              Arrays.asList(PipeExtractorConstant.EXTRACTOR_KEY, 
PipeExtractorConstant.SOURCE_KEY),
+              BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName());
+      if (pluginName.equals(extractorPluginName)) {
+        String exceptionMessage =
+            String.format(
+                "PipePlugin '%s' is already used by Pipe '%s' as a source.",
+                pluginName, pipeMeta.getStaticMeta().getPipeName());
+        throw new PipeException(exceptionMessage);
+      }
+
+      PipeParameters processorParameters = 
pipeMeta.getStaticMeta().getProcessorParameters();
+      final String processorPluginName =
+          processorParameters.getString(PipeProcessorConstant.PROCESSOR_KEY);
+      if (pluginName.equals(processorPluginName)) {
+        String exceptionMessage =
+            String.format(
+                "PipePlugin '%s' is already used by Pipe '%s' as a processor.",
+                pluginName, pipeMeta.getStaticMeta().getPipeName());
+        throw new PipeException(exceptionMessage);
+      }
+
+      PipeParameters connectorParameters = 
pipeMeta.getStaticMeta().getConnectorParameters();
+      final String connectorPluginName =
+          connectorParameters.getStringOrDefault(
+              Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY),
+              IOTDB_THRIFT_CONNECTOR.getPipePluginName());
+      if (pluginName.equals(connectorPluginName)) {
+        String exceptionMessage =
+            String.format(
+                "PipePlugin '%s' is already used by Pipe '%s' as a sink.",
+                pluginName, pipeMeta.getStaticMeta().getPipeName());
+        throw new PipeException(exceptionMessage);
+      }
+    }
+  }
+
   /////////////////////////////// Pipe Task Management 
///////////////////////////////
 
   public TSStatus createPipe(final CreatePipePlanV2 plan) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index a69d2dc096b..ed4925316ae 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -21,6 +21,8 @@ package 
org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
 
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import 
org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator;
+import 
org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import 
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
@@ -44,6 +46,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * This class extends {@link AbstractNodeProcedure} to make sure that when a 
{@link
@@ -104,18 +107,34 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
 
   private Flow executeFromLock(ConfigNodeProcedureEnv env) {
     LOGGER.info("DropPipePluginProcedure: executeFromLock({})", pluginName);
+
+    final PipeTaskCoordinator pipeTaskCoordinator =
+        env.getConfigManager().getPipeManager().getPipeTaskCoordinator();
     final PipePluginCoordinator pipePluginCoordinator =
         env.getConfigManager().getPipeManager().getPipePluginCoordinator();
 
+    final AtomicReference<PipeTaskInfo> pipeTaskInfo = 
pipeTaskCoordinator.tryLock();
+    if (pipeTaskInfo == null) {
+      String exceptionMessage =
+          String.format(
+              "ProcedureId %d failed to acquire pipe lock due to high 
competition with other pipe operations. "
+                  + "The PipeTaskInfo is frequently accessed by other 
operations.",
+              getProcId());
+      LOGGER.warn(exceptionMessage);
+      setFailure(new ProcedureException(exceptionMessage));
+      return Flow.NO_MORE_STATE;
+    }
     pipePluginCoordinator.lock();
 
     try {
       
pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
+      pipeTaskInfo.get().validatePipePluginUsageByPipe(pluginName);
     } catch (PipeException e) {
       // if the pipe plugin is a built-in plugin, we should not drop it
       LOGGER.warn(e.getMessage());
-      setFailure(new ProcedureException(e.getMessage()));
       pipePluginCoordinator.unlock();
+      pipeTaskCoordinator.unlock();
+      setFailure(new ProcedureException(e.getMessage()));
       return Flow.NO_MORE_STATE;
     }
 
@@ -124,7 +143,6 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
     } catch (ConsensusException e) {
       LOGGER.warn("Failed in the write API executing the consensus layer due 
to: ", e);
     }
-
     setNextState(DropPipePluginState.DROP_ON_DATA_NODES);
     return Flow.HAS_MORE_STATE;
   }
@@ -132,8 +150,7 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
   private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("DropPipePluginProcedure: executeFromDropOnDataNodes({})", 
pluginName);
 
-    if 
(RpcUtils.squashResponseStatusList(env.dropPipePluginOnDataNodes(pluginName, 
false))
-            .getCode()
+    if 
(RpcUtils.squashResponseStatusList(env.dropPipePluginOnDataNodes(pluginName, 
true)).getCode()
         == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       setNextState(DropPipePluginState.DROP_ON_CONFIG_NODES);
       return Flow.HAS_MORE_STATE;
@@ -160,7 +177,7 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
     LOGGER.info("DropPipePluginProcedure: executeFromUnlock({})", pluginName);
 
     
env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
-
+    env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
     return Flow.NO_MORE_STATE;
   }
 
@@ -184,6 +201,7 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
     LOGGER.info("DropPipePluginProcedure: rollbackFromLock({})", pluginName);
 
     
env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
+    env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
   }
 
   private void rollbackFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
index 12dfeb47430..c93bb5a5882 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
@@ -77,7 +77,7 @@ public class PipeDataNodePluginAgent {
 
       // register process from here
       checkIfRegistered(pipePluginMeta);
-      saveJarFileIfNeeded(pipePluginMeta.getJarName(), jarFile);
+      saveJarFileIfNeeded(pipePluginMeta.getPluginName(), 
pipePluginMeta.getJarName(), jarFile);
       doRegister(pipePluginMeta);
     } finally {
       lock.unlock();
@@ -102,7 +102,8 @@ public class PipeDataNodePluginAgent {
     }
 
     if (PipePluginExecutableManager.getInstance()
-            .hasFileUnderInstallDir(pipePluginMeta.getJarName())
+            .hasPluginFileUnderInstallDir(
+                pipePluginMeta.getPluginName(), pipePluginMeta.getJarName())
         && 
!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
       String errMsg =
           String.format(
@@ -118,9 +119,11 @@ public class PipeDataNodePluginAgent {
     // we allow users to register the same pipe plugin multiple times without 
any error
   }
 
-  private void saveJarFileIfNeeded(String jarName, ByteBuffer byteBuffer) 
throws IOException {
+  private void saveJarFileIfNeeded(String pluginName, String jarName, 
ByteBuffer byteBuffer)
+      throws IOException {
     if (byteBuffer != null) {
-      PipePluginExecutableManager.getInstance().saveToInstallDir(byteBuffer, 
jarName);
+      PipePluginExecutableManager.getInstance()
+          .savePluginToInstallDir(byteBuffer, pluginName, jarName);
     }
   }
 
@@ -136,17 +139,19 @@ public class PipeDataNodePluginAgent {
     final String className = pipePluginMeta.getClassName();
 
     try {
-      final PipePluginClassLoader currentActiveClassLoader =
-          
PipePluginClassLoaderManager.getInstance().updateAndGetActiveClassLoader();
-      updateAllRegisteredClasses(currentActiveClassLoader);
+      PipePluginClassLoaderManager classLoaderManager = 
PipePluginClassLoaderManager.getInstance();
+      String pluginDirPath =
+          
PipePluginExecutableManager.getInstance().getPluginsDirPath(pluginName);
+      final PipePluginClassLoader pipePluginClassLoader =
+          classLoaderManager.createPipePluginClassLoader(pluginDirPath);
 
-      final Class<?> pluginClass = Class.forName(className, true, 
currentActiveClassLoader);
+      final Class<?> pluginClass = Class.forName(className, true, 
pipePluginClassLoader);
 
       @SuppressWarnings("unused") // ensure that it is a PipePlugin class
       final PipePlugin ignored = (PipePlugin) 
pluginClass.getDeclaredConstructor().newInstance();
 
       pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
-      pipePluginMetaKeeper.addPluginAndClass(pluginName, pluginClass);
+      classLoaderManager.addPluginAndClassLoader(pluginName, 
pipePluginClassLoader);
     } catch (IOException
         | InstantiationException
         | InvocationTargetException
@@ -164,13 +169,6 @@ public class PipeDataNodePluginAgent {
     }
   }
 
-  private void updateAllRegisteredClasses(PipePluginClassLoader 
activeClassLoader)
-      throws ClassNotFoundException {
-    for (PipePluginMeta information : 
pipePluginMetaKeeper.getAllPipePluginMeta()) {
-      pipePluginMetaKeeper.updatePluginClass(information, activeClassLoader);
-    }
-  }
-
   public void deregister(String pluginName, boolean needToDeleteJar) throws 
PipeException {
     lock.lock();
     try {
@@ -185,11 +183,12 @@ public class PipeDataNodePluginAgent {
 
       // remove anyway
       pipePluginMetaKeeper.removePipePluginMeta(pluginName);
-      pipePluginMetaKeeper.removePluginClass(pluginName);
+      
PipePluginClassLoaderManager.getInstance().removePluginClassLoader(pluginName);
 
       // if it is needed to delete jar file of the pipe plugin, delete both 
jar file and md5
       if (information != null && needToDeleteJar) {
-        
PipePluginExecutableManager.getInstance().removeFileUnderLibRoot(information.getJarName());
+        PipePluginExecutableManager.getInstance()
+            .removePluginFileUnderLibRoot(information.getPluginName(), 
information.getJarName());
         PipePluginExecutableManager.getInstance()
             .removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt");
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 7138369639a..4e92f94bf51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -114,7 +114,8 @@ class PipeAgentLauncher {
       }
       // If jar does not exist, add current pipePluginMeta to list
       if (!PipePluginExecutableManager.getInstance()
-          .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
+          .hasPluginFileUnderInstallDir(
+              pipePluginMeta.getPluginName(), pipePluginMeta.getJarName())) {
         pipePluginMetaList.add(pipePluginMeta);
       } else {
         try {
@@ -144,7 +145,10 @@ class PipeAgentLauncher {
       final List<ByteBuffer> jarList = resp.getJarList();
       for (int i = 0; i < pipePluginMetaList.size(); i++) {
         PipePluginExecutableManager.getInstance()
-            .saveToInstallDir(jarList.get(i), 
pipePluginMetaList.get(i).getJarName());
+            .savePluginToInstallDir(
+                jarList.get(i),
+                pipePluginMetaList.get(i).getPluginName(),
+                pipePluginMetaList.get(i).getJarName());
       }
     } catch (IOException | TException | ClientManagerException e) {
       throw new StartupException(e);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
index bef7f6bdee4..8af0dff5511 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
+import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -40,13 +41,25 @@ import java.nio.file.Paths;
 import java.util.HashMap;
 
 public class PipeDataNodePluginAgentTest {
-  private static final String TMP_DIR = "PipePluginAgentTest_libroot";
+  private static final String TMP_LIB_ROOT_DIR = "PipePluginAgentTest_libroot";
+  private static final String TMP_TEMP_LIB_ROOT_DIR = 
"PipePluginAgentTest_temporarylibroot";
+  private static final PipePluginMeta PIPE_PLUGIN_META =
+      new PipePluginMeta(
+          "PLUGIN-NAME",
+          
"org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor",
+          false,
+          "IoTDBDataRegionExtractor.jar",
+          "md5");
 
   @Before
   public void before() {
     try {
-      Files.createDirectory(Paths.get(TMP_DIR));
-      PipePluginClassLoaderManager.setupAndGetInstance(TMP_DIR);
+      PipePluginExecutableManager.setupAndGetInstance(TMP_LIB_ROOT_DIR, 
TMP_TEMP_LIB_ROOT_DIR);
+      PipePluginClassLoaderManager.setupAndGetInstance(TMP_LIB_ROOT_DIR);
+      String pluginPath =
+          PipePluginExecutableManager.getInstance()
+              .getPluginsDirPath(PIPE_PLUGIN_META.getPluginName());
+      Files.createDirectories(Paths.get(pluginPath));
     } catch (IOException e) {
       Assert.fail();
     }
@@ -55,7 +68,13 @@ public class PipeDataNodePluginAgentTest {
   @After
   public void after() {
     try {
-      Files.deleteIfExists(Paths.get(TMP_DIR));
+      String pluginPath =
+          PipePluginExecutableManager.getInstance()
+              .getPluginsDirPath(PIPE_PLUGIN_META.getPluginName());
+      Files.deleteIfExists(Paths.get(pluginPath));
+      
Files.deleteIfExists(Paths.get(PipePluginExecutableManager.getInstance().getInstallDir()));
+      Files.deleteIfExists(Paths.get(TMP_TEMP_LIB_ROOT_DIR));
+      Files.deleteIfExists(Paths.get(TMP_LIB_ROOT_DIR));
     } catch (IOException e) {
       Assert.fail();
     }
@@ -64,16 +83,10 @@ public class PipeDataNodePluginAgentTest {
   @Test
   public void testPipePluginAgent() {
     PipeDataNodePluginAgent agent = new PipeDataNodePluginAgent();
+
     try {
-      agent.register(
-          new PipePluginMeta(
-              "plugin-name",
-              
"org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor",
-              false,
-              "jar",
-              "md5"),
-          null);
-      agent.deregister("plugin-name", false);
+      agent.register(PIPE_PLUGIN_META, null);
+      agent.deregister(PIPE_PLUGIN_META.getPluginName(), true);
     } catch (Exception e) {
       Assert.fail();
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 8dc39bd37f0..435ee489234 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -232,6 +232,9 @@ public class ExecutableManager {
     try {
       Path path = Paths.get(destination);
       if (!Files.exists(path)) {
+        if (!Files.exists(path.getParent())) {
+          Files.createDirectories(path.getParent());
+        }
         Files.createFile(path);
       }
       // FileOutPutStream is not in append mode by default, so the file will 
be overridden if it
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginConstructor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginConstructor.java
index 80cde27b655..e5253ad705b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginConstructor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginConstructor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin;
 
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper;
+import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
 import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -82,12 +83,19 @@ public abstract class PipePluginConstructor {
     }
 
     try {
-      return (PipePlugin)
-          
pluginMetaKeeper.getPluginClass(pluginName).getDeclaredConstructor().newInstance();
+      final Class<?> pluginClass =
+          information.isBuiltin()
+              ? 
pluginMetaKeeper.getBuiltinPluginClass(information.getPluginName())
+              : Class.forName(
+                  information.getClassName(),
+                  true,
+                  
PipePluginClassLoaderManager.getInstance().getPluginClassLoader(pluginName));
+      return (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
     } catch (InstantiationException
         | InvocationTargetException
         | NoSuchMethodException
-        | IllegalAccessException e) {
+        | IllegalAccessException
+        | ClassNotFoundException e) {
       String errorMessage =
           String.format(
               "Failed to reflect PipePlugin %s(%s) instance, because %s",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
index 63c6995b151..1354fe6d243 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.commons.pipe.plugin.meta;
 
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
-import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
 
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
@@ -34,21 +33,20 @@ import java.util.concurrent.ConcurrentHashMap;
 public abstract class PipePluginMetaKeeper {
 
   protected final Map<String, PipePluginMeta> pipePluginNameToMetaMap = new 
ConcurrentHashMap<>();
-  protected final Map<String, Class<?>> pipePluginNameToClassMap;
+  protected final Map<String, Class<?>> builtinPipePluginNameToClassMap;
 
   public PipePluginMetaKeeper() {
-    pipePluginNameToClassMap = new ConcurrentHashMap<>();
-
-    loadBuiltInPlugins();
+    builtinPipePluginNameToClassMap = new ConcurrentHashMap<>();
+    loadBuiltinPlugins();
   }
 
-  protected void loadBuiltInPlugins() {
+  protected void loadBuiltinPlugins() {
     for (final BuiltinPipePlugin builtinPipePlugin : 
BuiltinPipePlugin.values()) {
       addPipePluginMeta(
           builtinPipePlugin.getPipePluginName(),
           new PipePluginMeta(
               builtinPipePlugin.getPipePluginName(), 
builtinPipePlugin.getClassName()));
-      addPluginAndClass(
+      addBuiltinPluginClass(
           builtinPipePlugin.getPipePluginName(), 
builtinPipePlugin.getPipePluginClass());
     }
   }
@@ -73,22 +71,21 @@ public abstract class PipePluginMetaKeeper {
     return pipePluginNameToMetaMap.containsKey(pluginName.toUpperCase());
   }
 
-  public void addPluginAndClass(String pluginName, Class<?> clazz) {
-    pipePluginNameToClassMap.put(pluginName.toUpperCase(), clazz);
-  }
-
-  public Class<?> getPluginClass(String pluginName) {
-    return pipePluginNameToClassMap.get(pluginName.toUpperCase());
+  private void addBuiltinPluginClass(String pluginName, Class<?> 
builtinPipePluginClass) {
+    builtinPipePluginNameToClassMap.put(pluginName.toUpperCase(), 
builtinPipePluginClass);
   }
 
-  public void removePluginClass(String pluginName) {
-    pipePluginNameToClassMap.remove(pluginName.toUpperCase());
+  public Class<?> getBuiltinPluginClass(String pluginName) {
+    return builtinPipePluginNameToClassMap.get(pluginName.toUpperCase());
   }
 
-  public void updatePluginClass(PipePluginMeta pipePluginMeta, 
PipePluginClassLoader classLoader)
-      throws ClassNotFoundException {
-    final Class<?> functionClass = 
Class.forName(pipePluginMeta.getClassName(), true, classLoader);
-    pipePluginNameToClassMap.put(pipePluginMeta.getPluginName().toUpperCase(), 
functionClass);
+  public String getPluginNameByJarName(String jarName) {
+    for (Map.Entry<String, PipePluginMeta> entry : 
pipePluginNameToMetaMap.entrySet()) {
+      if (entry.getValue().getJarName().equals(jarName)) {
+        return entry.getKey();
+      }
+    }
+    return null;
   }
 
   protected void processTakeSnapshot(OutputStream outputStream) throws 
IOException {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
index 98ccdd8eb92..ccc10e624ca 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
@@ -23,10 +23,13 @@ import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.pipe.api.PipePlugin;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @NotThreadSafe
 public class PipePluginClassLoaderManager implements IService {
@@ -34,29 +37,39 @@ public class PipePluginClassLoaderManager implements 
IService {
   private final String libRoot;
 
   /**
-   * activeClassLoader is used to load all classes under libRoot. libRoot may 
be updated before the
-   * user executes CREATE PIPEPLUGIN or after the user executes DROP 
PIPEPLUGIN. Therefore, we need
-   * to continuously maintain the activeClassLoader so that the classes it 
loads are always
-   * up-to-date.
+   * Each {@link PipePlugin} is equipped with a dedicated {@link 
PipePluginClassLoader}. When a
+   * {@link PipePlugin} is created, the corresponding {@link 
PipePluginClassLoader} is generated and
+   * used to load the {@link PipePlugin}. When the {@link PipePlugin} is 
deleted, its associated
+   * {@link PipePluginClassLoader} is also removed. The lifecycle of the {@link
+   * PipePluginClassLoader} is strictly consistent with the lifecycle of the 
{@link PipePlugin} it
+   * serves.
    */
-  private volatile PipePluginClassLoader activeClassLoader;
+  private final Map<String, PipePluginClassLoader> 
pipePluginNameToClassLoaderMap;
 
   private PipePluginClassLoaderManager(String libRoot) throws IOException {
     this.libRoot = libRoot;
-    activeClassLoader = new PipePluginClassLoader(libRoot);
+    pipePluginNameToClassLoaderMap = new ConcurrentHashMap<>();
   }
 
-  public PipePluginClassLoader updateAndGetActiveClassLoader() throws 
IOException {
-    PipePluginClassLoader deprecatedClassLoader = activeClassLoader;
-    activeClassLoader = new PipePluginClassLoader(libRoot);
-    if (deprecatedClassLoader != null) {
-      deprecatedClassLoader.markAsDeprecated();
+  public void removePluginClassLoader(String pluginName) throws IOException {
+    PipePluginClassLoader classLoader =
+        pipePluginNameToClassLoaderMap.remove(pluginName.toUpperCase());
+    if (classLoader != null) {
+      classLoader.markAsDeprecated();
     }
-    return activeClassLoader;
   }
 
-  public PipePluginClassLoader getActiveClassLoader() {
-    return activeClassLoader;
+  public PipePluginClassLoader getPluginClassLoader(String pluginName) {
+    return pipePluginNameToClassLoaderMap.get(pluginName.toUpperCase());
+  }
+
+  public void addPluginAndClassLoader(String pluginName, PipePluginClassLoader 
classLoader) {
+    pipePluginNameToClassLoaderMap.put(pluginName.toUpperCase(), classLoader);
+  }
+
+  public PipePluginClassLoader createPipePluginClassLoader(String 
pluginDirPath)
+      throws IOException {
+    return new PipePluginClassLoader(pluginDirPath);
   }
 
   
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -67,7 +80,6 @@ public class PipePluginClassLoaderManager implements IService 
{
   public void start() throws StartupException {
     try {
       SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
-      activeClassLoader = new PipePluginClassLoader(libRoot);
     } catch (IOException e) {
       throw new StartupException(this.getID().getName(), e.getMessage());
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
index 68756e2a0de..8d403369eb3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
@@ -30,7 +30,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 
 public class PipePluginExecutableManager extends ExecutableManager {
@@ -45,7 +47,7 @@ public class PipePluginExecutableManager extends 
ExecutableManager {
     final String pluginName = pipePluginMeta.getPluginName();
     final String md5FilePath = pluginName + ".txt";
 
-    if (hasFileUnderInstallDir(md5FilePath)) {
+    if (hasFileUnderTemporaryRoot(md5FilePath)) {
       try {
         return 
readTextFromFileUnderTemporaryRoot(md5FilePath).equals(pipePluginMeta.getJarMD5());
       } catch (IOException e) {
@@ -58,7 +60,7 @@ public class PipePluginExecutableManager extends 
ExecutableManager {
       final String md5 =
           DigestUtils.md5Hex(
               Files.newInputStream(
-                  Paths.get(getInstallDir() + File.separator + 
pipePluginMeta.getJarName())));
+                  Paths.get(getPluginInstallPath(pluginName, 
pipePluginMeta.getJarName()))));
       // Save the md5 in a txt under trigger temporary lib
       saveTextAsFileUnderTemporaryRoot(md5, md5FilePath);
       return md5.equals(pipePluginMeta.getJarMD5());
@@ -91,4 +93,41 @@ public class PipePluginExecutableManager extends 
ExecutableManager {
   public static PipePluginExecutableManager getInstance() {
     return instance;
   }
+
+  public boolean hasPluginFileUnderInstallDir(String pluginName, String 
fileName) {
+    return Files.exists(Paths.get(getPluginInstallPath(pluginName, fileName)));
+  }
+
+  public String getPluginsDirPath(String pluginName) {
+    return this.libRoot + File.separator + INSTALL_DIR + File.separator + 
pluginName;
+  }
+
+  public void removePluginFileUnderLibRoot(String pluginName, String fileName) 
throws IOException {
+    String pluginPath = getPluginInstallPath(pluginName, fileName);
+    Path path = Paths.get(pluginPath);
+    Files.deleteIfExists(path);
+    Files.deleteIfExists(path.getParent());
+  }
+
+  public String getPluginInstallPath(String pluginName, String fileName) {
+    return this.libRoot
+        + File.separator
+        + INSTALL_DIR
+        + File.separator
+        + pluginName
+        + File.separator
+        + fileName;
+  }
+
+  /**
+   * @param byteBuffer file
+   * @param pluginName
+   * @param fileName Absolute Path will be libRoot + File_Separator + 
INSTALL_DIR + File_Separator +
+   *     pluginName + File_Separator + fileName
+   */
+  public void savePluginToInstallDir(ByteBuffer byteBuffer, String pluginName, 
String fileName)
+      throws IOException {
+    String destination = getPluginInstallPath(pluginName, fileName);
+    saveToDir(byteBuffer, destination);
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaTest.java
index f162e627f12..9ce3305751d 100644
--- 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaTest.java
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaTest.java
@@ -54,6 +54,6 @@ public class PipePluginMetaTest {
     DataNodePipePluginMetaKeeper keeper = new DataNodePipePluginMetaKeeper();
     Assert.assertEquals(
         BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginClass(),
-        
keeper.getPluginClass(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()));
+        
keeper.getBuiltinPluginClass(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()));
   }
 }


Reply via email to