This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch plugin-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/plugin-fix by this push:
     new dba2a186107 partial
dba2a186107 is described below

commit dba2a186107a2d4695730c77e96c2fe6ad84ac0c
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 8 10:10:54 2025 +0800

    partial
---
 .../persistence/pipe/PipePluginInfo.java           | 73 ++++++++++++++--------
 1 file changed, 47 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 280a891b598..e64dc67e011 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -161,19 +161,19 @@ public class PipePluginInfo implements SnapshotProcessor {
   }
 
   public void checkPipePluginExistence(
-      final Map<String, String> extractorAttributes,
+      final Map<String, String> sourceAttributes,
       final Map<String, String> processorAttributes,
-      final Map<String, String> connectorAttributes) {
-    final PipeParameters extractorParameters = new 
PipeParameters(extractorAttributes);
-    final String extractorPluginName =
-        extractorParameters.getStringOrDefault(
+      final Map<String, String> sinkAttributes) {
+    final PipeParameters sourceParameters = new 
PipeParameters(sourceAttributes);
+    final String sourcePluginName =
+        sourceParameters.getStringOrDefault(
             Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, 
PipeSourceConstant.SOURCE_KEY),
             IOTDB_EXTRACTOR.getPipePluginName());
-    if (!pipePluginMetaKeeper.containsPipePlugin(extractorPluginName)) {
+    if (!pipePluginMetaKeeper.containsPipePlugin(sourcePluginName)) {
       final String exceptionMessage =
           String.format(
               "Failed to create or alter pipe, the pipe extractor plugin %s 
does not exist",
-              extractorPluginName);
+              sourcePluginName);
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
@@ -191,16 +191,16 @@ public class PipePluginInfo implements SnapshotProcessor {
       throw new PipeException(exceptionMessage);
     }
 
-    final PipeParameters connectorParameters = new 
PipeParameters(connectorAttributes);
-    final String connectorPluginName =
-        connectorParameters.getStringOrDefault(
+    final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
+    final String sinkPluginName =
+        sinkParameters.getStringOrDefault(
             Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
             IOTDB_THRIFT_CONNECTOR.getPipePluginName());
-    if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
+    if (!pipePluginMetaKeeper.containsPipePlugin(sinkPluginName)) {
       final String exceptionMessage =
           String.format(
               "Failed to create or alter pipe, the pipe connector plugin %s 
does not exist",
-              connectorPluginName);
+              sinkPluginName);
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
@@ -212,6 +212,7 @@ public class PipePluginInfo implements SnapshotProcessor {
     try {
       final PipePluginMeta pipePluginMeta = 
createPipePluginPlan.getPipePluginMeta();
       final String pluginName = pipePluginMeta.getPluginName();
+      final String className = pipePluginMeta.getClassName();
 
       // try to drop the old pipe plugin if exists to reduce the effect of the 
inconsistency
       dropPipePlugin(new DropPipePluginPlan(pluginName));
@@ -225,21 +226,22 @@ public class PipePluginInfo implements SnapshotProcessor {
             ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
             pluginName,
             pipePluginMeta.getJarName());
-        final String pluginDirPath = 
pipePluginExecutableManager.getPluginsDirPath(pluginName);
-        final PipePluginClassLoader pipePluginClassLoader =
-            classLoaderManager.createPipePluginClassLoader(pluginDirPath);
-        try {
-          final Class<?> pluginClass =
-              Class.forName(pipePluginMeta.getClassName(), true, 
pipePluginClassLoader);
+        computeFromPluginClass(
+            pipePluginExecutableManager.getPluginsDirPath(pluginName), 
pluginName, className);
+      } else {
+        final String existed =
+            
pipePluginMetaKeeper.getPluginNameByJarName(pipePluginMeta.getJarName());
+        if (Objects.nonNull(existed)
+            && pipePluginMetaKeeper
+                .getPipePluginMeta(existed)
+                .getClassName()
+                .equals(pipePluginMeta.getClassName())
+            && 
pipePluginMetaKeeper.getPipePluginNameToVisibilityMap().containsKey(existed)) {
           pipePluginMetaKeeper.addPipePluginVisibility(
-              pluginName, 
VisibilityUtils.calculateFromPluginClass(pluginClass));
-          classLoaderManager.addPluginAndClassLoader(pluginName, 
pipePluginClassLoader);
-        } catch (final Exception e) {
-          try {
-            pipePluginClassLoader.close();
-          } catch (final Exception ignored) {
-          }
-          throw e;
+              pluginName, 
pipePluginMetaKeeper.getPipePluginNameToVisibilityMap().get(existed));
+        } else {
+          computeFromPluginClass(
+              pipePluginExecutableManager.getPluginsDirPath(existed), 
pluginName, className);
         }
       }
 
@@ -255,6 +257,25 @@ public class PipePluginInfo implements SnapshotProcessor {
     }
   }
 
+  private void computeFromPluginClass(
+      final String pluginDirPath, final String pluginName, final String 
className)
+      throws Exception {
+    final PipePluginClassLoader pipePluginClassLoader =
+        classLoaderManager.createPipePluginClassLoader(pluginDirPath);
+    try {
+      final Class<?> pluginClass = Class.forName(className, true, 
pipePluginClassLoader);
+      pipePluginMetaKeeper.addPipePluginVisibility(
+          pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
+      classLoaderManager.addPluginAndClassLoader(pluginName, 
pipePluginClassLoader);
+    } catch (final Exception e) {
+      try {
+        pipePluginClassLoader.close();
+      } catch (final Exception ignored) {
+      }
+      throw e;
+    }
+  }
+
   public TSStatus dropPipePlugin(final DropPipePluginPlan dropPipePluginPlan) {
     final String pluginName = dropPipePluginPlan.getPluginName();
 

Reply via email to