This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-3227
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2a122b2f5261572d29dc50bf9e68fb263fb4fb9e
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue May 24 23:53:11 2022 +0800

    snapshot for UDFExecutableManager
---
 .../iotdb/confignode/persistence/UDFInfo.java      |  6 +-
 .../commons/udf/service/UDFExecutableManager.java  | 78 +++++++++++++++++++---
 2 files changed, 71 insertions(+), 13 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
index 0bfcd42a54..b78a067177 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
@@ -61,7 +61,7 @@ public class UDFInfo implements SnapshotProcessor {
     }
   }
 
-  public TSStatus createFunction(CreateFunctionReq req) {
+  public synchronized TSStatus createFunction(CreateFunctionReq req) {
     final String functionName = req.getFunctionName();
     final String className = req.getClassName();
     final List<String> uris = req.getUris();
@@ -81,13 +81,13 @@ public class UDFInfo implements SnapshotProcessor {
   }
 
   @Override
-  public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+  public synchronized boolean processTakeSnapshot(File snapshotDir) throws 
IOException {
     return udfExecutableManager.processTakeSnapshot(snapshotDir)
         && udfRegistrationService.processTakeSnapshot(snapshotDir);
   }
 
   @Override
-  public void processLoadSnapshot(File snapshotDir) throws IOException {
+  public synchronized void processLoadSnapshot(File snapshotDir) throws 
IOException {
     udfExecutableManager.processLoadSnapshot(snapshotDir);
     udfRegistrationService.processLoadSnapshot(snapshotDir);
   }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
index 5fe7e18380..ea43a16b86 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -34,18 +36,21 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class UDFExecutableManager implements IService, SnapshotProcessor {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UDFExecutableManager.class);
+
   private final String temporaryLibRoot;
-  private final String extLibRoot;
+  private final String udfLibRoot;
 
   private final AtomicLong requestCounter;
 
-  private UDFExecutableManager(String temporaryLibRoot, String extLibRoot) {
+  private UDFExecutableManager(String temporaryLibRoot, String udfLibRoot) {
     this.temporaryLibRoot = temporaryLibRoot;
-    this.extLibRoot = extLibRoot;
+    this.udfLibRoot = udfLibRoot;
 
     requestCounter = new AtomicLong(0);
   }
@@ -62,7 +67,7 @@ public class UDFExecutableManager implements IService, 
SnapshotProcessor {
         getDirByRequestId(resource.getRequestId()), 
getDirByFunctionName(functionName));
   }
 
-  public void removeFromTemporaryLibRoot(UDFExecutableResource resource) 
throws IOException {
+  public void removeFromTemporaryLibRoot(UDFExecutableResource resource) {
     removeFromTemporaryLibRoot(resource.getRequestId());
   }
 
@@ -117,7 +122,7 @@ public class UDFExecutableManager implements IService, 
SnapshotProcessor {
   }
 
   public String getDirStringByFunctionName(String functionName) {
-    return extLibRoot + File.separator + functionName + File.separator;
+    return udfLibRoot + File.separator + functionName + File.separator;
   }
 
   
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -128,7 +133,7 @@ public class UDFExecutableManager implements IService, 
SnapshotProcessor {
   public void start() throws StartupException {
     try {
       makeDirIfNecessary(temporaryLibRoot);
-      makeDirIfNecessary(extLibRoot);
+      makeDirIfNecessary(udfLibRoot);
     } catch (Exception e) {
       throw new StartupException(e);
     }
@@ -151,9 +156,9 @@ public class UDFExecutableManager implements IService, 
SnapshotProcessor {
   private static UDFExecutableManager INSTANCE = null;
 
   public static synchronized UDFExecutableManager setupAndGetInstance(
-      String temporaryLibRoot, String extLibRoot) {
+      String temporaryLibRoot, String udfLibRoot) {
     if (INSTANCE == null) {
-      INSTANCE = new UDFExecutableManager(temporaryLibRoot, extLibRoot);
+      INSTANCE = new UDFExecutableManager(temporaryLibRoot, udfLibRoot);
     }
     return INSTANCE;
   }
@@ -176,9 +181,62 @@ public class UDFExecutableManager implements IService, 
SnapshotProcessor {
 
   @Override
   public boolean processTakeSnapshot(File snapshotDir) throws IOException {
-    return false;
+    return takeSnapshotForDir(
+            temporaryLibRoot,
+            snapshotDir.getAbsolutePath() + File.separator + "ext" + 
File.separator + "temporary")
+        && takeSnapshotForDir(
+            udfLibRoot,
+            snapshotDir.getAbsolutePath() + File.separator + "ext" + 
File.separator + "udf");
+  }
+
+  private boolean takeSnapshotForDir(String source, String 
snapshotDestination) throws IOException {
+    final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
+    final File sourceFile = systemFileFactory.getFile(source);
+    final File destinationFile = 
systemFileFactory.getFile(snapshotDestination);
+    final File temporaryFile =
+        systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + 
UUID.randomUUID());
+
+    FileUtils.deleteQuietly(temporaryFile);
+    FileUtils.forceMkdir(temporaryFile);
+
+    try {
+      FileUtils.copyDirectory(sourceFile, temporaryFile);
+      FileUtils.deleteQuietly(destinationFile);
+      return temporaryFile.renameTo(destinationFile);
+    } finally {
+      FileUtils.deleteQuietly(temporaryFile);
+    }
   }
 
   @Override
-  public void processLoadSnapshot(File snapshotDir) throws IOException {}
+  public void processLoadSnapshot(File snapshotDir) throws IOException {
+    loadSnapshotForDir(
+        snapshotDir.getAbsolutePath() + File.separator + "ext" + 
File.separator + "temporary",
+        temporaryLibRoot);
+    loadSnapshotForDir(
+        snapshotDir.getAbsolutePath() + File.separator + "ext" + 
File.separator + "udf",
+        udfLibRoot);
+  }
+
+  private void loadSnapshotForDir(String snapshotSource, String destination) 
throws IOException {
+    final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
+    final File sourceFile = systemFileFactory.getFile(snapshotSource);
+    final File destinationFile = systemFileFactory.getFile(destination);
+    final File temporaryFile =
+        systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + 
UUID.randomUUID());
+
+    try {
+      FileUtils.moveDirectory(destinationFile, temporaryFile);
+      FileUtils.forceMkdir(destinationFile);
+      try {
+        FileUtils.copyDirectory(sourceFile, destinationFile);
+      } catch (Exception e) {
+        LOGGER.error("Failed to load udf snapshot and rollback.");
+        FileUtils.deleteQuietly(destinationFile);
+        FileUtils.moveDirectory(temporaryFile, destinationFile);
+      }
+    } finally {
+      FileUtils.deleteQuietly(temporaryFile);
+    }
+  }
 }

Reply via email to