This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch plugin-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/plugin-fix by this push:
new dba2a186107 partial
dba2a186107 is described below
commit dba2a186107a2d4695730c77e96c2fe6ad84ac0c
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 8 10:10:54 2025 +0800
partial
---
.../persistence/pipe/PipePluginInfo.java | 73 ++++++++++++++--------
1 file changed, 47 insertions(+), 26 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 280a891b598..e64dc67e011 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -161,19 +161,19 @@ public class PipePluginInfo implements SnapshotProcessor {
}
public void checkPipePluginExistence(
- final Map<String, String> extractorAttributes,
+ final Map<String, String> sourceAttributes,
final Map<String, String> processorAttributes,
- final Map<String, String> connectorAttributes) {
- final PipeParameters extractorParameters = new
PipeParameters(extractorAttributes);
- final String extractorPluginName =
- extractorParameters.getStringOrDefault(
+ final Map<String, String> sinkAttributes) {
+ final PipeParameters sourceParameters = new
PipeParameters(sourceAttributes);
+ final String sourcePluginName =
+ sourceParameters.getStringOrDefault(
Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY,
PipeSourceConstant.SOURCE_KEY),
IOTDB_EXTRACTOR.getPipePluginName());
- if (!pipePluginMetaKeeper.containsPipePlugin(extractorPluginName)) {
+ if (!pipePluginMetaKeeper.containsPipePlugin(sourcePluginName)) {
final String exceptionMessage =
String.format(
"Failed to create or alter pipe, the pipe extractor plugin %s
does not exist",
- extractorPluginName);
+ sourcePluginName);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
@@ -191,16 +191,16 @@ public class PipePluginInfo implements SnapshotProcessor {
throw new PipeException(exceptionMessage);
}
- final PipeParameters connectorParameters = new
PipeParameters(connectorAttributes);
- final String connectorPluginName =
- connectorParameters.getStringOrDefault(
+ final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
+ final String sinkPluginName =
+ sinkParameters.getStringOrDefault(
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
IOTDB_THRIFT_CONNECTOR.getPipePluginName());
- if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
+ if (!pipePluginMetaKeeper.containsPipePlugin(sinkPluginName)) {
final String exceptionMessage =
String.format(
"Failed to create or alter pipe, the pipe connector plugin %s
does not exist",
- connectorPluginName);
+ sinkPluginName);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
@@ -212,6 +212,7 @@ public class PipePluginInfo implements SnapshotProcessor {
try {
final PipePluginMeta pipePluginMeta =
createPipePluginPlan.getPipePluginMeta();
final String pluginName = pipePluginMeta.getPluginName();
+ final String className = pipePluginMeta.getClassName();
// try to drop the old pipe plugin if exists to reduce the effect of the
inconsistency
dropPipePlugin(new DropPipePluginPlan(pluginName));
@@ -225,21 +226,22 @@ public class PipePluginInfo implements SnapshotProcessor {
ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
pluginName,
pipePluginMeta.getJarName());
- final String pluginDirPath =
pipePluginExecutableManager.getPluginsDirPath(pluginName);
- final PipePluginClassLoader pipePluginClassLoader =
- classLoaderManager.createPipePluginClassLoader(pluginDirPath);
- try {
- final Class<?> pluginClass =
- Class.forName(pipePluginMeta.getClassName(), true,
pipePluginClassLoader);
+ computeFromPluginClass(
+ pipePluginExecutableManager.getPluginsDirPath(pluginName),
pluginName, className);
+ } else {
+ final String existed =
+
pipePluginMetaKeeper.getPluginNameByJarName(pipePluginMeta.getJarName());
+ if (Objects.nonNull(existed)
+ && pipePluginMetaKeeper
+ .getPipePluginMeta(existed)
+ .getClassName()
+ .equals(pipePluginMeta.getClassName())
+ &&
pipePluginMetaKeeper.getPipePluginNameToVisibilityMap().containsKey(existed)) {
pipePluginMetaKeeper.addPipePluginVisibility(
- pluginName,
VisibilityUtils.calculateFromPluginClass(pluginClass));
- classLoaderManager.addPluginAndClassLoader(pluginName,
pipePluginClassLoader);
- } catch (final Exception e) {
- try {
- pipePluginClassLoader.close();
- } catch (final Exception ignored) {
- }
- throw e;
+ pluginName,
pipePluginMetaKeeper.getPipePluginNameToVisibilityMap().get(existed));
+ } else {
+ computeFromPluginClass(
+ pipePluginExecutableManager.getPluginsDirPath(existed),
pluginName, className);
}
}
@@ -255,6 +257,25 @@ public class PipePluginInfo implements SnapshotProcessor {
}
}
+ private void computeFromPluginClass(
+ final String pluginDirPath, final String pluginName, final String
className)
+ throws Exception {
+ final PipePluginClassLoader pipePluginClassLoader =
+ classLoaderManager.createPipePluginClassLoader(pluginDirPath);
+ try {
+ final Class<?> pluginClass = Class.forName(className, true,
pipePluginClassLoader);
+ pipePluginMetaKeeper.addPipePluginVisibility(
+ pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
+ classLoaderManager.addPluginAndClassLoader(pluginName,
pipePluginClassLoader);
+ } catch (final Exception e) {
+ try {
+ pipePluginClassLoader.close();
+ } catch (final Exception ignored) {
+ }
+ throw e;
+ }
+ }
+
public TSStatus dropPipePlugin(final DropPipePluginPlan dropPipePluginPlan) {
final String pluginName = dropPipePluginPlan.getPluginName();