This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fa7fc93b040 [IOTDB-6251] Make read in SchemaRegion linearizable by default (#11571)
fa7fc93b040 is described below
commit fa7fc93b04029b776db402be50717c7b4ef8a611
Author: William Song <[email protected]>
AuthorDate: Fri Nov 24 23:12:21 2023 +0800
[IOTDB-6251] Make read in SchemaRegion linearizable by default (#11571)
---
.../apache/iotdb/consensus/config/RatisConfig.java | 2 +-
...ion.java => RatisReadUnavailableException.java} | 11 ++++----
.../ratis/ApplicationStateMachineProxy.java | 17 ++++--------
.../iotdb/consensus/ratis/RatisConsensus.java | 32 +++++++++++++++++-----
.../apache/iotdb/consensus/ratis/utils/Utils.java | 8 ++----
.../iotdb/consensus/ratis/RecoverReadTest.java | 4 +--
.../apache/iotdb/consensus/ratis/TestUtils.java | 4 +--
.../db/consensus/SchemaRegionConsensusImpl.java | 1 +
8 files changed, 44 insertions(+), 35 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
index 6ea4d71604b..ad40495972b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -1123,7 +1123,7 @@ public class RatisConfig {
}
public static class Builder {
- private Read.Option readOption = Option.LINEARIZABLE;
+ private Read.Option readOption = Option.DEFAULT;
private TimeDuration readTimeout = TimeDuration.valueOf(10,
TimeUnit.SECONDS);
public Read.Builder setReadOption(Read.Option readOption) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
similarity index 71%
rename from iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java
rename to iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
index edf67629890..d5ac76b93fe 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
@@ -19,14 +19,13 @@
package org.apache.iotdb.consensus.exception;
-/** RaftServer is redoing RaftLog. Unable to serve linearizable read requests.
*/
-public class RatisUnderRecoveryException extends ConsensusException {
+/** RaftServer is unable to serve linearizable read requests. */
+public class RatisReadUnavailableException extends ConsensusException {
- public RatisUnderRecoveryException(Throwable cause) {
+ public RatisReadUnavailableException(Throwable cause) {
super(
- "Raft Server is redoing Raft Log and cannot serve read requests now. "
- + "Please try read later: "
- + cause,
+ "Raft Server cannot serve read requests now (leader is unknown or
under recovery). "
+ + "Please try read later: ",
cause);
}
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 1460398ab05..06e7df52891 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
@@ -55,10 +54,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
public class ApplicationStateMachineProxy extends BaseStateMachine {
@@ -68,9 +65,8 @@ public class ApplicationStateMachineProxy extends
BaseStateMachine {
private final IStateMachine applicationStateMachine;
private final SnapshotStorage snapshotStorage;
private final RaftGroupId groupId;
- private final ConsensusGroupId consensusGroupId;
private final TConsensusGroupType consensusGroupType;
- private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean>
canStaleRead;
+ private final BiConsumer<RaftGroupMemberId, RaftPeerId> leaderChangeListener;
ApplicationStateMachineProxy(IStateMachine stateMachine, RaftGroupId id) {
this(stateMachine, id, null);
@@ -79,11 +75,10 @@ public class ApplicationStateMachineProxy extends
BaseStateMachine {
ApplicationStateMachineProxy(
IStateMachine stateMachine,
RaftGroupId id,
- ConcurrentHashMap<ConsensusGroupId, AtomicBoolean> canStaleRead) {
+ BiConsumer<RaftGroupMemberId, RaftPeerId> onLeaderChanged) {
this.applicationStateMachine = stateMachine;
- this.canStaleRead = canStaleRead;
+ this.leaderChangeListener = onLeaderChanged;
this.groupId = id;
- this.consensusGroupId = Utils.fromRaftGroupIdToConsensusGroupId(id);
snapshotStorage = new SnapshotStorage(applicationStateMachine, groupId);
consensusGroupType =
Utils.getConsensusGroupTypeFromPrefix(groupId.toString());
applicationStateMachine.start();
@@ -282,9 +277,7 @@ public class ApplicationStateMachineProxy extends
BaseStateMachine {
@Override
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId
newLeaderId) {
- Optional.ofNullable(canStaleRead)
- .ifPresent(
- m -> m.computeIfAbsent(consensusGroupId, id -> new
AtomicBoolean(false)).set(false));
+ leaderChangeListener.accept(groupMemberId, newLeaderId);
applicationStateMachine
.event()
.notifyLeaderChanged(
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 349bde3a804..2ae303ec96c 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -44,8 +44,8 @@ import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
-import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricSet;
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
import org.apache.iotdb.consensus.ratis.utils.Retriable;
@@ -67,6 +67,7 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SnapshotManagementRequest;
@@ -127,14 +128,14 @@ class RatisConsensus implements IConsensus {
private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int)
TimeUnit.SECONDS.toMillis(20);
private final RatisConfig config;
+ private final RatisConfig.Read.Option readOption;
private final RetryPolicy<RaftClientReply> readRetryPolicy;
private final RetryPolicy<RaftClientReply> writeRetryPolicy;
private final RatisMetricSet ratisMetricSet;
private final TConsensusGroupType consensusGroupType;
- private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean>
canServeStaleRead =
- new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean>
canServeStaleRead;
public RatisConsensus(ConsensusConfig config, IStateMachine.Registry
registry)
throws IOException {
@@ -148,6 +149,9 @@ class RatisConsensus implements IConsensus {
Utils.initRatisConfig(properties, config.getRatisConfig());
this.config = config.getRatisConfig();
+ this.readOption = this.config.getRead().getReadOption();
+ this.canServeStaleRead =
+ this.readOption == RatisConfig.Read.Option.DEFAULT ? new
ConcurrentHashMap<>() : null;
this.consensusGroupType = config.getConsensusGroupType();
this.ratisMetricSet = new RatisMetricSet();
this.readRetryPolicy =
@@ -188,7 +192,7 @@ class RatisConsensus implements IConsensus {
new ApplicationStateMachineProxy(
registry.apply(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
raftGroupId,
- canServeStaleRead))
+ this::onLeaderChanged))
.build();
}
@@ -327,21 +331,25 @@ class RatisConsensus implements IConsensus {
throw new ConsensusGroupNotExistException(groupId);
}
+ // perform linearizable read under following two conditions:
+ // 1. Read.Option is linearizable
+ // 2. First probing read when Read.Option is default
final boolean isLinearizableRead =
- !canServeStaleRead.computeIfAbsent(groupId, id -> new
AtomicBoolean(false)).get();
+ readOption == RatisConfig.Read.Option.LINEARIZABLE
+ || !canServeStaleRead.computeIfAbsent(groupId, id -> new
AtomicBoolean(false)).get();
RaftClientReply reply;
try {
reply = doRead(raftGroupId, request, isLinearizableRead);
// allow stale read if current linearizable read returns successfully
- if (isLinearizableRead) {
+ if (canServeStaleRead != null && isLinearizableRead) {
canServeStaleRead.get(groupId).set(true);
}
} catch (ReadException | ReadIndexException e) {
if (isLinearizableRead) {
// linearizable read failed. the RaftServer is recovering from Raft Log and cannot serve
// read requests.
- throw new RatisUnderRecoveryException(e);
+ throw new RatisReadUnavailableException(e);
} else {
throw new RatisRequestFailedException(e);
}
@@ -766,6 +774,16 @@ class RatisConsensus implements IConsensus {
return reply;
}
+ private void onLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId
leaderId) {
+ Optional.ofNullable(canServeStaleRead)
+ .ifPresent(
+ m -> {
+ final ConsensusGroupId gid =
+
Utils.fromRaftGroupIdToConsensusGroupId(groupMemberId.getGroupId());
+ canServeStaleRead.computeIfAbsent(gid, id -> new
AtomicBoolean()).set(false);
+ });
+ }
+
@TestOnly
public RaftServer getServer() {
return server;
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
index 5390420b304..e3f4d2328c5 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
@@ -307,11 +307,9 @@ public class Utils {
RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(
properties, config.getRpc().getFirstElectionTimeoutMax());
- RaftServerConfigKeys.Read.Option option =
- config.getRead().getReadOption() == RatisConfig.Read.Option.DEFAULT
- ? RaftServerConfigKeys.Read.Option.DEFAULT
- : RaftServerConfigKeys.Read.Option.LINEARIZABLE;
- RaftServerConfigKeys.Read.setOption(properties, option);
+ /* linearizable means we can obtain consistent data from followers.
+ If we prefer latency, we can directly use staleRead */
+ RaftServerConfigKeys.Read.setOption(properties,
RaftServerConfigKeys.Read.Option.LINEARIZABLE);
RaftServerConfigKeys.Read.setTimeout(properties,
config.getRead().getReadTimeout());
RaftServerConfigKeys.setSleepDeviationThreshold(
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
index 55d12730ffa..68df45a74a8 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.config.RatisConfig;
-import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
+import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
import org.apache.ratis.util.TimeDuration;
import org.junit.After;
@@ -228,6 +228,6 @@ public class RecoverReadTest {
miniCluster.waitUntilActiveLeader();
// query during redo: get exception that ratis is under recovery
- Assert.assertThrows(RatisUnderRecoveryException.class, () ->
miniCluster.readThrough(0));
+ Assert.assertThrows(RatisReadUnavailableException.class, () ->
miniCluster.readThrough(0));
}
}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 64f00266d71..275097796ba 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -35,7 +35,7 @@ import
org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.RatisConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
+import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
import org.apache.ratis.thirdparty.com.google.common.base.Preconditions;
import org.apache.ratis.util.FileUtils;
@@ -406,7 +406,7 @@ public class TestUtils {
try {
readResp = readThrough(serverIndex);
break;
- } catch (RatisUnderRecoveryException e) {
+ } catch (RatisReadUnavailableException e) {
logger.warn("ratis is redoing raft log, shall wait some time: ", e);
waitDuration.sleep();
} catch (ConsensusException e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 9f6d42ca5f8..711fa5d36a2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -142,6 +142,7 @@ public class SchemaRegionConsensusImpl {
.build())
.setRead(
RatisConfig.Read.newBuilder()
+
.setReadOption(RatisConfig.Read.Option.LINEARIZABLE)
// use thrift connection timeout to unify read timeout
.setReadTimeout(
TimeDuration.valueOf(