This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5aa3fcafd6 [IOTDB-3098] ClusterSchemaInfo snapshot interface (#5816)
5aa3fcafd6 is described below
commit 5aa3fcafd68b9a1b7b7e30dbfbbcb1fc73dd5e4b
Author: lisijia <[email protected]>
AuthorDate: Mon May 9 12:34:35 2022 +0800
[IOTDB-3098] ClusterSchemaInfo snapshot interface (#5816)
---
.../consensus/response/StorageGroupSchemaResp.java | 6 ++
.../statemachine/PartitionRegionStateMachine.java | 6 +-
.../confignode/persistence/ClusterSchemaInfo.java | 69 +++++++++++++--
.../confignode/persistence/SnapshotProcessor.java | 52 ++++++++++++
.../executor/ConfigRequestExecutor.java | 75 ++++++++++++++++
.../persistence/ClusterSchemaInfoTest.java | 99 ++++++++++++++++++++++
6 files changed, 299 insertions(+), 8 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaResp.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaResp.java
index 437ea3cd00..eb9f0aab00 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaResp.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaResp.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.consensus.response;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.consensus.common.DataSet;
@@ -52,4 +53,9 @@ public class StorageGroupSchemaResp implements DataSet {
resp.setStorageGroupSchemaMap(schemaMap);
}
}
+
+ @TestOnly
+ public Map<String, TStorageGroupSchema> getSchemaMap() {
+ return schemaMap;
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index fe0196421c..ff9d76c4a9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -98,11 +98,13 @@ public class PartitionRegionStateMachine implements
IStateMachine, IStateMachine
@Override
public boolean takeSnapshot(File snapshotDir) {
- return false;
+ return executor.takeSnapshot(snapshotDir);
}
@Override
- public void loadSnapshot(File latestSnapshotRootDir) {}
+ public void loadSnapshot(File latestSnapshotRootDir) {
+ executor.loadSnapshot(latestSnapshotRootDir);
+ }
/** Transmit PhysicalPlan to confignode.service.executor.PlanExecutor */
protected DataSet read(ConfigRequest plan) {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
index 672745c686..a6d4fe2ca4 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
@@ -43,23 +43,33 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class ClusterSchemaInfo {
+public class ClusterSchemaInfo implements SnapshotProcessor {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterSchemaInfo.class);
// StorageGroup read write lock
private final ReentrantReadWriteLock storageGroupReadWriteLock;
- // TODO: serialize and deserialize
private MTreeAboveSG mTree;
+  // The size of the buffer used for snapshot (temporary value)
+ private final int bufferSize = 10 * 1024 * 1024;
+
+ private final String snapshotFileName = "cluster_schema.bin";
+
private ClusterSchemaInfo() {
storageGroupReadWriteLock = new ReentrantReadWriteLock();
@@ -332,12 +342,59 @@ public class ClusterSchemaInfo {
return result;
}
- public void serialize(ByteBuffer buffer) {
- // TODO: Serialize ClusterSchemaInfo
+ @Override
+ public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+
+ File snapshotFile = new File(snapshotDir, snapshotFileName);
+ if (snapshotFile.exists() && snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to take snapshot, because snapshot file [{}] is already
exist.",
+ snapshotFile.getAbsolutePath());
+ return false;
+ }
+
+ File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" +
UUID.randomUUID());
+ ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+
+ storageGroupReadWriteLock.readLock().lock();
+ try {
+ try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
+ FileChannel fileChannel = fileOutputStream.getChannel()) {
+ mTree.serialize(buffer);
+ buffer.flip();
+ fileChannel.write(buffer);
+ }
+ return tmpFile.renameTo(snapshotFile);
+ } finally {
+ buffer.clear();
+ tmpFile.delete();
+ storageGroupReadWriteLock.readLock().unlock();
+ }
}
- public void deserialize(ByteBuffer buffer) {
- // TODO: Deserialize ClusterSchemaInfo
+ @Override
+ public void processLoadSnapshot(File snapshotDir) throws IOException {
+
+ File snapshotFile = new File(snapshotDir, snapshotFileName);
+ if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to load snapshot,snapshot file [{}] is not exist.",
+ snapshotFile.getAbsolutePath());
+ return;
+ }
+ storageGroupReadWriteLock.writeLock().lock();
+ ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+ try (FileInputStream fileInputStream = new FileInputStream(snapshotFile);
+ FileChannel fileChannel = fileInputStream.getChannel()) {
+ // get buffer from fileChannel
+ fileChannel.read(buffer);
+ mTree.clear();
+ buffer.flip();
+ mTree.deserialize(buffer);
+ } finally {
+ buffer.clear();
+ storageGroupReadWriteLock.writeLock().unlock();
+ }
}
@TestOnly
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/SnapshotProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/SnapshotProcessor.java
new file mode 100644
index 0000000000..1ff79b5a83
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/SnapshotProcessor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Different from the interfaces through which state machines interact with consensus
groups, this interface is
+ * where each module actually performs the snapshot inside the state machine.
Every module that can
+ * actually process (modify) snapshot content should implement it.
+ */
+public interface SnapshotProcessor {
+
+ /**
+ * Take snapshot
+ *
+ * @param snapshotDir Where snapshot files are stored.
+ * @return Whether the snapshot is successfully executed
+ * @throws TException Exception occurred during the thrift serialize struct
+ * @throws IOException Exception related to file read and write
+ */
+ boolean processTakeSnapshot(File snapshotDir) throws TException, IOException;
+
+ /**
+ * Load snapshot
+ *
+ * @param snapshotDir Load snapshot from here
+ * @throws TException Exception occurred during the thrift deserialize struct
+ * @throws IOException Exception related to file read and write
+ */
+ void processLoadSnapshot(File snapshotDir) throws TException, IOException;
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index 882fdf8f26..acf156938b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -44,10 +44,23 @@ import org.apache.iotdb.confignode.persistence.AuthorInfo;
import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.persistence.PartitionInfo;
+import org.apache.iotdb.confignode.persistence.SnapshotProcessor;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public class ConfigRequestExecutor {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigRequestExecutor.class);
+
private final NodeInfo nodeInfo;
private final ClusterSchemaInfo clusterSchemaInfo;
@@ -138,4 +151,66 @@ public class ConfigRequestExecutor {
throw new UnknownPhysicalPlanTypeException(req.getType());
}
}
+
+ public boolean takeSnapshot(File snapshotDir) {
+
+ if (!snapshotDir.exists() && !snapshotDir.mkdirs()) {
+ LOGGER.error("snapshot directory [{}] can not be created.",
snapshotDir.getAbsolutePath());
+ return false;
+ }
+
+ File[] fileList = snapshotDir.listFiles();
+ if (fileList != null && fileList.length > 0) {
+ LOGGER.error("snapshot directory [{}] is not empty.",
snapshotDir.getAbsolutePath());
+ return false;
+ }
+
+ AtomicBoolean result = new AtomicBoolean(true);
+ getAllAttributes()
+ .parallelStream()
+ .forEach(
+ x -> {
+ boolean takeSnapshotResult = true;
+ try {
+ takeSnapshotResult = x.processTakeSnapshot(snapshotDir);
+ } catch (TException | IOException e) {
+ LOGGER.error(e.getMessage());
+ takeSnapshotResult = false;
+ } finally {
+              // If any snapshot fails, the whole operation fails,
+              // so the result can only ever be set to false here
+ if (!takeSnapshotResult) {
+ result.set(false);
+ }
+ }
+ });
+ return result.get();
+ }
+
+ public void loadSnapshot(File latestSnapshotRootDir) {
+
+ if (!latestSnapshotRootDir.exists()) {
+ LOGGER.error(
+ "snapshot directory [{}] is not exist, can not load snapshot with
this directory.",
+ latestSnapshotRootDir.getAbsolutePath());
+ return;
+ }
+
+ getAllAttributes()
+ .parallelStream()
+ .forEach(
+ x -> {
+ try {
+ x.processLoadSnapshot(latestSnapshotRootDir);
+ } catch (TException | IOException e) {
+ LOGGER.error(e.getMessage());
+ }
+ });
+ }
+
+ private List<SnapshotProcessor> getAllAttributes() {
+ List<SnapshotProcessor> allAttributes = new ArrayList<>();
+ allAttributes.add(clusterSchemaInfo);
+ return allAttributes;
+ }
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java
new file mode 100644
index 0000000000..629d216597
--- /dev/null
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class ClusterSchemaInfoTest {
+
+ private static ClusterSchemaInfo clusterSchemaInfo;
+ private static final File snapshotDir = new File(BASE_OUTPUT_PATH,
"snapshot");
+
+ @BeforeClass
+ public static void setup() {
+ clusterSchemaInfo = ClusterSchemaInfo.getInstance();
+ if (!snapshotDir.exists()) {
+ snapshotDir.mkdirs();
+ }
+ }
+
+ @AfterClass
+ public static void cleanup() throws IOException {
+ clusterSchemaInfo.clear();
+ if (snapshotDir.exists()) {
+ FileUtils.deleteDirectory(snapshotDir);
+ }
+ }
+
+ @Test
+ public void testSnapshot() throws IOException, IllegalPathException {
+ Set<String> storageGroupPathList = new TreeSet<>();
+ storageGroupPathList.add("root.sg");
+ storageGroupPathList.add("root.a.sg");
+ storageGroupPathList.add("root.a.b.sg");
+ storageGroupPathList.add("root.a.a.a.b.sg");
+
+ Map<String, TStorageGroupSchema> testMap = new TreeMap<>();
+ int i = 0;
+ for (String path : storageGroupPathList) {
+ TStorageGroupSchema tStorageGroupSchema = new TStorageGroupSchema();
+ tStorageGroupSchema.setName(path);
+ tStorageGroupSchema.setTTL(i);
+ tStorageGroupSchema.setDataReplicationFactor(i);
+ tStorageGroupSchema.setSchemaReplicationFactor(i);
+ tStorageGroupSchema.setTimePartitionInterval(i);
+ testMap.put(path, tStorageGroupSchema);
+ clusterSchemaInfo.setStorageGroup(new
SetStorageGroupReq(tStorageGroupSchema));
+ i++;
+ }
+ clusterSchemaInfo.processTakeSnapshot(snapshotDir);
+ clusterSchemaInfo.clear();
+ clusterSchemaInfo.processLoadSnapshot(snapshotDir);
+
+ Assert.assertEquals(
+ storageGroupPathList.size(),
clusterSchemaInfo.getStorageGroupNames().size());
+
+ GetStorageGroupReq getStorageGroupReq =
+ new
GetStorageGroupReq(Arrays.asList(PathUtils.splitPathToDetachedPath("root.**")));
+ Map<String, TStorageGroupSchema> reloadResult =
+
clusterSchemaInfo.getMatchedStorageGroupSchemas(getStorageGroupReq).getSchemaMap();
+ Assert.assertEquals(testMap, reloadResult);
+ }
+}