This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a0dbf9b18e0 Pipe: Fixed the bug that reused plugins may not get loader 
and visibility (#16877)
a0dbf9b18e0 is described below

commit a0dbf9b18e0b8f1042d613c672eab91e3fc5ef2e
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 8 14:41:33 2025 +0800

    Pipe: Fixed the bug that reused plugins may not get loader and visibility 
(#16877)
---
 .../persistence/pipe/PipePluginInfo.java           | 78 +++++++++++++---------
 .../service/PipePluginExecutableManager.java       |  9 +++
 2 files changed, 55 insertions(+), 32 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 280a891b598..27cc3cc4cbf 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -161,19 +161,19 @@ public class PipePluginInfo implements SnapshotProcessor {
   }
 
   public void checkPipePluginExistence(
-      final Map<String, String> extractorAttributes,
+      final Map<String, String> sourceAttributes,
       final Map<String, String> processorAttributes,
-      final Map<String, String> connectorAttributes) {
-    final PipeParameters extractorParameters = new 
PipeParameters(extractorAttributes);
-    final String extractorPluginName =
-        extractorParameters.getStringOrDefault(
+      final Map<String, String> sinkAttributes) {
+    final PipeParameters sourceParameters = new 
PipeParameters(sourceAttributes);
+    final String sourcePluginName =
+        sourceParameters.getStringOrDefault(
             Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, 
PipeSourceConstant.SOURCE_KEY),
             IOTDB_EXTRACTOR.getPipePluginName());
-    if (!pipePluginMetaKeeper.containsPipePlugin(extractorPluginName)) {
+    if (!pipePluginMetaKeeper.containsPipePlugin(sourcePluginName)) {
       final String exceptionMessage =
           String.format(
               "Failed to create or alter pipe, the pipe extractor plugin %s 
does not exist",
-              extractorPluginName);
+              sourcePluginName);
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
@@ -191,16 +191,16 @@ public class PipePluginInfo implements SnapshotProcessor {
       throw new PipeException(exceptionMessage);
     }
 
-    final PipeParameters connectorParameters = new 
PipeParameters(connectorAttributes);
-    final String connectorPluginName =
-        connectorParameters.getStringOrDefault(
+    final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
+    final String sinkPluginName =
+        sinkParameters.getStringOrDefault(
             Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
             IOTDB_THRIFT_CONNECTOR.getPipePluginName());
-    if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
+    if (!pipePluginMetaKeeper.containsPipePlugin(sinkPluginName)) {
       final String exceptionMessage =
           String.format(
               "Failed to create or alter pipe, the pipe connector plugin %s 
does not exist",
-              connectorPluginName);
+              sinkPluginName);
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
@@ -212,34 +212,29 @@ public class PipePluginInfo implements SnapshotProcessor {
     try {
       final PipePluginMeta pipePluginMeta = 
createPipePluginPlan.getPipePluginMeta();
       final String pluginName = pipePluginMeta.getPluginName();
+      final String className = pipePluginMeta.getClassName();
+      final String jarName = pipePluginMeta.getJarName();
 
       // try to drop the old pipe plugin if exists to reduce the effect of the 
inconsistency
       dropPipePlugin(new DropPipePluginPlan(pluginName));
 
       pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
-      pipePluginMetaKeeper.addJarNameAndMd5(
-          pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
+      pipePluginMetaKeeper.addJarNameAndMd5(jarName, 
pipePluginMeta.getJarMD5());
 
       if (createPipePluginPlan.getJarFile() != null) {
         pipePluginExecutableManager.savePluginToInstallDir(
-            ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
-            pluginName,
-            pipePluginMeta.getJarName());
-        final String pluginDirPath = 
pipePluginExecutableManager.getPluginsDirPath(pluginName);
-        final PipePluginClassLoader pipePluginClassLoader =
-            classLoaderManager.createPipePluginClassLoader(pluginDirPath);
-        try {
-          final Class<?> pluginClass =
-              Class.forName(pipePluginMeta.getClassName(), true, 
pipePluginClassLoader);
-          pipePluginMetaKeeper.addPipePluginVisibility(
-              pluginName, 
VisibilityUtils.calculateFromPluginClass(pluginClass));
-          classLoaderManager.addPluginAndClassLoader(pluginName, 
pipePluginClassLoader);
-        } catch (final Exception e) {
-          try {
-            pipePluginClassLoader.close();
-          } catch (final Exception ignored) {
-          }
-          throw e;
+            ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), 
pluginName, jarName);
+        computeFromPluginClass(pluginName, className);
+      } else {
+        final String existed = 
pipePluginMetaKeeper.getPluginNameByJarName(jarName);
+        if (Objects.nonNull(existed)) {
+          pipePluginExecutableManager.linkExistedPlugin(existed, pluginName, 
jarName);
+          computeFromPluginClass(pluginName, className);
+        } else {
+          throw new PipeException(
+              String.format(
+                  "The %s's creation has not passed in jarName, which does not 
exist in other pipePlugins. Please check",
+                  pluginName));
         }
       }
 
@@ -255,6 +250,25 @@ public class PipePluginInfo implements SnapshotProcessor {
     }
   }
 
+  private void computeFromPluginClass(final String pluginName, final String 
className)
+      throws Exception {
+    final String pluginDirPath = 
pipePluginExecutableManager.getPluginsDirPath(pluginName);
+    final PipePluginClassLoader pipePluginClassLoader =
+        classLoaderManager.createPipePluginClassLoader(pluginDirPath);
+    try {
+      final Class<?> pluginClass = Class.forName(className, true, 
pipePluginClassLoader);
+      pipePluginMetaKeeper.addPipePluginVisibility(
+          pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
+      classLoaderManager.addPluginAndClassLoader(pluginName, 
pipePluginClassLoader);
+    } catch (final Exception e) {
+      try {
+        pipePluginClassLoader.close();
+      } catch (final Exception ignored) {
+      }
+      throw e;
+    }
+  }
+
   public TSStatus dropPipePlugin(final DropPipePluginPlan dropPipePluginPlan) {
     final String pluginName = dropPipePluginPlan.getPluginName();
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java
index 9d28ea0130e..276af31bb1a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin.service;
 import org.apache.iotdb.commons.executable.ExecutableManager;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.external.commons.codec.digest.DigestUtils;
@@ -123,6 +124,14 @@ public class PipePluginExecutableManager extends 
ExecutableManager {
     return this.libRoot + File.separator + INSTALL_DIR + File.separator + 
fileName;
   }
 
+  public void linkExistedPlugin(
+      final String oldPluginName, final String newPluginName, final String 
fileName)
+      throws IOException {
+    FileUtils.createHardLink(
+        new File(getPluginsDirPath(oldPluginName), fileName),
+        new File(getPluginsDirPath(newPluginName), fileName));
+  }
+
   /**
    * @param byteBuffer file
    * @param pluginName

Reply via email to