This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cabdec75b63 Fixed restart error when DataRegion is RatisConsensus (#12554)
cabdec75b63 is described below
commit cabdec75b637a9f235a55ec634f4b71b66765f65
Author: Potato <[email protected]>
AuthorDate: Tue May 21 21:19:14 2024 +0800
Fixed restart error when DataRegion is RatisConsensus (#12554)
---
.../iotdb/confignode/manager/ConfigManager.java | 2 +-
.../manager/consensus/ConsensusManager.java | 7 ++
.../org/apache/iotdb/consensus/IConsensus.java | 3 +-
.../apache/iotdb/consensus/ratis/DiskGuardian.java | 65 ++++++------
.../iotdb/consensus/ratis/RatisConsensus.java | 113 ++++++++++++---------
5 files changed, 110 insertions(+), 80 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 0c407afec2f..2666008cd80 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1280,7 +1280,7 @@ public class ConfigManager implements IManager {
for (int i = 0; i < rpcTimeoutInMS / retryIntervalInMS; i++) {
try {
- if (consensusManager.get() == null) {
+ if (consensusManager.get() == null ||
!consensusManager.get().isInitialized()) {
TimeUnit.MILLISECONDS.sleep(retryIntervalInMS);
} else {
// When add non Seed-ConfigNode to the ConfigNodeGroup, the
parameter should be emptyList
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index e28aee7fdab..361229f4435 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -77,6 +77,8 @@ public class ConsensusManager {
private final IManager configManager;
private IConsensus consensusImpl;
+ private boolean isInitialized;
+
public ConsensusManager(IManager configManager, ConfigRegionStateMachine
stateMachine) {
this.configManager = configManager;
setConsensusLayer(stateMachine);
@@ -101,6 +103,7 @@ public class ConsensusManager {
"Something wrong happened while calling consensus layer's
createLocalPeer API.", e);
}
}
+ isInitialized = true;
}
public void close() throws IOException {
@@ -444,4 +447,8 @@ public class ConsensusManager {
public void manuallyTakeSnapshot() throws ConsensusException {
consensusImpl.triggerSnapshot(ConfigNodeInfo.CONFIG_REGION_ID, true);
}
+
+ public boolean isInitialized() {
+ return isInitialized;
+ }
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 8baaec89247..643c8360e84 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -43,7 +43,8 @@ import java.util.List;
public interface IConsensus {
/**
- * Start the consensus module.
+ * Start the consensus module. Note: You should call this function
immediately after initializing
+ * the instance, because calling other functions without start may produce
unexpected errors
*
* @throws IOException when start consensus errors
*/
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
index faa0f317d1a..ae6e597aafb 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
@@ -143,7 +143,7 @@ class DiskGuardian {
void start() {
// first schedule the snapshot daemon
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- workerThread, this::snapshotDaemon, 0L, daemonIntervalMs,
TimeUnit.SECONDS);
+ workerThread, this::snapshotDaemon, daemonIntervalMs,
daemonIntervalMs, TimeUnit.SECONDS);
// then schedule all checker daemons
snapshotArbitrators.forEach(
@@ -151,7 +151,7 @@ class DiskGuardian {
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
workerThread,
() -> checkerDaemon(checkers),
- 0L,
+ daemonIntervalMs,
interval.toLong(TimeUnit.SECONDS),
TimeUnit.SECONDS));
}
@@ -175,26 +175,31 @@ class DiskGuardian {
if (isStopped.get()) {
return;
}
- for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
- if (getSnapshotFlag(groupId).get()) {
- try {
-
serverRef.get().triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(groupId),
false);
- final boolean flagCleared =
snapshotFlag.get(groupId).compareAndSet(true, false);
- if (!flagCleared) {
- logger.warn(
- "{}: clear snapshot flag failed for group {}, please check the
related implementation",
+ try {
+ for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
+ if (getSnapshotFlag(groupId).get()) {
+ try {
+ serverRef
+ .get()
+
.triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(groupId), false);
+ final boolean flagCleared =
snapshotFlag.get(groupId).compareAndSet(true, false);
+ if (!flagCleared) {
+ logger.info(
+ "{}: clear snapshot flag failed for group {}, please check
the related implementation",
+ this,
+ groupId);
+ }
+ } catch (ConsensusException e) {
+ logger.info(
+ "{} take snapshot failed for group {} due to {}. Disk file
status {}",
this,
- groupId);
+ groupId,
+ e,
+ getLatestSummary(groupId).orElse(null));
}
- } catch (ConsensusException e) {
- logger.warn(
- "{} take snapshot failed for group {} due to {}. Disk file
status {}",
- this,
- groupId,
- e,
- getLatestSummary(groupId).orElse(null));
}
}
+ } catch (IOException ignore) {
}
}
@@ -203,18 +208,21 @@ class DiskGuardian {
if (isStopped.get()) {
return;
}
- for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
- final Optional<RaftLogSummary> summary = getLatestSummary(groupId);
- if (summary.isPresent()) {
- final Optional<Boolean> anyCheckerPositive =
- checkerList.stream()
- .map(checker -> checker.test(summary.get()))
- .filter(Boolean::booleanValue)
- .findAny();
- if (anyCheckerPositive.isPresent()) {
- getSnapshotFlag(groupId).set(true);
+ try {
+ for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
+ final Optional<RaftLogSummary> summary = getLatestSummary(groupId);
+ if (summary.isPresent()) {
+ final Optional<Boolean> anyCheckerPositive =
+ checkerList.stream()
+ .map(checker -> checker.test(summary.get()))
+ .filter(Boolean::booleanValue)
+ .findAny();
+ if (anyCheckerPositive.isPresent()) {
+ getSnapshotFlag(groupId).set(true);
+ }
}
}
+ } catch (IOException ignore) {
}
}
@@ -240,7 +248,6 @@ class DiskGuardian {
.getCurrentDir();
return new RaftLogSummary(gid, root);
} catch (IOException e) {
- logger.warn("{}: group not exists for {} and caught exception
", this, groupId, e);
return null;
}
});
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index de646de2334..45c82332f55 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -83,6 +83,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
+import org.apache.ratis.util.MemoizedCheckedSupplier;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
@@ -115,8 +116,7 @@ class RatisConsensus implements IConsensus {
private final RaftPeer myself;
private final File storageDir;
-
- private final RaftServer server;
+ private final MemoizedCheckedSupplier<RaftServer, IOException> server;
private final RaftProperties properties = new RaftProperties();
private final RaftClientRpc clientRpc;
@@ -144,8 +144,7 @@ class RatisConsensus implements IConsensus {
private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean>
canServeStaleRead;
- public RatisConsensus(ConsensusConfig config, IStateMachine.Registry
registry)
- throws IOException {
+ public RatisConsensus(ConsensusConfig config, IStateMachine.Registry
registry) {
myself =
Utils.fromNodeInfoAndPriorityToRaftPeer(
config.getThisNodeId(), config.getThisNodeEndPoint(),
DEFAULT_PRIORITY);
@@ -195,24 +194,28 @@ class RatisConsensus implements IConsensus {
clientRpc = new GrpcFactory(new
Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
+ // do not build server in constructor in case stateMachine is not ready
server =
- RaftServer.newBuilder()
- .setServerId(myself.getId())
- .setProperties(properties)
- .setOption(RaftStorage.StartupOption.RECOVER)
- .setStateMachineRegistry(
- raftGroupId ->
- new ApplicationStateMachineProxy(
-
registry.apply(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
- raftGroupId,
- this::onLeaderChanged))
- .build();
+ MemoizedCheckedSupplier.valueOf(
+ () ->
+ RaftServer.newBuilder()
+ .setServerId(myself.getId())
+ .setProperties(properties)
+ .setOption(RaftStorage.StartupOption.RECOVER)
+ .setStateMachineRegistry(
+ raftGroupId ->
+ new ApplicationStateMachineProxy(
+ registry.apply(
+
Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
+ raftGroupId,
+ this::onLeaderChanged))
+ .build());
}
@Override
public synchronized void start() throws IOException {
MetricService.getInstance().addMetricSet(this.ratisMetricSet);
- server.start();
+ server.get().start();
registerAndStartDiskGuardian();
}
@@ -225,7 +228,7 @@ class RatisConsensus implements IConsensus {
Thread.currentThread().interrupt();
} finally {
clientManager.close();
- server.close();
+ server.get().close();
MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
}
}
@@ -252,7 +255,7 @@ class RatisConsensus implements IConsensus {
}
private RaftClientReply writeLocallyWithRetry(RaftClientRequest request)
throws IOException {
- return writeWithRetry(() -> server.submitClientRequest(request));
+ return writeWithRetry(() -> server.get().submitClientRequest(request));
}
private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message
message)
@@ -288,8 +291,12 @@ class RatisConsensus implements IConsensus {
Message message = new RequestMessage(request);
// 1. first try the local server
- RaftClientRequest clientRequest =
- buildRawRequest(raftGroupId, message,
RaftClientRequest.writeRequestType());
+ RaftClientRequest clientRequest;
+ try {
+ clientRequest = buildRawRequest(raftGroupId, message,
RaftClientRequest.writeRequestType());
+ } catch (IOException e) {
+ throw new RatisRequestFailedException(e);
+ }
RaftPeer suggestedLeader = null;
if ((isLeader(groupId) || raftGroup.getPeers().size() == 1)
@@ -392,7 +399,7 @@ class RatisConsensus implements IConsensus {
Retriable.attempt(
() -> {
try {
- return server.submitClientRequest(request);
+ return server.get().submitClientRequest(request);
} catch (
IOException
ioe) { // IOE indicates some unexpected errors, say
StatusRuntimeException
@@ -402,7 +409,7 @@ class RatisConsensus implements IConsensus {
// We can still retry in case it's a temporary network
partition.
return RaftClientReply.newBuilder()
.setClientId(localFakeId)
- .setServerId(server.getId())
+ .setServerId(server.get().getId())
.setGroupId(request.getRaftGroupId())
.setException(
new ReadIndexException(
@@ -440,9 +447,11 @@ class RatisConsensus implements IConsensus {
RaftGroup group = buildRaftGroup(groupId, peers);
try {
RaftClientReply reply =
- server.groupManagement(
- GroupManagementRequest.newAdd(
- localFakeId, myself.getId(),
localFakeCallId.incrementAndGet(), group, true));
+ server
+ .get()
+ .groupManagement(
+ GroupManagementRequest.newAdd(
+ localFakeId, myself.getId(),
localFakeCallId.incrementAndGet(), group, true));
if (!reply.isSuccess()) {
throw new RatisRequestFailedException(reply.getException());
}
@@ -468,14 +477,16 @@ class RatisConsensus implements IConsensus {
RaftClientReply reply;
try {
reply =
- server.groupManagement(
- GroupManagementRequest.newRemove(
- localFakeId,
- myself.getId(),
- localFakeCallId.incrementAndGet(),
- raftGroupId,
- true,
- false));
+ server
+ .get()
+ .groupManagement(
+ GroupManagementRequest.newRemove(
+ localFakeId,
+ myself.getId(),
+ localFakeCallId.incrementAndGet(),
+ raftGroupId,
+ true,
+ false));
if (!reply.isSuccess()) {
throw new RatisRequestFailedException(reply.getException());
}
@@ -602,7 +613,7 @@ class RatisConsensus implements IConsensus {
public boolean isLeader(ConsensusGroupId groupId) {
RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
try {
- return server.getDivision(raftGroupId).getInfo().isLeader();
+ return server.get().getDivision(raftGroupId).getInfo().isLeader();
} catch (IOException exception) {
// if the read fails, simply return not leader
logger.info("isLeader request failed with exception: ", exception);
@@ -614,7 +625,7 @@ class RatisConsensus implements IConsensus {
public boolean isLeaderReady(ConsensusGroupId groupId) {
RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
try {
- return server.getDivision(raftGroupId).getInfo().isLeaderReady();
+ return server.get().getDivision(raftGroupId).getInfo().isLeaderReady();
} catch (IOException exception) {
// if the read fails, simply return not ready
logger.info("isLeaderReady request failed with exception: ", exception);
@@ -626,7 +637,7 @@ class RatisConsensus implements IConsensus {
public long getLogicalClock(ConsensusGroupId groupId) {
RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
try {
- return server.getDivision(raftGroupId).getInfo().getCurrentTerm();
+ return server.get().getDivision(raftGroupId).getInfo().getCurrentTerm();
} catch (IOException exception) {
// if the read fails, simply return 0
logger.info("getLogicalClock request failed with exception: ",
exception);
@@ -637,7 +648,7 @@ class RatisConsensus implements IConsensus {
private boolean waitUntilLeaderReady(RaftGroupId groupId) {
DivisionInfo divisionInfo;
try {
- divisionInfo = server.getDivision(groupId).getInfo();
+ divisionInfo = server.get().getDivision(groupId).getInfo();
} catch (IOException e) {
// if the read fails, simply return not leader
logger.info("isLeaderReady checking failed with exception: ", e);
@@ -685,7 +696,7 @@ class RatisConsensus implements IConsensus {
RaftPeerId leaderId;
try {
- leaderId = server.getDivision(raftGroupId).getInfo().getLeaderId();
+ leaderId = server.get().getDivision(raftGroupId).getInfo().getLeaderId();
} catch (IOException e) {
logger.warn("fetch division info for group " + groupId + " failed due
to: ", e);
return null;
@@ -700,10 +711,15 @@ class RatisConsensus implements IConsensus {
@Override
public List<ConsensusGroupId> getAllConsensusGroupIds() {
List<ConsensusGroupId> ids = new ArrayList<>();
- server
- .getGroupIds()
- .forEach(groupId ->
ids.add(Utils.fromRaftGroupIdToConsensusGroupId(groupId)));
- return ids;
+ try {
+ server
+ .get()
+ .getGroupIds()
+ .forEach(groupId ->
ids.add(Utils.fromRaftGroupIdToConsensusGroupId(groupId)));
+ return ids;
+ } catch (IOException e) {
+ return Collections.emptyList();
+ }
}
@Override
@@ -757,7 +773,7 @@ class RatisConsensus implements IConsensus {
final RaftClientReply reply;
try {
- reply = server.snapshotManagement(request);
+ reply = server.get().snapshotManagement(request);
if (!reply.isSuccess()) {
throw new RatisRequestFailedException(reply.getException());
}
@@ -785,9 +801,9 @@ class RatisConsensus implements IConsensus {
}
private RaftClientRequest buildRawRequest(
- RaftGroupId groupId, Message message, RaftClientRequest.Type type) {
+ RaftGroupId groupId, Message message, RaftClientRequest.Type type)
throws IOException {
return RaftClientRequest.newBuilder()
- .setServerId(server.getId())
+ .setServerId(server.get().getId())
.setClientId(localFakeId)
.setCallId(localFakeCallId.incrementAndGet())
.setGroupId(groupId)
@@ -799,7 +815,7 @@ class RatisConsensus implements IConsensus {
private RaftGroup getGroupInfo(RaftGroupId raftGroupId) {
RaftGroup raftGroup = null;
try {
- raftGroup = server.getDivision(raftGroupId).getGroup();
+ raftGroup = server.get().getDivision(raftGroupId).getGroup();
RaftGroup lastSeenGroup = lastSeen.getOrDefault(raftGroupId, null);
if (lastSeenGroup != null && !lastSeenGroup.equals(raftGroup)) {
// delete the pooled raft-client of the out-dated group and cache the
latest
@@ -854,9 +870,8 @@ class RatisConsensus implements IConsensus {
});
}
- @TestOnly
- public RaftServer getServer() {
- return server;
+ public RaftServer getServer() throws IOException {
+ return server.get();
}
@TestOnly