This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 609bc289c09 Pipe: Fixed the hardlink bug of plugin meta (#16937)
609bc289c09 is described below
commit 609bc289c09a2e1314d65b927c38ee23e437646a
Author: Caideyipi <[email protected]>
AuthorDate: Tue Dec 23 16:58:20 2025 +0800
Pipe: Fixed the hardlink bug of plugin meta (#16937)
---
.../persistence/pipe/PipePluginInfo.java | 33 ++++++++++++++++------
.../iotdb/confignode/persistence/PipeInfoTest.java | 16 +++++++++--
2 files changed, 37 insertions(+), 12 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 27cc3cc4cbf..51007f10236 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -215,16 +215,8 @@ public class PipePluginInfo implements SnapshotProcessor {
final String className = pipePluginMeta.getClassName();
final String jarName = pipePluginMeta.getJarName();
- // try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
- dropPipePlugin(new DropPipePluginPlan(pluginName));
-
- pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
- pipePluginMetaKeeper.addJarNameAndMd5(jarName,
pipePluginMeta.getJarMD5());
-
if (createPipePluginPlan.getJarFile() != null) {
- pipePluginExecutableManager.savePluginToInstallDir(
- ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
pluginName, jarName);
- computeFromPluginClass(pluginName, className);
+ savePipePluginWithRollback(createPipePluginPlan);
} else {
final String existed =
pipePluginMetaKeeper.getPluginNameByJarName(jarName);
if (Objects.nonNull(existed)) {
@@ -238,6 +230,12 @@ public class PipePluginInfo implements SnapshotProcessor {
}
}
+ // try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
+ dropPipePlugin(new DropPipePluginPlan(pluginName));
+
+ pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
+ pipePluginMetaKeeper.addJarNameAndMd5(jarName,
pipePluginMeta.getJarMD5());
+
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final Exception e) {
final String errorMessage =
@@ -250,6 +248,23 @@ public class PipePluginInfo implements SnapshotProcessor {
}
}
+ private void savePipePluginWithRollback(final CreatePipePluginPlan
createPipePluginPlan)
+ throws Exception {
+ final PipePluginMeta pipePluginMeta =
createPipePluginPlan.getPipePluginMeta();
+ final String pluginName = pipePluginMeta.getPluginName();
+ final String className = pipePluginMeta.getClassName();
+ final String jarName = pipePluginMeta.getJarName();
+ try {
+ pipePluginExecutableManager.savePluginToInstallDir(
+ ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
pluginName, jarName);
+ computeFromPluginClass(pluginName, className);
+ } catch (final Exception e) {
+ // We need to rollback if the creation has failed
+ pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName,
jarName);
+ throw e;
+ }
+ }
+
private void computeFromPluginClass(final String pluginName, final String
className)
throws Exception {
final String pluginDirPath =
pipePluginExecutableManager.getPluginsDirPath(pluginName);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index a0a3ae222f7..0d50abfbead 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -31,6 +31,8 @@ import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.tsfile.common.conf.TSFileConfig;
@@ -149,10 +151,18 @@ public class PipeInfoTest {
new CreatePipePluginPlan(
new PipePluginMeta(pluginName, "org.apache.iotdb.TestJar", false,
"test.jar", "???"),
new Binary("123", TSFileConfig.STRING_CHARSET));
- pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan);
- // Drop pipe plugin test plugin
- pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName,
false);
+ // Shall fail due to validation
+ Assert.assertEquals(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
+
pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan).getCode());
+
+ // Drop pipe plugin test plugin, validation failure
+ Assert.assertThrows(
+ PipeException.class,
+ () ->
pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName,
false));
+
+ // Idempotent
DropPipePluginPlan dropPipePluginPlan = new DropPipePluginPlan(pluginName);
pipeInfo.getPipePluginInfo().dropPipePlugin(dropPipePluginPlan);
}