This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-3227 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2a122b2f5261572d29dc50bf9e68fb263fb4fb9e Author: Steve Yurong Su <[email protected]> AuthorDate: Tue May 24 23:53:11 2022 +0800 snapshot for UDFExecutableManager --- .../iotdb/confignode/persistence/UDFInfo.java | 6 +- .../commons/udf/service/UDFExecutableManager.java | 78 +++++++++++++++++++--- 2 files changed, 71 insertions(+), 13 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java index 0bfcd42a54..b78a067177 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java @@ -61,7 +61,7 @@ public class UDFInfo implements SnapshotProcessor { } } - public TSStatus createFunction(CreateFunctionReq req) { + public synchronized TSStatus createFunction(CreateFunctionReq req) { final String functionName = req.getFunctionName(); final String className = req.getClassName(); final List<String> uris = req.getUris(); @@ -81,13 +81,13 @@ public class UDFInfo implements SnapshotProcessor { } @Override - public boolean processTakeSnapshot(File snapshotDir) throws IOException { + public synchronized boolean processTakeSnapshot(File snapshotDir) throws IOException { return udfExecutableManager.processTakeSnapshot(snapshotDir) && udfRegistrationService.processTakeSnapshot(snapshotDir); } @Override - public void processLoadSnapshot(File snapshotDir) throws IOException { + public synchronized void processLoadSnapshot(File snapshotDir) throws IOException { udfExecutableManager.processLoadSnapshot(snapshotDir); udfRegistrationService.processLoadSnapshot(snapshotDir); } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java index 5fe7e18380..ea43a16b86 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java @@ -27,6 +27,8 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -34,18 +36,21 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; public class UDFExecutableManager implements IService, SnapshotProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(UDFExecutableManager.class); + private final String temporaryLibRoot; - private final String extLibRoot; + private final String udfLibRoot; private final AtomicLong requestCounter; - private UDFExecutableManager(String temporaryLibRoot, String extLibRoot) { + private UDFExecutableManager(String temporaryLibRoot, String udfLibRoot) { this.temporaryLibRoot = temporaryLibRoot; - this.extLibRoot = extLibRoot; + this.udfLibRoot = udfLibRoot; requestCounter = new AtomicLong(0); } @@ -62,7 +67,7 @@ public class UDFExecutableManager implements IService, SnapshotProcessor { getDirByRequestId(resource.getRequestId()), getDirByFunctionName(functionName)); } - public void removeFromTemporaryLibRoot(UDFExecutableResource resource) throws IOException { + public void removeFromTemporaryLibRoot(UDFExecutableResource resource) { removeFromTemporaryLibRoot(resource.getRequestId()); } @@ -117,7 +122,7 @@ public class UDFExecutableManager implements IService, SnapshotProcessor { } public String getDirStringByFunctionName(String functionName) { - return extLibRoot + File.separator + functionName + File.separator; + return udfLibRoot + File.separator + functionName + File.separator; } ///////////////////////////////////////////////////////////////////////////////////////////////// @@ -128,7 +133,7 @@ public class UDFExecutableManager implements IService, SnapshotProcessor { public void start() throws StartupException { try { makeDirIfNecessary(temporaryLibRoot); - makeDirIfNecessary(extLibRoot); + makeDirIfNecessary(udfLibRoot); } catch (Exception e) { throw new StartupException(e); } @@ -151,9 +156,9 @@ public class UDFExecutableManager implements IService, SnapshotProcessor { private static UDFExecutableManager INSTANCE = null; public static synchronized UDFExecutableManager setupAndGetInstance( - String temporaryLibRoot, String extLibRoot) { + String temporaryLibRoot, String udfLibRoot) { if (INSTANCE == null) { - INSTANCE = new UDFExecutableManager(temporaryLibRoot, extLibRoot); + INSTANCE = new UDFExecutableManager(temporaryLibRoot, udfLibRoot); } return INSTANCE; } @@ -176,9 +181,62 @@ public class UDFExecutableManager implements IService, SnapshotProcessor { @Override public boolean processTakeSnapshot(File snapshotDir) throws IOException { - return false; + return takeSnapshotForDir( + temporaryLibRoot, + snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "temporary") + && takeSnapshotForDir( + udfLibRoot, + snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "udf"); + } + + private boolean takeSnapshotForDir(String source, String snapshotDestination) throws IOException { + final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE; + final File sourceFile = systemFileFactory.getFile(source); + final File destinationFile = systemFileFactory.getFile(snapshotDestination); + final File temporaryFile = + systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID()); + + FileUtils.deleteQuietly(temporaryFile); + FileUtils.forceMkdir(temporaryFile); + + try { + FileUtils.copyDirectory(sourceFile, temporaryFile); + FileUtils.deleteQuietly(destinationFile); + return temporaryFile.renameTo(destinationFile); + } finally { + FileUtils.deleteQuietly(temporaryFile); + } } @Override - public void processLoadSnapshot(File snapshotDir) throws IOException {} + public void processLoadSnapshot(File snapshotDir) throws IOException { + loadSnapshotForDir( + snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "temporary", + temporaryLibRoot); + loadSnapshotForDir( + snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + "udf", + udfLibRoot); + } + + private void loadSnapshotForDir(String snapshotSource, String destination) throws IOException { + final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE; + final File sourceFile = systemFileFactory.getFile(snapshotSource); + final File destinationFile = systemFileFactory.getFile(destination); + final File temporaryFile = + systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID()); + + try { + FileUtils.moveDirectory(destinationFile, temporaryFile); + FileUtils.forceMkdir(destinationFile); + try { + FileUtils.copyDirectory(sourceFile, destinationFile); + } catch (Exception e) { + LOGGER.error("Failed to load udf snapshot and rollback."); + FileUtils.deleteQuietly(destinationFile); + FileUtils.moveDirectory(temporaryFile, destinationFile); + } + } finally { + FileUtils.deleteQuietly(temporaryFile); + } + } }
