This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5aa3fcafd6 [IOTDB-3098] ClusterSchemaInfo snapshot interface (#5816)
5aa3fcafd6 is described below

commit 5aa3fcafd68b9a1b7b7e30dbfbbcb1fc73dd5e4b
Author: lisijia <[email protected]>
AuthorDate: Mon May 9 12:34:35 2022 +0800

    [IOTDB-3098] ClusterSchemaInfo snapshot interface (#5816)
---
 .../consensus/response/StorageGroupSchemaResp.java |  6 ++
 .../statemachine/PartitionRegionStateMachine.java  |  6 +-
 .../confignode/persistence/ClusterSchemaInfo.java  | 69 +++++++++++++--
 .../confignode/persistence/SnapshotProcessor.java  | 52 ++++++++++++
 .../executor/ConfigRequestExecutor.java            | 75 ++++++++++++++++
 .../persistence/ClusterSchemaInfoTest.java         | 99 ++++++++++++++++++++++
 6 files changed, 299 insertions(+), 8 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaResp.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaResp.java
index 437ea3cd00..eb9f0aab00 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaResp.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaResp.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.confignode.consensus.response;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -52,4 +53,9 @@ public class StorageGroupSchemaResp implements DataSet {
       resp.setStorageGroupSchemaMap(schemaMap);
     }
   }
+
+  @TestOnly
+  public Map<String, TStorageGroupSchema> getSchemaMap() {
+    return schemaMap;
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index fe0196421c..ff9d76c4a9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -98,11 +98,13 @@ public class PartitionRegionStateMachine implements 
IStateMachine, IStateMachine
 
   @Override
   public boolean takeSnapshot(File snapshotDir) {
-    return false;
+    return executor.takeSnapshot(snapshotDir);
   }
 
   @Override
-  public void loadSnapshot(File latestSnapshotRootDir) {}
+  public void loadSnapshot(File latestSnapshotRootDir) {
+    executor.loadSnapshot(latestSnapshotRootDir);
+  }
 
   /** Transmit PhysicalPlan to confignode.service.executor.PlanExecutor */
   protected DataSet read(ConfigRequest plan) {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
index 672745c686..a6d4fe2ca4 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
@@ -43,23 +43,33 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class ClusterSchemaInfo {
+public class ClusterSchemaInfo implements SnapshotProcessor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterSchemaInfo.class);
 
   // StorageGroup read write lock
   private final ReentrantReadWriteLock storageGroupReadWriteLock;
 
-  // TODO: serialize and deserialize
   private MTreeAboveSG mTree;
 
+  // The size of the buffer used for snapshot(temporary value)
+  private final int bufferSize = 10 * 1024 * 1024;
+
+  private final String snapshotFileName = "cluster_schema.bin";
+
   private ClusterSchemaInfo() {
     storageGroupReadWriteLock = new ReentrantReadWriteLock();
 
@@ -332,12 +342,59 @@ public class ClusterSchemaInfo {
     return result;
   }
 
-  public void serialize(ByteBuffer buffer) {
-    // TODO: Serialize ClusterSchemaInfo
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (snapshotFile.exists() && snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to take snapshot, because snapshot file [{}] is already 
exist.",
+          snapshotFile.getAbsolutePath());
+      return false;
+    }
+
+    File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + 
UUID.randomUUID());
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+
+    storageGroupReadWriteLock.readLock().lock();
+    try {
+      try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
+          FileChannel fileChannel = fileOutputStream.getChannel()) {
+        mTree.serialize(buffer);
+        buffer.flip();
+        fileChannel.write(buffer);
+      }
+      return tmpFile.renameTo(snapshotFile);
+    } finally {
+      buffer.clear();
+      tmpFile.delete();
+      storageGroupReadWriteLock.readLock().unlock();
+    }
   }
 
-  public void deserialize(ByteBuffer buffer) {
-    // TODO: Deserialize ClusterSchemaInfo
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws IOException {
+
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to load snapshot,snapshot file [{}] is not exist.",
+          snapshotFile.getAbsolutePath());
+      return;
+    }
+    storageGroupReadWriteLock.writeLock().lock();
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile);
+        FileChannel fileChannel = fileInputStream.getChannel()) {
+      // get buffer from fileChannel
+      fileChannel.read(buffer);
+      mTree.clear();
+      buffer.flip();
+      mTree.deserialize(buffer);
+    } finally {
+      buffer.clear();
+      storageGroupReadWriteLock.writeLock().unlock();
+    }
   }
 
   @TestOnly
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/SnapshotProcessor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/SnapshotProcessor.java
new file mode 100644
index 0000000000..1ff79b5a83
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/SnapshotProcessor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Different from interface of state machines interacting with consensus 
groups. This interface is
+ * where each module actually performs the snapshot inside the state machine. 
Every module that can
+ * actually process(modify) snapshot content should implement it.
+ */
+public interface SnapshotProcessor {
+
+  /**
+   * Take snapshot
+   *
+   * @param snapshotDir Where snapshot files are stored.
+   * @return Whether the snapshot is successfully executed
+   * @throws TException Exception occurred during the thrift serialize struct
+   * @throws IOException Exception related to file read and write
+   */
+  boolean processTakeSnapshot(File snapshotDir) throws TException, IOException;
+
+  /**
+   * Load snapshot
+   *
+   * @param snapshotDir Load snapshot from here
+   * @throws TException Exception occurred during the thrift deserialize struct
+   * @throws IOException Exception related to file read and write
+   */
+  void processLoadSnapshot(File snapshotDir) throws TException, IOException;
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index 882fdf8f26..acf156938b 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -44,10 +44,23 @@ import org.apache.iotdb.confignode.persistence.AuthorInfo;
 import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.persistence.PartitionInfo;
+import org.apache.iotdb.confignode.persistence.SnapshotProcessor;
 import org.apache.iotdb.consensus.common.DataSet;
 
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public class ConfigRequestExecutor {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigRequestExecutor.class);
+
   private final NodeInfo nodeInfo;
 
   private final ClusterSchemaInfo clusterSchemaInfo;
@@ -138,4 +151,66 @@ public class ConfigRequestExecutor {
         throw new UnknownPhysicalPlanTypeException(req.getType());
     }
   }
+
+  public boolean takeSnapshot(File snapshotDir) {
+
+    if (!snapshotDir.exists() && !snapshotDir.mkdirs()) {
+      LOGGER.error("snapshot directory [{}] can not be created.", 
snapshotDir.getAbsolutePath());
+      return false;
+    }
+
+    File[] fileList = snapshotDir.listFiles();
+    if (fileList != null && fileList.length > 0) {
+      LOGGER.error("snapshot directory [{}] is not empty.", 
snapshotDir.getAbsolutePath());
+      return false;
+    }
+
+    AtomicBoolean result = new AtomicBoolean(true);
+    getAllAttributes()
+        .parallelStream()
+        .forEach(
+            x -> {
+              boolean takeSnapshotResult = true;
+              try {
+                takeSnapshotResult = x.processTakeSnapshot(snapshotDir);
+              } catch (TException | IOException e) {
+                LOGGER.error(e.getMessage());
+                takeSnapshotResult = false;
+              } finally {
+                // If any snapshot fails, the whole fails
+                // So this is just going to be false
+                if (!takeSnapshotResult) {
+                  result.set(false);
+                }
+              }
+            });
+    return result.get();
+  }
+
+  public void loadSnapshot(File latestSnapshotRootDir) {
+
+    if (!latestSnapshotRootDir.exists()) {
+      LOGGER.error(
+          "snapshot directory [{}] is not exist, can not load snapshot with 
this directory.",
+          latestSnapshotRootDir.getAbsolutePath());
+      return;
+    }
+
+    getAllAttributes()
+        .parallelStream()
+        .forEach(
+            x -> {
+              try {
+                x.processLoadSnapshot(latestSnapshotRootDir);
+              } catch (TException | IOException e) {
+                LOGGER.error(e.getMessage());
+              }
+            });
+  }
+
+  private List<SnapshotProcessor> getAllAttributes() {
+    List<SnapshotProcessor> allAttributes = new ArrayList<>();
+    allAttributes.add(clusterSchemaInfo);
+    return allAttributes;
+  }
 }
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java
new file mode 100644
index 0000000000..629d216597
--- /dev/null
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class ClusterSchemaInfoTest {
+
+  private static ClusterSchemaInfo clusterSchemaInfo;
+  private static final File snapshotDir = new File(BASE_OUTPUT_PATH, 
"snapshot");
+
+  @BeforeClass
+  public static void setup() {
+    clusterSchemaInfo = ClusterSchemaInfo.getInstance();
+    if (!snapshotDir.exists()) {
+      snapshotDir.mkdirs();
+    }
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    clusterSchemaInfo.clear();
+    if (snapshotDir.exists()) {
+      FileUtils.deleteDirectory(snapshotDir);
+    }
+  }
+
+  @Test
+  public void testSnapshot() throws IOException, IllegalPathException {
+    Set<String> storageGroupPathList = new TreeSet<>();
+    storageGroupPathList.add("root.sg");
+    storageGroupPathList.add("root.a.sg");
+    storageGroupPathList.add("root.a.b.sg");
+    storageGroupPathList.add("root.a.a.a.b.sg");
+
+    Map<String, TStorageGroupSchema> testMap = new TreeMap<>();
+    int i = 0;
+    for (String path : storageGroupPathList) {
+      TStorageGroupSchema tStorageGroupSchema = new TStorageGroupSchema();
+      tStorageGroupSchema.setName(path);
+      tStorageGroupSchema.setTTL(i);
+      tStorageGroupSchema.setDataReplicationFactor(i);
+      tStorageGroupSchema.setSchemaReplicationFactor(i);
+      tStorageGroupSchema.setTimePartitionInterval(i);
+      testMap.put(path, tStorageGroupSchema);
+      clusterSchemaInfo.setStorageGroup(new 
SetStorageGroupReq(tStorageGroupSchema));
+      i++;
+    }
+    clusterSchemaInfo.processTakeSnapshot(snapshotDir);
+    clusterSchemaInfo.clear();
+    clusterSchemaInfo.processLoadSnapshot(snapshotDir);
+
+    Assert.assertEquals(
+        storageGroupPathList.size(), 
clusterSchemaInfo.getStorageGroupNames().size());
+
+    GetStorageGroupReq getStorageGroupReq =
+        new 
GetStorageGroupReq(Arrays.asList(PathUtils.splitPathToDetachedPath("root.**")));
+    Map<String, TStorageGroupSchema> reloadResult =
+        
clusterSchemaInfo.getMatchedStorageGroupSchemas(getStorageGroupReq).getSchemaMap();
+    Assert.assertEquals(testMap, reloadResult);
+  }
+}

Reply via email to