This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit db22199acdb865b4615b67eebbc33db3b05e1d97 Author: Zhenyu Luo <[email protected]> AuthorDate: Wed Jul 24 10:46:34 2024 +0800 PipePlugin: Enhance PipePlugin Jar Directory Compatibility and Update Format During CN and DN Upgrade (#13006) (cherry picked from commit 7d94bba6ad6af9398a91b3b0ad4f279165396c35) --- .../persistence/pipe/PipePluginInfo.java | 27 ++++++++++++++++++---- .../pipe/plugin/meta/PipePluginMetaKeeper.java | 2 +- .../service/PipePluginExecutableManager.java | 18 +++++++++------ 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index c25d92c9b17..1e4a81cfe44 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -48,6 +48,8 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -231,15 +233,32 @@ public class PipePluginInfo implements SnapshotProcessor { public JarResp getPipePluginJar(final GetPipePluginJarPlan getPipePluginJarPlan) { try { final List<ByteBuffer> jarList = new ArrayList<>(); + final PipePluginExecutableManager manager = PipePluginExecutableManager.getInstance(); + for (final String jarName : getPipePluginJarPlan.getJarNames()) { String pluginName = pipePluginMetaKeeper.getPluginNameByJarName(jarName); if (pluginName == null) { throw new PipeException(String.format("%s does not exist", jarName)); } - jarList.add( - ExecutableManager.transferToBytebuffer( - PipePluginExecutableManager.getInstance() - .getPluginInstallPath(pluginName, jarName))); + + String jarPath = manager.getPluginInstallPathV2(pluginName, jarName); + + boolean isJarExistedInV2Dir = Files.exists(Paths.get(jarPath)); + if (!isJarExistedInV2Dir) { + jarPath = manager.getPluginInstallPathV1(jarName); + } + + if (!Files.exists(Paths.get(jarPath))) { + throw new PipeException(String.format("%s does not exist", jarName)); + } + + ByteBuffer byteBuffer = ExecutableManager.transferToBytebuffer(jarPath); + if (!isJarExistedInV2Dir) { + pipePluginExecutableManager.savePluginToInstallDir( + byteBuffer.duplicate(), pluginName, jarName); + } + + jarList.add(byteBuffer); } return new JarResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList); } catch (final Exception e) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java index 1354fe6d243..b1b515db95a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java @@ -81,7 +81,7 @@ public abstract class PipePluginMetaKeeper { public String getPluginNameByJarName(String jarName) { for (Map.Entry<String, PipePluginMeta> entry : pipePluginNameToMetaMap.entrySet()) { - if (entry.getValue().getJarName().equals(jarName)) { + if (jarName.equals(entry.getValue().getJarName())) { return entry.getKey(); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java index 8d403369eb3..a1477475c01 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java @@ -60,7 +60,7 @@ public class PipePluginExecutableManager extends ExecutableManager { final String md5 = DigestUtils.md5Hex( Files.newInputStream( - Paths.get(getPluginInstallPath(pluginName, pipePluginMeta.getJarName())))); + Paths.get(getPluginInstallPathV2(pluginName, pipePluginMeta.getJarName())))); // Save the md5 in a txt under trigger temporary lib saveTextAsFileUnderTemporaryRoot(md5, md5FilePath); return md5.equals(pipePluginMeta.getJarMD5()); @@ -95,30 +95,34 @@ public class PipePluginExecutableManager extends ExecutableManager { } public boolean hasPluginFileUnderInstallDir(String pluginName, String fileName) { - return Files.exists(Paths.get(getPluginInstallPath(pluginName, fileName))); + return Files.exists(Paths.get(getPluginInstallPathV2(pluginName, fileName))); } public String getPluginsDirPath(String pluginName) { - return this.libRoot + File.separator + INSTALL_DIR + File.separator + pluginName; + return this.libRoot + File.separator + INSTALL_DIR + File.separator + pluginName.toUpperCase(); } public void removePluginFileUnderLibRoot(String pluginName, String fileName) throws IOException { - String pluginPath = getPluginInstallPath(pluginName, fileName); + String pluginPath = getPluginInstallPathV2(pluginName, fileName); Path path = Paths.get(pluginPath); Files.deleteIfExists(path); Files.deleteIfExists(path.getParent()); } - public String getPluginInstallPath(String pluginName, String fileName) { + public String getPluginInstallPathV2(String pluginName, String fileName) { return this.libRoot + File.separator + INSTALL_DIR + File.separator - + pluginName + + pluginName.toUpperCase() + File.separator + fileName; } + public String getPluginInstallPathV1(String fileName) { + return this.libRoot + File.separator + INSTALL_DIR + File.separator + fileName; + } + /** * @param byteBuffer file * @param pluginName @@ -127,7 +131,7 @@ public class PipePluginExecutableManager extends ExecutableManager { */ public void savePluginToInstallDir(ByteBuffer byteBuffer, String pluginName, String fileName) throws IOException { - String destination = getPluginInstallPath(pluginName, fileName); + String destination = getPluginInstallPathV2(pluginName, fileName); saveToDir(byteBuffer, destination); } }
