This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ea53dc1  HDDS-3227. Ensure eviction of stateMachineData from cache 
only when both followers catch up (#2704)
ea53dc1 is described below

commit ea53dc186312548e1286c6b2f4c4f5b14ddfa690
Author: bshashikant <[email protected]>
AuthorDate: Tue Dec 7 12:20:42 2021 +0530

    HDDS-3227. Ensure eviction of stateMachineData from cache only when both 
followers catch up (#2704)
---
 .../common/statemachine/DatanodeConfiguration.java | 24 +++++++
 .../server/ratis/ContainerStateMachine.java        | 73 ++++++++++++++++------
 2 files changed, 78 insertions(+), 19 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index a2509a1..14ae4c9 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -52,6 +52,9 @@ public class DatanodeConfiguration {
   public static final String DISK_CHECK_TIMEOUT_KEY =
       "hdds.datanode.disk.check.timeout";
 
+  public static final String WAIT_ON_ALL_FOLLOWERS =
+      "hdds.datanode.wait.on.all.followers";
+
   static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;
 
   static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
@@ -60,6 +63,8 @@ public class DatanodeConfiguration {
 
   static final int FAILED_VOLUMES_TOLERATED_DEFAULT = -1;
 
+  static final boolean WAIT_ON_ALL_FOLLOWERS_DEFAULT = false;
+
   static final long DISK_CHECK_MIN_GAP_DEFAULT =
       Duration.ofMinutes(15).toMillis();
 
@@ -238,6 +243,25 @@ public class DatanodeConfiguration {
   private boolean isChunkDataValidationCheck =
       CHUNK_DATA_VALIDATION_CHECK_DEFAULT;
 
+  @Config(key = "wait.on.all.followers",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      tags = { DATANODE },
+      description = "Defines whether the leader datanode will wait for both "
+          + "followers to catch up before removing the stateMachineData from "
+          + "the cache."
+  )
+
+  private boolean waitOnAllFollowers = WAIT_ON_ALL_FOLLOWERS_DEFAULT;
+
+  public boolean waitOnAllFollowers() {
+    return waitOnAllFollowers;
+  }
+
+  public void setWaitOnAllFollowers(boolean val) {
+    this.waitOnAllFollowers = val;
+  }
+
   @PostConstruct
   public void validate() {
     if (replicationMaxStreams < 1) {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 3b35c97..301fc59 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -23,6 +23,8 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hdds.utils.ResourceLimitCache;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.util.Time;
 
@@ -148,6 +151,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
   private final AtomicBoolean stateMachineHealthy;
 
   private final Semaphore applyTransactionSemaphore;
+  private final boolean waitOnBothFollowers;
   /**
    * CSM metrics.
    */
@@ -195,6 +199,9 @@ public class ContainerStateMachine extends BaseStateMachine 
{
 
     this.executor = Executors.newFixedThreadPool(numContainerOpExecutors);
     this.containerTaskQueues = new ConcurrentHashMap<>();
+    this.waitOnBothFollowers = conf.getObject(
+        DatanodeConfiguration.class).waitOnAllFollowers();
+
   }
 
   @Override
@@ -711,6 +718,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     // with some information like its peers and termIndex). So, calling
     // updateLastApplied updates lastAppliedTermIndex.
     updateLastApplied();
+    removeStateMachineDataIfNeeded(index);
   }
 
   private CompletableFuture<ContainerCommandResponseProto> submitTask(
@@ -736,23 +744,47 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     return f;
   }
 
+  // Evicts cached stateMachine data that both followers have caught up to:
+  // keys <= min(index, lowest follower next index, or index if none).
+  private void removeStateMachineDataIfNeeded(long index) {
+    if (waitOnBothFollowers) {
+      try {
+        RaftServer.Division division = ratisServer.getServer().getDivision(gid);
+        if (division.getInfo().isLeader()) {
+          long minIndex = Arrays.stream(division.getInfo()
+              .getFollowerNextIndices()).min().orElse(index);
+          LOG.debug("Removing data corresponding to log index {} min index {} "
+                  + "from cache", index, minIndex);
+          stateMachineDataCache.removeIf(k -> k <= (Math.min(minIndex, index)));
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   /*
    * ApplyTransaction calls in Ratis are sequential.
    */
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     long index = trx.getLogEntry().getIndex();
-    // Since leader and one of the followers has written the data, it can
-    // be removed from the stateMachineDataMap.
-    stateMachineDataCache.remove(index);
-
-    DispatcherContext.Builder builder =
-        new DispatcherContext.Builder()
-            .setTerm(trx.getLogEntry().getTerm())
-            .setLogIndex(index);
-
-    long applyTxnStartTime = Time.monotonicNowNanos();
     try {
+      // Remove the stateMachine data once both followers have caught up. If
+      // either follower is behind, the pending queue will reach its
+      // configurable limit on pending request size and count, then block,
+      // and the client will back off as a result.
+      removeStateMachineDataIfNeeded(index);
+      // If waitOnBothFollowers is false, remove the entry from the cache
+      // as soon as it is applied, provided such an entry exists in the cache.
+      if (!waitOnBothFollowers) {
+        stateMachineDataCache.removeIf(k -> k >= index);
+      }
+      DispatcherContext.Builder builder =
+          new DispatcherContext.Builder().setTerm(trx.getLogEntry().getTerm())
+              .setLogIndex(index);
+
+      long applyTxnStartTime = Time.monotonicNowNanos();
       applyTransactionSemaphore.acquire();
       metrics.incNumApplyTransactionsOps();
       ContainerCommandRequestProto requestProto =
@@ -763,8 +795,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
       if (cmdType == Type.WriteChunk) {
         Preconditions
             .checkArgument(requestProto.getWriteChunk().getData().isEmpty());
-        builder
-            .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
+        builder.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
       }
       if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile
           || cmdType == Type.PutBlock || cmdType == Type.CreateContainer) {
@@ -774,8 +805,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
           new CompletableFuture<>();
       final Consumer<Exception> exceptionHandler = e -> {
         LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
-                + "{} exception {}", gid, requestProto.getCmdType(),
-            index, e);
+            + "{} exception {}", gid, requestProto.getCmdType(), index, e);
         stateMachineHealthy.compareAndSet(true, false);
         metrics.incNumApplyTransactionsFails();
         applyTransactionFuture.completeExceptionally(e);
@@ -783,8 +813,8 @@ public class ContainerStateMachine extends BaseStateMachine 
{
 
       // Ensure the command gets executed in a separate thread than
       // stateMachineUpdater thread which is calling applyTransaction here.
-      final CompletableFuture<ContainerCommandResponseProto> future
-          = submitTask(requestProto, builder, exceptionHandler);
+      final CompletableFuture<ContainerCommandResponseProto> future =
+          submitTask(requestProto, builder, exceptionHandler);
       future.thenApply(r -> {
         if (trx.getServerRole() == RaftPeerRole.LEADER
             && trx.getStateMachineContext() != null) {
@@ -834,12 +864,11 @@ public class ContainerStateMachine extends 
BaseStateMachine {
           }
         }
         return applyTransactionFuture;
-      }).whenComplete((r, t) ->  {
+      }).whenComplete((r, t) -> {
         if (t != null) {
           stateMachineHealthy.set(false);
           LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
-                  + "{} exception {}", gid, requestProto.getCmdType(),
-              index, t);
+              + "{} exception {}", gid, requestProto.getCmdType(), index, t);
         }
         applyTransactionSemaphore.release();
         metrics.recordApplyTransactionCompletion(
@@ -863,6 +892,12 @@ public class ContainerStateMachine extends 
BaseStateMachine {
   }
 
   @Override
+  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
+    // Once the leader steps down, clear the cache.
+    evictStateMachineCache();
+  }
+
+  @Override
   public CompletableFuture<Void> truncate(long index) {
     stateMachineDataCache.removeIf(k -> k >= index);
     return CompletableFuture.completedFuture(null);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to