This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a0dbf9b18e0 Pipe: Fixed the bug that reused plugins may not get loader
and visibility (#16877)
a0dbf9b18e0 is described below
commit a0dbf9b18e0b8f1042d613c672eab91e3fc5ef2e
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 8 14:41:33 2025 +0800
Pipe: Fixed the bug that reused plugins may not get loader and visibility
(#16877)
---
.../persistence/pipe/PipePluginInfo.java | 78 +++++++++++++---------
.../service/PipePluginExecutableManager.java | 9 +++
2 files changed, 55 insertions(+), 32 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 280a891b598..27cc3cc4cbf 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -161,19 +161,19 @@ public class PipePluginInfo implements SnapshotProcessor {
}
public void checkPipePluginExistence(
- final Map<String, String> extractorAttributes,
+ final Map<String, String> sourceAttributes,
final Map<String, String> processorAttributes,
- final Map<String, String> connectorAttributes) {
- final PipeParameters extractorParameters = new
PipeParameters(extractorAttributes);
- final String extractorPluginName =
- extractorParameters.getStringOrDefault(
+ final Map<String, String> sinkAttributes) {
+ final PipeParameters sourceParameters = new
PipeParameters(sourceAttributes);
+ final String sourcePluginName =
+ sourceParameters.getStringOrDefault(
Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY,
PipeSourceConstant.SOURCE_KEY),
IOTDB_EXTRACTOR.getPipePluginName());
- if (!pipePluginMetaKeeper.containsPipePlugin(extractorPluginName)) {
+ if (!pipePluginMetaKeeper.containsPipePlugin(sourcePluginName)) {
final String exceptionMessage =
String.format(
"Failed to create or alter pipe, the pipe extractor plugin %s
does not exist",
- extractorPluginName);
+ sourcePluginName);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
@@ -191,16 +191,16 @@ public class PipePluginInfo implements SnapshotProcessor {
throw new PipeException(exceptionMessage);
}
- final PipeParameters connectorParameters = new
PipeParameters(connectorAttributes);
- final String connectorPluginName =
- connectorParameters.getStringOrDefault(
+ final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
+ final String sinkPluginName =
+ sinkParameters.getStringOrDefault(
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
IOTDB_THRIFT_CONNECTOR.getPipePluginName());
- if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
+ if (!pipePluginMetaKeeper.containsPipePlugin(sinkPluginName)) {
final String exceptionMessage =
String.format(
"Failed to create or alter pipe, the pipe connector plugin %s
does not exist",
- connectorPluginName);
+ sinkPluginName);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
@@ -212,34 +212,29 @@ public class PipePluginInfo implements SnapshotProcessor {
try {
final PipePluginMeta pipePluginMeta =
createPipePluginPlan.getPipePluginMeta();
final String pluginName = pipePluginMeta.getPluginName();
+ final String className = pipePluginMeta.getClassName();
+ final String jarName = pipePluginMeta.getJarName();
// try to drop the old pipe plugin if exists to reduce the effect of the
inconsistency
dropPipePlugin(new DropPipePluginPlan(pluginName));
pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
- pipePluginMetaKeeper.addJarNameAndMd5(
- pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
+ pipePluginMetaKeeper.addJarNameAndMd5(jarName,
pipePluginMeta.getJarMD5());
if (createPipePluginPlan.getJarFile() != null) {
pipePluginExecutableManager.savePluginToInstallDir(
- ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
- pluginName,
- pipePluginMeta.getJarName());
- final String pluginDirPath =
pipePluginExecutableManager.getPluginsDirPath(pluginName);
- final PipePluginClassLoader pipePluginClassLoader =
- classLoaderManager.createPipePluginClassLoader(pluginDirPath);
- try {
- final Class<?> pluginClass =
- Class.forName(pipePluginMeta.getClassName(), true,
pipePluginClassLoader);
- pipePluginMetaKeeper.addPipePluginVisibility(
- pluginName,
VisibilityUtils.calculateFromPluginClass(pluginClass));
- classLoaderManager.addPluginAndClassLoader(pluginName,
pipePluginClassLoader);
- } catch (final Exception e) {
- try {
- pipePluginClassLoader.close();
- } catch (final Exception ignored) {
- }
- throw e;
+ ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
pluginName, jarName);
+ computeFromPluginClass(pluginName, className);
+ } else {
+ final String existed =
pipePluginMetaKeeper.getPluginNameByJarName(jarName);
+ if (Objects.nonNull(existed)) {
+ pipePluginExecutableManager.linkExistedPlugin(existed, pluginName,
jarName);
+ computeFromPluginClass(pluginName, className);
+ } else {
+ throw new PipeException(
+ String.format(
+ "The %s's creation has not passed in jarName, which does not
exist in other pipePlugins. Please check",
+ pluginName));
}
}
@@ -255,6 +250,25 @@ public class PipePluginInfo implements SnapshotProcessor {
}
}
+ private void computeFromPluginClass(final String pluginName, final String
className)
+ throws Exception {
+ final String pluginDirPath =
pipePluginExecutableManager.getPluginsDirPath(pluginName);
+ final PipePluginClassLoader pipePluginClassLoader =
+ classLoaderManager.createPipePluginClassLoader(pluginDirPath);
+ try {
+ final Class<?> pluginClass = Class.forName(className, true,
pipePluginClassLoader);
+ pipePluginMetaKeeper.addPipePluginVisibility(
+ pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
+ classLoaderManager.addPluginAndClassLoader(pluginName,
pipePluginClassLoader);
+ } catch (final Exception e) {
+ try {
+ pipePluginClassLoader.close();
+ } catch (final Exception ignored) {
+ }
+ throw e;
+ }
+ }
+
public TSStatus dropPipePlugin(final DropPipePluginPlan dropPipePluginPlan) {
final String pluginName = dropPipePluginPlan.getPluginName();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java
index 9d28ea0130e..276af31bb1a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin.service;
import org.apache.iotdb.commons.executable.ExecutableManager;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.external.commons.codec.digest.DigestUtils;
@@ -123,6 +124,14 @@ public class PipePluginExecutableManager extends
ExecutableManager {
return this.libRoot + File.separator + INSTALL_DIR + File.separator +
fileName;
}
+ public void linkExistedPlugin(
+ final String oldPluginName, final String newPluginName, final String
fileName)
+ throws IOException {
+ FileUtils.createHardLink(
+ new File(getPluginsDirPath(oldPluginName), fileName),
+ new File(getPluginsDirPath(newPluginName), fileName));
+ }
+
/**
* @param byteBuffer file
* @param pluginName