This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 609bc289c09 Pipe: Fixed the hardlink bug of plugin meta (#16937)
609bc289c09 is described below

commit 609bc289c09a2e1314d65b927c38ee23e437646a
Author: Caideyipi <[email protected]>
AuthorDate: Tue Dec 23 16:58:20 2025 +0800

    Pipe: Fixed the hardlink bug of plugin meta (#16937)
---
 .../persistence/pipe/PipePluginInfo.java           | 33 ++++++++++++++++------
 .../iotdb/confignode/persistence/PipeInfoTest.java | 16 +++++++++--
 2 files changed, 37 insertions(+), 12 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 27cc3cc4cbf..51007f10236 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -215,16 +215,8 @@ public class PipePluginInfo implements SnapshotProcessor {
       final String className = pipePluginMeta.getClassName();
       final String jarName = pipePluginMeta.getJarName();
 
-      // try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
-      dropPipePlugin(new DropPipePluginPlan(pluginName));
-
-      pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
-      pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5());
-
       if (createPipePluginPlan.getJarFile() != null) {
-        pipePluginExecutableManager.savePluginToInstallDir(
-            ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName);
-        computeFromPluginClass(pluginName, className);
+        savePipePluginWithRollback(createPipePluginPlan);
       } else {
         final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName);
         if (Objects.nonNull(existed)) {
@@ -238,6 +230,12 @@ public class PipePluginInfo implements SnapshotProcessor {
         }
       }
 
+      // try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
+      dropPipePlugin(new DropPipePluginPlan(pluginName));
+
+      pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
+      pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5());
+
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (final Exception e) {
       final String errorMessage =
@@ -250,6 +248,23 @@ public class PipePluginInfo implements SnapshotProcessor {
     }
   }
 
+  private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePluginPlan)
+      throws Exception {
+    final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
+    final String pluginName = pipePluginMeta.getPluginName();
+    final String className = pipePluginMeta.getClassName();
+    final String jarName = pipePluginMeta.getJarName();
+    try {
+      pipePluginExecutableManager.savePluginToInstallDir(
+          ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName);
+      computeFromPluginClass(pluginName, className);
+    } catch (final Exception e) {
+      // We need to rollback if the creation has failed
+      pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName, jarName);
+      throw e;
+    }
+  }
+
   private void computeFromPluginClass(final String pluginName, final String className)
       throws Exception {
     final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index a0a3ae222f7..0d50abfbead 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 import org.apache.tsfile.common.conf.TSFileConfig;
@@ -149,10 +151,18 @@ public class PipeInfoTest {
         new CreatePipePluginPlan(
             new PipePluginMeta(pluginName, "org.apache.iotdb.TestJar", false, "test.jar", "???"),
             new Binary("123", TSFileConfig.STRING_CHARSET));
-    pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan);
 
-    // Drop pipe plugin test plugin
-    pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName, false);
+    // Shall fail due to validation
+    Assert.assertEquals(
+        TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
+        pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan).getCode());
+
+    // Drop pipe plugin test plugin, validation failure
+    Assert.assertThrows(
+        PipeException.class,
+        () -> pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName, false));
+
+    // Idempotent
     DropPipePluginPlan dropPipePluginPlan = new DropPipePluginPlan(pluginName);
     pipeInfo.getPipePluginInfo().dropPipePlugin(dropPipePluginPlan);
   }

Reply via email to