This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 00807e79f8cabaa4931cefa0e600821552131001 Author: Caideyipi <[email protected]> AuthorDate: Mon Dec 8 14:41:33 2025 +0800 Pipe: Fixed the bug that reused plugins may not get loader and visibility (#16877) (cherry picked from commit a0dbf9b18e0b8f1042d613c672eab91e3fc5ef2e) --- .../persistence/pipe/PipePluginInfo.java | 78 +++++++++++++--------- .../service/PipePluginExecutableManager.java | 9 +++ 2 files changed, 55 insertions(+), 32 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 280a891b598..27cc3cc4cbf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -161,19 +161,19 @@ public class PipePluginInfo implements SnapshotProcessor { } public void checkPipePluginExistence( - final Map<String, String> extractorAttributes, + final Map<String, String> sourceAttributes, final Map<String, String> processorAttributes, - final Map<String, String> connectorAttributes) { - final PipeParameters extractorParameters = new PipeParameters(extractorAttributes); - final String extractorPluginName = - extractorParameters.getStringOrDefault( + final Map<String, String> sinkAttributes) { + final PipeParameters sourceParameters = new PipeParameters(sourceAttributes); + final String sourcePluginName = + sourceParameters.getStringOrDefault( Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, PipeSourceConstant.SOURCE_KEY), IOTDB_EXTRACTOR.getPipePluginName()); - if (!pipePluginMetaKeeper.containsPipePlugin(extractorPluginName)) { + if (!pipePluginMetaKeeper.containsPipePlugin(sourcePluginName)) { final String exceptionMessage = String.format( "Failed to create or alter pipe, the pipe extractor plugin %s does not exist", - extractorPluginName); + sourcePluginName); LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } @@ -191,16 +191,16 @@ public class PipePluginInfo implements SnapshotProcessor { throw new PipeException(exceptionMessage); } - final PipeParameters connectorParameters = new PipeParameters(connectorAttributes); - final String connectorPluginName = - connectorParameters.getStringOrDefault( + final PipeParameters sinkParameters = new PipeParameters(sinkAttributes); + final String sinkPluginName = + sinkParameters.getStringOrDefault( Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY), IOTDB_THRIFT_CONNECTOR.getPipePluginName()); - if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) { + if (!pipePluginMetaKeeper.containsPipePlugin(sinkPluginName)) { final String exceptionMessage = String.format( "Failed to create or alter pipe, the pipe connector plugin %s does not exist", - connectorPluginName); + sinkPluginName); LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } @@ -212,34 +212,29 @@ public class PipePluginInfo implements SnapshotProcessor { try { final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta(); final String pluginName = pipePluginMeta.getPluginName(); + final String className = pipePluginMeta.getClassName(); + final String jarName = pipePluginMeta.getJarName(); // try to drop the old pipe plugin if exists to reduce the effect of the inconsistency dropPipePlugin(new DropPipePluginPlan(pluginName)); pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta); - pipePluginMetaKeeper.addJarNameAndMd5( - pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5()); + pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5()); if (createPipePluginPlan.getJarFile() != null) { pipePluginExecutableManager.savePluginToInstallDir( - ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), - pluginName, - pipePluginMeta.getJarName()); - final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName); - final PipePluginClassLoader pipePluginClassLoader = - classLoaderManager.createPipePluginClassLoader(pluginDirPath); - try { - final Class<?> pluginClass = - Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader); - pipePluginMetaKeeper.addPipePluginVisibility( - pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass)); - classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader); - } catch (final Exception e) { - try { - pipePluginClassLoader.close(); - } catch (final Exception ignored) { - } - throw e; + ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName); + computeFromPluginClass(pluginName, className); + } else { + final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName); + if (Objects.nonNull(existed)) { + pipePluginExecutableManager.linkExistedPlugin(existed, pluginName, jarName); + computeFromPluginClass(pluginName, className); + } else { + throw new PipeException( + String.format( + "The %s's creation has not passed in jarName, which does not exist in other pipePlugins. Please check", + pluginName)); } } @@ -255,6 +250,25 @@ public class PipePluginInfo implements SnapshotProcessor { } } + private void computeFromPluginClass(final String pluginName, final String className) + throws Exception { + final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName); + final PipePluginClassLoader pipePluginClassLoader = + classLoaderManager.createPipePluginClassLoader(pluginDirPath); + try { + final Class<?> pluginClass = Class.forName(className, true, pipePluginClassLoader); + pipePluginMetaKeeper.addPipePluginVisibility( + pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass)); + classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader); + } catch (final Exception e) { + try { + pipePluginClassLoader.close(); + } catch (final Exception ignored) { + } + throw e; + } + } + public TSStatus dropPipePlugin(final DropPipePluginPlan dropPipePluginPlan) { final String pluginName = dropPipePluginPlan.getPluginName(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java index 9d28ea0130e..276af31bb1a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginExecutableManager.java @@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin.service; import org.apache.iotdb.commons.executable.ExecutableManager; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta; +import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.external.commons.codec.digest.DigestUtils; @@ -123,6 +124,14 @@ public class PipePluginExecutableManager extends ExecutableManager { return this.libRoot + File.separator + INSTALL_DIR + File.separator + fileName; } + public void linkExistedPlugin( + final String oldPluginName, final String newPluginName, final String fileName) + throws IOException { + FileUtils.createHardLink( + new File(getPluginsDirPath(oldPluginName), fileName), + new File(getPluginsDirPath(newPluginName), fileName)); + } + /** * @param byteBuffer file * @param pluginName
