This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ea53dc1 HDDS-3227. Ensure eviction of stateMachineData from cache
only when both followers catch up (#2704)
ea53dc1 is described below
commit ea53dc186312548e1286c6b2f4c4f5b14ddfa690
Author: bshashikant <[email protected]>
AuthorDate: Tue Dec 7 12:20:42 2021 +0530
HDDS-3227. Ensure eviction of stateMachineData from cache only when both
followers catch up (#2704)
---
.../common/statemachine/DatanodeConfiguration.java | 24 +++++++
.../server/ratis/ContainerStateMachine.java | 73 ++++++++++++++++------
2 files changed, 78 insertions(+), 19 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index a2509a1..14ae4c9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -52,6 +52,9 @@ public class DatanodeConfiguration {
public static final String DISK_CHECK_TIMEOUT_KEY =
"hdds.datanode.disk.check.timeout";
+ /**
+ * Full configuration key for the wait-on-all-followers setting (presumably
+ * the {@code wait.on.all.followers} @Config field under the
+ * {@code hdds.datanode} prefix — confirm against the @ConfigGroup). It
+ * controls whether the leader datanode waits for all followers to catch up
+ * before removing stateMachineData from its cache.
+ */
+ public static final String WAIT_ON_ALL_FOLLOWERS =
+ "hdds.datanode.wait.on.all.followers";
+
static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;
static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
@@ -60,6 +63,8 @@ public class DatanodeConfiguration {
static final int FAILED_VOLUMES_TOLERATED_DEFAULT = -1;
+ // Default for the wait-on-all-followers setting; kept in sync with the
+ // defaultValue = "false" declared on the waitOnAllFollowers @Config field.
+ static final boolean WAIT_ON_ALL_FOLLOWERS_DEFAULT = false;
+
static final long DISK_CHECK_MIN_GAP_DEFAULT =
Duration.ofMinutes(15).toMillis();
@@ -238,6 +243,25 @@ public class DatanodeConfiguration {
private boolean isChunkDataValidationCheck =
CHUNK_DATA_VALIDATION_CHECK_DEFAULT;
+ @Config(key = "wait.on.all.followers",
+ defaultValue = "false",
+ type = ConfigType.BOOLEAN,
+ tags = { DATANODE },
+ description = "Defines whether the leader datanode will wait for both "
+ + "followers to catch up before removing the stateMachineData from "
+ + "the cache."
+ )
+ private boolean waitOnAllFollowers = WAIT_ON_ALL_FOLLOWERS_DEFAULT;
+
+ /**
+ * Returns whether the leader datanode waits for all followers to catch up
+ * before removing stateMachineData from its cache.
+ *
+ * @return the current value of the wait.on.all.followers setting
+ */
+ public boolean waitOnAllFollowers() {
+ return waitOnAllFollowers;
+ }
+
+ /**
+ * Sets whether the leader datanode waits for all followers to catch up
+ * before removing stateMachineData from its cache.
+ *
+ * @param val {@code true} to wait for all followers, {@code false} otherwise
+ */
+ public void setWaitOnAllFollowers(boolean val) {
+ this.waitOnAllFollowers = val;
+ }
+
@PostConstruct
public void validate() {
if (replicationMaxStreams < 1) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 3b35c97..301fc59 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -23,6 +23,8 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hdds.utils.ResourceLimitCache;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.util.Time;
@@ -148,6 +151,7 @@ public class ContainerStateMachine extends BaseStateMachine {
private final AtomicBoolean stateMachineHealthy;
private final Semaphore applyTransactionSemaphore;
+ private final boolean waitOnBothFollowers;
/**
* CSM metrics.
*/
@@ -195,6 +199,9 @@ public class ContainerStateMachine extends BaseStateMachine {
this.executor = Executors.newFixedThreadPool(numContainerOpExecutors);
this.containerTaskQueues = new ConcurrentHashMap<>();
+ this.waitOnBothFollowers = conf.getObject(
+ DatanodeConfiguration.class).waitOnAllFollowers();
+
}
@Override
@@ -711,6 +718,7 @@ public class ContainerStateMachine extends BaseStateMachine {
// with some information like its peers and termIndex). So, calling
// updateLastApplied updates lastAppliedTermIndex.
updateLastApplied();
+ removeStateMachineDataIfNeeded(index);
}
private CompletableFuture<ContainerCommandResponseProto> submitTask(
@@ -736,23 +744,47 @@ public class ContainerStateMachine extends BaseStateMachine {
return f;
}
+ // Evicts cached stateMachineData that is no longer needed. An entry may be
+ // dropped once it has been applied locally (k <= index) and every
+ // follower's next index has moved past it. Only the leader evicts here,
+ // and only when waitOnBothFollowers is enabled.
+ private void removeStateMachineDataIfNeeded(long index) {
+ if (waitOnBothFollowers) {
+ try {
+ RaftServer.Division division =
+ ratisServer.getServer().getDivision(gid);
+ if (division.getInfo().isLeader()) {
+ long[] followerNextIndices =
+ division.getInfo().getFollowerNextIndices();
+ // NOTE(review): Ratis may return null here if leadership was lost
+ // after the isLeader() check; there is nothing to evict then.
+ if (followerNextIndices == null) {
+ return;
+ }
+ // With no followers there is nobody to wait for, so only the
+ // applied index bounds the eviction (no NoSuchElementException).
+ long minIndex = Arrays.stream(followerNextIndices).min()
+ .orElse(Long.MAX_VALUE);
+ // A follower whose next index is n has been sent every entry < n;
+ // never evict beyond the entry currently being applied.
+ final long evictUpTo = Math.min(minIndex - 1, index);
+ LOG.debug("Removing data corresponding to log index {} min index {} "
+ + "from cache", index, minIndex);
+ // Drop the OLDER entries; newer ones are still needed to serve
+ // followers that are behind. (truncate uses the opposite direction.)
+ stateMachineDataCache.removeIf(k -> k <= evictUpTo);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
/*
* ApplyTransaction calls in Ratis are sequential.
*/
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
long index = trx.getLogEntry().getIndex();
- // Since leader and one of the followers has written the data, it can
- // be removed from the stateMachineDataMap.
- stateMachineDataCache.remove(index);
-
- DispatcherContext.Builder builder =
- new DispatcherContext.Builder()
- .setTerm(trx.getLogEntry().getTerm())
- .setLogIndex(index);
-
- long applyTxnStartTime = Time.monotonicNowNanos();
try {
+ // Remove the stateMachine data once both followers have caught up. If
any
+ // one of the follower is behind, the pending queue will max out as
+ // configurable limit on pending request size and count and then will
+ // block and client will backoff as a result of that.
+ removeStateMachineDataIfNeeded(index);
+ // if waitOnBothFollower is false, remove the entry from the cache
+ // as soon as its applied and such entry exists in the cache.
+ if (!waitOnBothFollowers) {
+ stateMachineDataCache.removeIf(k -> k >= index);
+ }
+ DispatcherContext.Builder builder =
+ new DispatcherContext.Builder().setTerm(trx.getLogEntry().getTerm())
+ .setLogIndex(index);
+
+ long applyTxnStartTime = Time.monotonicNowNanos();
applyTransactionSemaphore.acquire();
metrics.incNumApplyTransactionsOps();
ContainerCommandRequestProto requestProto =
@@ -763,8 +795,7 @@ public class ContainerStateMachine extends BaseStateMachine {
if (cmdType == Type.WriteChunk) {
Preconditions
.checkArgument(requestProto.getWriteChunk().getData().isEmpty());
- builder
- .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
+ builder.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
}
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile
|| cmdType == Type.PutBlock || cmdType == Type.CreateContainer) {
@@ -774,8 +805,7 @@ public class ContainerStateMachine extends BaseStateMachine {
new CompletableFuture<>();
final Consumer<Exception> exceptionHandler = e -> {
LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
- + "{} exception {}", gid, requestProto.getCmdType(),
- index, e);
+ + "{} exception {}", gid, requestProto.getCmdType(), index, e);
stateMachineHealthy.compareAndSet(true, false);
metrics.incNumApplyTransactionsFails();
applyTransactionFuture.completeExceptionally(e);
@@ -783,8 +813,8 @@ public class ContainerStateMachine extends BaseStateMachine {
// Ensure the command gets executed in a separate thread than
// stateMachineUpdater thread which is calling applyTransaction here.
- final CompletableFuture<ContainerCommandResponseProto> future
- = submitTask(requestProto, builder, exceptionHandler);
+ final CompletableFuture<ContainerCommandResponseProto> future =
+ submitTask(requestProto, builder, exceptionHandler);
future.thenApply(r -> {
if (trx.getServerRole() == RaftPeerRole.LEADER
&& trx.getStateMachineContext() != null) {
@@ -834,12 +864,11 @@ public class ContainerStateMachine extends BaseStateMachine {
}
}
return applyTransactionFuture;
- }).whenComplete((r, t) -> {
+ }).whenComplete((r, t) -> {
if (t != null) {
stateMachineHealthy.set(false);
LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
- + "{} exception {}", gid, requestProto.getCmdType(),
- index, t);
+ + "{} exception {}", gid, requestProto.getCmdType(), index, t);
}
applyTransactionSemaphore.release();
metrics.recordApplyTransactionCompletion(
@@ -863,6 +892,12 @@ public class ContainerStateMachine extends BaseStateMachine {
}
+ /**
+ * Called when this server steps down from leadership; responds by evicting
+ * the stateMachineData cache. The pending entries are not inspected.
+ *
+ * @param pendingEntries transactions pending at the time of step-down
+ */
@Override
+ public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
+ // No longer the leader, so evict the cached stateMachineData.
+ this.evictStateMachineCache();
+ }
+
+ @Override
public CompletableFuture<Void> truncate(long index) {
stateMachineDataCache.removeIf(k -> k >= index);
return CompletableFuture.completedFuture(null);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]