This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7d94bba6ad6 PipePlugin: Enhance PipePlugin Jar Directory Compatibility
and Update Format During CN and DN Upgrade (#13006)
7d94bba6ad6 is described below
commit 7d94bba6ad6af9398a91b3b0ad4f279165396c35
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Jul 24 10:46:34 2024 +0800
PipePlugin: Enhance PipePlugin Jar Directory Compatibility and Update
Format During CN and DN Upgrade (#13006)
---
.../persistence/pipe/PipePluginInfo.java | 27 ++++++++++++++++++----
.../pipe/plugin/meta/PipePluginMetaKeeper.java | 2 +-
.../service/PipePluginExecutableManager.java | 18 +++++++++------
3 files changed, 35 insertions(+), 12 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index c25d92c9b17..1e4a81cfe44 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -48,6 +48,8 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -231,15 +233,32 @@ public class PipePluginInfo implements SnapshotProcessor {
public JarResp getPipePluginJar(final GetPipePluginJarPlan
getPipePluginJarPlan) {
try {
final List<ByteBuffer> jarList = new ArrayList<>();
+ final PipePluginExecutableManager manager =
PipePluginExecutableManager.getInstance();
+
for (final String jarName : getPipePluginJarPlan.getJarNames()) {
String pluginName =
pipePluginMetaKeeper.getPluginNameByJarName(jarName);
if (pluginName == null) {
throw new PipeException(String.format("%s does not exist", jarName));
}
- jarList.add(
- ExecutableManager.transferToBytebuffer(
- PipePluginExecutableManager.getInstance()
- .getPluginInstallPath(pluginName, jarName)));
+
+ String jarPath = manager.getPluginInstallPathV2(pluginName, jarName);
+
+ boolean isJarExistedInV2Dir = Files.exists(Paths.get(jarPath));
+ if (!isJarExistedInV2Dir) {
+ jarPath = manager.getPluginInstallPathV1(jarName);
+ }
+
+ if (!Files.exists(Paths.get(jarPath))) {
+ throw new PipeException(String.format("%s does not exist", jarName));
+ }
+
+ ByteBuffer byteBuffer =
ExecutableManager.transferToBytebuffer(jarPath);
+ if (!isJarExistedInV2Dir) {
+ pipePluginExecutableManager.savePluginToInstallDir(
+ byteBuffer.duplicate(), pluginName, jarName);
+ }
+
+ jarList.add(byteBuffer);
}
return new JarResp(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList);
} catch (final Exception e) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
index 1354fe6d243..b1b515db95a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
@@ -81,7 +81,7 @@ public abstract class PipePluginMetaKeeper {
public String getPluginNameByJarName(String jarName) {
for (Map.Entry<String, PipePluginMeta> entry :
pipePluginNameToMetaMap.entrySet()) {
- if (entry.getValue().getJarName().equals(jarName)) {
+ if (jarName.equals(entry.getValue().getJarName())) {
return entry.getKey();
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
index 8d403369eb3..a1477475c01 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
@@ -60,7 +60,7 @@ public class PipePluginExecutableManager extends
ExecutableManager {
final String md5 =
DigestUtils.md5Hex(
Files.newInputStream(
- Paths.get(getPluginInstallPath(pluginName,
pipePluginMeta.getJarName()))));
+ Paths.get(getPluginInstallPathV2(pluginName,
pipePluginMeta.getJarName()))));
// Save the md5 in a txt under trigger temporary lib
saveTextAsFileUnderTemporaryRoot(md5, md5FilePath);
return md5.equals(pipePluginMeta.getJarMD5());
@@ -95,30 +95,34 @@ public class PipePluginExecutableManager extends
ExecutableManager {
}
public boolean hasPluginFileUnderInstallDir(String pluginName, String
fileName) {
- return Files.exists(Paths.get(getPluginInstallPath(pluginName, fileName)));
+ return Files.exists(Paths.get(getPluginInstallPathV2(pluginName,
fileName)));
}
public String getPluginsDirPath(String pluginName) {
- return this.libRoot + File.separator + INSTALL_DIR + File.separator +
pluginName;
+ return this.libRoot + File.separator + INSTALL_DIR + File.separator +
pluginName.toUpperCase();
}
public void removePluginFileUnderLibRoot(String pluginName, String fileName)
throws IOException {
- String pluginPath = getPluginInstallPath(pluginName, fileName);
+ String pluginPath = getPluginInstallPathV2(pluginName, fileName);
Path path = Paths.get(pluginPath);
Files.deleteIfExists(path);
Files.deleteIfExists(path.getParent());
}
- public String getPluginInstallPath(String pluginName, String fileName) {
+ public String getPluginInstallPathV2(String pluginName, String fileName) {
return this.libRoot
+ File.separator
+ INSTALL_DIR
+ File.separator
- + pluginName
+ + pluginName.toUpperCase()
+ File.separator
+ fileName;
}
+ public String getPluginInstallPathV1(String fileName) {
+ return this.libRoot + File.separator + INSTALL_DIR + File.separator +
fileName;
+ }
+
/**
* @param byteBuffer file
* @param pluginName
@@ -127,7 +131,7 @@ public class PipePluginExecutableManager extends
ExecutableManager {
*/
public void savePluginToInstallDir(ByteBuffer byteBuffer, String pluginName,
String fileName)
throws IOException {
- String destination = getPluginInstallPath(pluginName, fileName);
+ String destination = getPluginInstallPathV2(pluginName, fileName);
saveToDir(byteBuffer, destination);
}
}