This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch dledger-controller-snapshot
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/dledger-controller-snapshot by
this push:
new 28f2d0826 [ISSUE #5585] Implement controller statemachine snapshot
file generator (#5642)
28f2d0826 is described below
commit 28f2d0826288faee798942973e8ba8c5b70445e4
Author: hzh0425 <[email protected]>
AuthorDate: Mon Dec 5 15:13:21 2022 +0800
[ISSUE #5585] Implement controller statemachine snapshot file generator
(#5642)
* implement Statemachine snapshot file generator
* Use FileChannel instead of FileOutputStream
---
.../controller/impl/DLedgerController.java | 26 +--
.../impl/manager/ReplicasInfoManager.java | 3 +-
.../DLedgerControllerStateMachine.java | 4 +-
.../StatemachineSnapshotFileGenerator.java | 177 +++++++++++++++++++++
.../StatemachineSnapshotFileGeneratorTest.java | 89 +++++++++++
5 files changed, 283 insertions(+), 16 deletions(-)
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 71e8e465c..27a75b2ea 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -24,18 +24,6 @@ import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiPredicate;
-import java.util.function.Supplier;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -47,6 +35,7 @@ import
org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.controller.impl.event.EventSerializer;
import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
+import
org.apache.rocketmq.controller.impl.statemachine.DLedgerControllerStateMachine;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -64,6 +53,19 @@ import
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataRespon
import
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerRequestHeader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiPredicate;
+import java.util.function.Supplier;
+
/**
* The implementation of controller, based on DLedger (raft).
*/
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 9636f63a0..f54709572 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -477,13 +477,12 @@ public class ReplicasInfoManager implements
SnapshotAbleMetadataManager {
@Override
public byte[] encodeMetadata() {
String json = RemotingSerializable.toJson(this, true);
- System.out.println(json);
return json.getBytes(StandardCharsets.UTF_8);
}
@Override
public boolean loadMetadata(byte[] data) {
- String json = new String(data);
+ String json = new String(data, StandardCharsets.UTF_8);
ReplicasInfoManager obj = RemotingSerializable.fromJson(json,
ReplicasInfoManager.class);
this.syncStateSetInfoTable.putAll(obj.syncStateSetInfoTable);
this.replicaInfoTable.putAll(obj.replicaInfoTable);
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/DLedgerControllerStateMachine.java
similarity index 95%
rename from
controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
rename to
controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/DLedgerControllerStateMachine.java
index 4f1408b37..ee10775d6 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/DLedgerControllerStateMachine.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.controller.impl;
+package org.apache.rocketmq.controller.impl.statemachine;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
@@ -39,7 +39,7 @@ public class DLedgerControllerStateMachine implements
StateMachine {
private final String dLedgerId;
public DLedgerControllerStateMachine(final ReplicasInfoManager
replicasInfoManager,
- final EventSerializer eventSerializer, final String dLedgerId) {
+ final EventSerializer
eventSerializer, final String dLedgerId) {
this.replicasInfoManager = replicasInfoManager;
this.eventSerializer = eventSerializer;
this.dLedgerId = dLedgerId;
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/StatemachineSnapshotFileGenerator.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/StatemachineSnapshotFileGenerator.java
new file mode 100644
index 000000000..d65c1eaea
--- /dev/null
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/StatemachineSnapshotFileGenerator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.statemachine;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.controller.impl.manager.MetadataManagerType;
+import org.apache.rocketmq.controller.impl.manager.SnapshotAbleMetadataManager;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Writes the state of a set of {@link SnapshotAbleMetadataManager}s to a single snapshot file
+ * and restores it from such a file.
+ *
+ * <p>File layout: a fixed-size file header (see {@link SnapshotFileHeader}) followed by one
+ * section per metadata manager. Each section is
+ * {@code <Section-MetadataManagerType (short)><Section-Length (int)><Section-Bytes>}.
+ *
+ * <p>{@link #generateSnapshot(String)} and {@link #loadSnapshot(String)} are synchronized on
+ * this instance.
+ */
+public class StatemachineSnapshotFileGenerator {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+
+    /**
+     * Size in bytes of a section header: MetadataManagerType (short, 2) + Length (int, 4).
+     */
+    private static final int SECTION_HEADER_LENGTH = 6;
+
+    /**
+     * Fixed-size header placed at the very beginning of a snapshot file.
+     */
+    static class SnapshotFileHeader {
+        // Magic (int, 4) + Version (short, 2) + TotalSections (int, 4) + Reserved (long, 8)
+        public static final int HEADER_LENGTH = 18;
+        public static final Integer MAGIC = -626843481;
+        public static final Short VERSION = 1;
+        // Written as the trailing 8 bytes of the header and never read back by from().
+        // NOTE(review): the name is presumably a typo for RESERVED; kept so existing references still compile.
+        public static final Long REVERSED = 0L;
+        public final int totalSections;
+
+        public SnapshotFileHeader(int totalSections) {
+            this.totalSections = totalSections;
+        }
+
+        /**
+         * Parses a file header starting at the buffer's current position.
+         *
+         * @param header buffer holding the header bytes
+         * @return the parsed header, or {@code null} if the buffer is {@code null}, its capacity is
+         *         smaller than {@link #HEADER_LENGTH}, or the magic/version does not match
+         */
+        public static SnapshotFileHeader from(ByteBuffer header) {
+            if (header == null || header.capacity() < HEADER_LENGTH) {
+                return null;
+            }
+            int magic = header.getInt();
+            if (magic != SnapshotFileHeader.MAGIC) {
+                return null;
+            }
+            short version = header.getShort();
+            if (version != SnapshotFileHeader.VERSION) {
+                return null;
+            }
+
+            int totalSections = header.getInt();
+            return new SnapshotFileHeader(totalSections);
+        }
+
+        /**
+         * Serializes this header.
+         *
+         * @return a buffer of {@link #HEADER_LENGTH} bytes, flipped and ready to be written
+         */
+        public ByteBuffer build() {
+            ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH);
+            buffer.putInt(MAGIC);
+            buffer.putShort(VERSION);
+            buffer.putInt(this.totalSections);
+            buffer.putLong(REVERSED);
+            buffer.flip();
+            return buffer;
+        }
+    }
+
+    private final Map<Short/*MetadataManagerId*/, SnapshotAbleMetadataManager> metadataManagerTable;
+
+    /**
+     * @param managers the metadata managers to snapshot; each is registered under the id of its
+     *                 metadata manager type
+     */
+    public StatemachineSnapshotFileGenerator(final List<SnapshotAbleMetadataManager> managers) {
+        this.metadataManagerTable = new HashMap<>();
+        managers.forEach(manager -> this.metadataManagerTable.put(manager.getMetadataManagerType().getId(), manager));
+    }
+
+    /**
+     * Generate snapshot and write the data to snapshot file.
+     *
+     * @param snapshotPath path of the snapshot file; created if absent, truncated if present
+     * @throws IOException if the file cannot be opened or written
+     */
+    public synchronized void generateSnapshot(final String snapshotPath) throws IOException {
+        // TRUNCATE_EXISTING: without it, overwriting a longer old snapshot would leave stale
+        // trailing bytes that loadSnapshot would then try to parse as sections.
+        try (final FileChannel fileChannel = FileChannel.open(Paths.get(snapshotPath),
+            StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
+            // Write Snapshot Header
+            SnapshotFileHeader header = new SnapshotFileHeader(this.metadataManagerTable.size());
+            writeFully(fileChannel, header.build());
+
+            // Write each section
+            ByteBuffer sectionHeader = ByteBuffer.allocate(SECTION_HEADER_LENGTH);
+            for (Map.Entry<Short, SnapshotAbleMetadataManager> section : this.metadataManagerTable.entrySet()) {
+                byte[] serializedBytes = section.getValue().encodeMetadata();
+                // Section format: <Section-MetadataManagerType><Section-Length><Section-Bytes>
+
+                // Write section header
+                sectionHeader.clear();
+                sectionHeader.putShort(section.getKey());
+                sectionHeader.putInt(serializedBytes.length);
+                sectionHeader.flip();
+                writeFully(fileChannel, sectionHeader);
+
+                // Write section bytes
+                writeFully(fileChannel, ByteBuffer.wrap(serializedBytes));
+            }
+
+            fileChannel.force(true);
+        }
+    }
+
+    /**
+     * Read snapshot from snapshot file and load the metadata into corresponding metadataManager.
+     * Sections whose type has no registered manager are skipped.
+     *
+     * @param snapshotPath path of the snapshot file
+     * @return {@code true} if every registered manager loaded its section; {@code false} if the
+     *         file header is missing/invalid, a manager rejected its data, or not all managers
+     *         found a section
+     * @throws IOException if the file cannot be read, or a section header/body is truncated or
+     *                     declares a negative length
+     */
+    public synchronized boolean loadSnapshot(final String snapshotPath) throws IOException {
+        try (ReadableByteChannel channel = Channels.newChannel(Files.newInputStream(Paths.get(snapshotPath)))) {
+            // Read snapshot Header
+            ByteBuffer header = ByteBuffer.allocate(SnapshotFileHeader.HEADER_LENGTH);
+            if (readFully(channel, header) < SnapshotFileHeader.HEADER_LENGTH) {
+                return false;
+            }
+            header.flip();
+
+            SnapshotFileHeader fileHeader = SnapshotFileHeader.from(header);
+            if (fileHeader == null) {
+                return false;
+            }
+
+            // Read each section
+            ByteBuffer sectionHeader = ByteBuffer.allocate(SECTION_HEADER_LENGTH);
+            int successLoadCnt = 0;
+            while (true) {
+                // Reset position/limit on every iteration; otherwise the buffer has no space
+                // left after the first section and every later section is silently ignored.
+                sectionHeader.clear();
+                int readSize = readFully(channel, sectionHeader);
+                if (readSize == 0) {
+                    // Clean end of file
+                    break;
+                }
+                if (readSize != SECTION_HEADER_LENGTH) {
+                    throw new IOException("Invalid amount of data read for the header of a section");
+                }
+                sectionHeader.flip();
+
+                // Section format: <Section-MetadataManagerType><Section-Length><Section-Bytes>
+                short sectionType = sectionHeader.getShort();
+                int length = sectionHeader.getInt();
+                if (length < 0) {
+                    throw new IOException("Invalid section length in snapshot file: " + length);
+                }
+
+                ByteBuffer data = ByteBuffer.allocate(length);
+                if (readFully(channel, data) != length) {
+                    throw new IOException("Invalid amount of data read for the body of a section");
+                }
+
+                SnapshotAbleMetadataManager metadataManager = this.metadataManagerTable.get(sectionType);
+                if (metadataManager != null) {
+                    if (!metadataManager.loadMetadata(data.array())) {
+                        return false;
+                    }
+                    successLoadCnt++;
+                    log.info("Load snapshot metadata for {} success!", MetadataManagerType.from(sectionType));
+                }
+            }
+            if (successLoadCnt != this.metadataManagerTable.size()) {
+                log.info("Failed to load snapshot metadata file totally, expected section nums:{}, success load nums:{}", this.metadataManagerTable.size(), successLoadCnt);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Writes every remaining byte of {@code buffer}; a single write call may transfer fewer.
+     */
+    private static void writeFully(final FileChannel channel, final ByteBuffer buffer) throws IOException {
+        while (buffer.hasRemaining()) {
+            channel.write(buffer);
+        }
+    }
+
+    /**
+     * Reads until {@code buffer} is full or end-of-stream is reached.
+     *
+     * @return the number of bytes read by this call; less than the buffer's initial remaining
+     *         space only if end-of-stream was hit, and 0 if the stream was already exhausted
+     */
+    private static int readFully(final ReadableByteChannel channel, final ByteBuffer buffer) throws IOException {
+        int total = 0;
+        while (buffer.hasRemaining()) {
+            int n = channel.read(buffer);
+            if (n < 0) {
+                break;
+            }
+            total += n;
+        }
+        return total;
+    }
+}
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/statemachine/StatemachineSnapshotFileGeneratorTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/statemachine/StatemachineSnapshotFileGeneratorTest.java
new file mode 100644
index 000000000..1c4d7e478
--- /dev/null
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/statemachine/StatemachineSnapshotFileGeneratorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.controller.impl.statemachine;
+
+import org.apache.rocketmq.controller.impl.manager.BrokerInfo;
+import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
+import org.apache.rocketmq.controller.impl.manager.SyncStateInfo;
+import
org.apache.rocketmq.controller.impl.statemachine.StatemachineSnapshotFileGenerator;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Round-trip test: a snapshot generated from a populated {@link ReplicasInfoManager} must load
+ * into an empty manager and yield identical encoded metadata.
+ */
+public class StatemachineSnapshotFileGeneratorTest {
+
+    public String snapshotPath;
+    StatemachineSnapshotFileGenerator snapshotGenerator;
+    ReplicasInfoManager replicasInfoManager;
+
+    /**
+     * Populates {@code replicasInfoManager} with one broker set ("broker1" in "cluster1")
+     * that has two replicas, both in the sync state set.
+     */
+    public void mockMetadata() {
+        HashMap<String, Long> brokerIdTable = new HashMap<>();
+        brokerIdTable.put("127.0.0.1:10000", 1L);
+        brokerIdTable.put("127.0.0.1:10001", 2L);
+        BrokerInfo broker1 = new BrokerInfo("broker1", "cluster1");
+        broker1.setBrokerIdTable(brokerIdTable);
+        broker1.setBrokerIdCount(2L);
+
+        HashSet<String> syncStateSet = new HashSet<>();
+        syncStateSet.add("127.0.0.1:10000");
+        syncStateSet.add("127.0.0.1:10001");
+        SyncStateInfo syncStateInfo1 = new SyncStateInfo("cluster1", "broker1", "127.0.0.1:10000");
+        syncStateInfo1.setSyncStateSet(syncStateSet);
+
+        HashMap<String, BrokerInfo> replicaInfoTable = new HashMap<>();
+        replicaInfoTable.put("broker1", broker1);
+        this.replicasInfoManager.setReplicaInfoTable(replicaInfoTable);
+
+        HashMap<String, SyncStateInfo> syncStateSetInfoTable = new HashMap<>();
+        syncStateSetInfoTable.put("broker1", syncStateInfo1);
+        this.replicasInfoManager.setSyncStateSetInfoTable(syncStateSetInfoTable);
+    }
+
+    @Before
+    public void init() {
+        // Use the platform temp directory and a per-run file name instead of a hard-coded
+        // "/tmp/ControllerSnapshot": portable, and no collisions between concurrent runs.
+        this.snapshotPath = Paths.get(System.getProperty("java.io.tmpdir"),
+            "ControllerSnapshot-" + System.nanoTime()).toString();
+        File file = new File(snapshotPath);
+        File parentFile = file.getParentFile();
+        if (parentFile != null) {
+            parentFile.mkdirs();
+        }
+        // Do not leave the snapshot file behind after the test JVM exits.
+        file.deleteOnExit();
+        this.replicasInfoManager = new ReplicasInfoManager(null);
+        mockMetadata();
+        this.snapshotGenerator = new StatemachineSnapshotFileGenerator(Collections.singletonList(this.replicasInfoManager));
+    }
+
+    @Test
+    public void testGenerateAndLoadSnapshot() throws IOException {
+        this.snapshotGenerator.generateSnapshot(this.snapshotPath);
+
+        ReplicasInfoManager emptyManager = new ReplicasInfoManager(null);
+        StatemachineSnapshotFileGenerator generator1 = new StatemachineSnapshotFileGenerator(Collections.singletonList(emptyManager));
+        assertTrue(generator1.loadSnapshot(this.snapshotPath));
+
+        // expected = the manager the snapshot was generated from, actual = the restored one
+        assertArrayEquals(this.replicasInfoManager.encodeMetadata(), emptyManager.encodeMetadata());
+    }
+}
\ No newline at end of file