This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e4a1993e79 HDDS-7618. Replication Commands should timeout if not 
processed on datanodes in time (#4069)
e4a1993e79 is described below

commit e4a1993e798bc843a1f124e0e348a99735b3e998
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Tue Dec 13 09:41:42 2022 +0000

    HDDS-7618. Replication Commands should timeout if not processed on 
datanodes in time (#4069)
---
 .../common/statemachine/DatanodeStateMachine.java  | 10 ++-
 .../DeleteContainerCommandHandler.java             | 26 +++++++-
 .../ReconstructECContainersCommandHandler.java     |  6 +-
 .../ReplicateContainerCommandHandler.java          |  3 +-
 .../states/endpoint/HeartbeatEndpointTask.java     |  4 ++
 .../ECReconstructionCommandInfo.java               | 22 ++++---
 .../ECReconstructionCoordinatorTask.java           | 20 +++++-
 .../reconstruction/ECReconstructionSupervisor.java | 11 ++--
 .../replication/ReplicationSupervisor.java         | 40 +++++++-----
 .../replication/ReplicationSupervisorMetrics.java  |  5 +-
 .../container/replication/ReplicationTask.java     | 27 ++++++--
 .../hadoop/ozone/protocol/commands/SCMCommand.java | 35 +++++++++++
 .../TestDeleteContainerCommandHandler.java         | 71 +++++++++++++++++++++
 .../TestECReconstructionSupervisor.java            | 73 ++++++++++++++++++++--
 .../ReplicationSupervisorScheduling.java           | 10 ++-
 .../replication/TestReplicationSupervisor.java     | 44 ++++++++++++-
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |  1 +
 .../container/replication/ReplicationManager.java  | 36 +++++++++++
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  9 ++-
 .../replication/TestReplicationManager.java        |  9 +++
 .../hdds/scm/TestSCMDatanodeProtocolServer.java    | 54 ++++++++++++++++
 .../ozone/freon/ClosedContainerReplicator.java     | 15 ++++-
 22 files changed, 468 insertions(+), 63 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 4d72bb317f..107dba186c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.container.common.statemachine;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Clock;
+import java.time.ZoneId;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -40,6 +42,7 @@ import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.ozone.HddsDatanodeStopService;
+import org.apache.hadoop.ozone.common.MonotonicClock;
 import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
 import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
@@ -139,6 +142,7 @@ public class DatanodeStateMachine implements Closeable {
     this.conf = conf;
     this.datanodeDetails = datanodeDetails;
 
+    Clock clock = new MonotonicClock(ZoneId.systemDefault());
     // Expected to be initialized already.
     layoutStorage = new DatanodeLayoutStorage(conf,
         datanodeDetails.getUuidString());
@@ -180,7 +184,7 @@ public class DatanodeStateMachine implements Closeable {
         conf.getObject(ReplicationConfig.class);
     supervisor =
         new ReplicationSupervisor(container.getContainerSet(), context,
-            replicatorMetrics, replicationConfig);
+            replicatorMetrics, replicationConfig, clock);
 
     replicationSupervisorMetrics =
         ReplicationSupervisorMetrics.create(supervisor);
@@ -193,7 +197,7 @@ public class DatanodeStateMachine implements Closeable {
     ecReconstructionSupervisor =
         new ECReconstructionSupervisor(container.getContainerSet(), context,
             replicationConfig.getReplicationMaxStreams(),
-            ecReconstructionCoordinator);
+            ecReconstructionCoordinator, clock);
 
 
     // When we add new handlers just adding a new handler here should do the
@@ -207,7 +211,7 @@ public class DatanodeStateMachine implements Closeable {
         .addHandler(new ReconstructECContainersCommandHandler(conf,
             ecReconstructionSupervisor))
         .addHandler(new DeleteContainerCommandHandler(
-            dnConf.getContainerDeleteThreads()))
+            dnConf.getContainerDeleteThreads(), clock))
         .addHandler(new ClosePipelineCommandHandler())
         .addHandler(new CreatePipelineCommandHandler(conf))
         .addHandler(new SetNodeOperationalStateCommandHandler(conf))
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
index 58ad2d18e4..b209e6ee34 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Clock;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -48,15 +49,22 @@ public class DeleteContainerCommandHandler implements 
CommandHandler {
       LoggerFactory.getLogger(DeleteContainerCommandHandler.class);
 
   private final AtomicInteger invocationCount = new AtomicInteger(0);
+  private final AtomicInteger timeoutCount = new AtomicInteger(0);
   private final AtomicLong totalTime = new AtomicLong(0);
   private final ExecutorService executor;
+  private final Clock clock;
 
-  public DeleteContainerCommandHandler(int threadPoolSize) {
-    this.executor = Executors.newFixedThreadPool(
+  public DeleteContainerCommandHandler(int threadPoolSize, Clock clock) {
+    this(clock, Executors.newFixedThreadPool(
         threadPoolSize, new ThreadFactoryBuilder()
-            .setNameFormat("DeleteContainerThread-%d").build());
+            .setNameFormat("DeleteContainerThread-%d").build()));
   }
 
+  protected DeleteContainerCommandHandler(Clock clock,
+      ExecutorService executor) {
+    this.executor = executor;
+    this.clock = clock;
+  }
   @Override
   public void handle(final SCMCommand command,
                      final OzoneContainer ozoneContainer,
@@ -69,6 +77,14 @@ public class DeleteContainerCommandHandler implements 
CommandHandler {
       final long startTime = Time.monotonicNow();
       invocationCount.incrementAndGet();
       try {
+        if (command.hasExpired(clock.millis())) {
+          LOG.info("Not processing the delete container command for " +
+              "container {} as the current time {}ms is after the command " +
+              "deadline {}ms", deleteContainerCommand.getContainerID(),
+              clock.millis(), command.getDeadline());
+          timeoutCount.incrementAndGet();
+          return;
+        }
         controller.deleteContainer(deleteContainerCommand.getContainerID(),
             deleteContainerCommand.isForce());
       } catch (IOException e) {
@@ -94,6 +110,10 @@ public class DeleteContainerCommandHandler implements 
CommandHandler {
     return this.invocationCount.get();
   }
 
+  public int getTimeoutCount() {
+    return this.timeoutCount.get();
+  }
+
   @Override
   public long getAverageRunTime() {
     final int invocations = invocationCount.get();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
index 57d4d16f8a..c6abfc27c3 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
@@ -47,11 +47,7 @@ public class ReconstructECContainersCommandHandler 
implements CommandHandler {
     ReconstructECContainersCommand ecContainersCommand =
         (ReconstructECContainersCommand) command;
     ECReconstructionCommandInfo reconstructionCommandInfo =
-        new ECReconstructionCommandInfo(ecContainersCommand.getContainerID(),
-            ecContainersCommand.getEcReplicationConfig(),
-            ecContainersCommand.getMissingContainerIndexes(),
-            ecContainersCommand.getSources(),
-            ecContainersCommand.getTargetDatanodes());
+        new ECReconstructionCommandInfo(ecContainersCommand);
     this.supervisor.addTask(reconstructionCommandInfo);
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index 44c783846a..df589e287d 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -71,7 +71,8 @@ public class ReplicateContainerCommandHandler implements 
CommandHandler {
         "Replication command is received for container %s "
             + "without source datanodes.", containerID);
 
-    supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes));
+    ReplicationTask task = new ReplicationTask(replicateCommand);
+    supervisor.addTask(task);
   }
 
   @Override
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index ccb0e8b7d7..a694ba00be 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -426,6 +426,7 @@ public class HeartbeatEndpointTask
    * Common processing for SCM commands.
    *  - set term
    *  - set encoded token
+   *  - set any deadline which is relevant to the command
    *  - add to context's queue
    */
   private void processCommonCommand(
@@ -436,6 +437,9 @@ public class HeartbeatEndpointTask
     if (response.hasEncodedToken()) {
       cmd.setEncodedToken(response.getEncodedToken());
     }
+    if (response.hasDeadlineMsSinceEpoch()) {
+      cmd.setDeadline(response.getDeadlineMsSinceEpoch());
+    }
     context.addCommand(cmd);
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java
index c95f9646f8..8a4d26b55c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java
@@ -35,17 +35,21 @@ public class ECReconstructionCommandInfo {
   private List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
       sources;
   private List<DatanodeDetails> targetDatanodes;
+  private long deadlineMsSinceEpoch = 0;
 
-  public ECReconstructionCommandInfo(long containerID,
-      ECReplicationConfig ecReplicationConfig, byte[] missingContainerIndexes,
-      List<DatanodeDetailsAndReplicaIndex> sources,
-      List<DatanodeDetails> targetDatanodes) {
-    this.containerID = containerID;
-    this.ecReplicationConfig = ecReplicationConfig;
+  public ECReconstructionCommandInfo(ReconstructECContainersCommand cmd) {
+    this.containerID = cmd.getContainerID();
+    this.ecReplicationConfig = cmd.getEcReplicationConfig();
     this.missingContainerIndexes =
-        Arrays.copyOf(missingContainerIndexes, missingContainerIndexes.length);
-    this.sources = sources;
-    this.targetDatanodes = targetDatanodes;
+        Arrays.copyOf(cmd.getMissingContainerIndexes(),
+            cmd.getMissingContainerIndexes().length);
+    this.sources = cmd.getSources();
+    this.targetDatanodes = cmd.getTargetDatanodes();
+    this.deadlineMsSinceEpoch = cmd.getDeadline();
+  }
+
+  public long getDeadline() {
+    return deadlineMsSinceEpoch;
   }
 
   public long getContainerID() {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
index e0aa14419a..4e46860c88 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Clock;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -36,17 +37,21 @@ public class ECReconstructionCoordinatorTask implements 
Runnable {
   static final Logger LOG =
       LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class);
   private final ConcurrentHashMap.KeySetView<Object, Boolean> 
inprogressCounter;
-  private ECReconstructionCoordinator reconstructionCoordinator;
-  private ECReconstructionCommandInfo reconstructionCommandInfo;
+  private final ECReconstructionCoordinator reconstructionCoordinator;
+  private final ECReconstructionCommandInfo reconstructionCommandInfo;
+  private long deadlineMsSinceEpoch = 0;
+  private final Clock clock;
 
   public ECReconstructionCoordinatorTask(
       ECReconstructionCoordinator coordinator,
       ECReconstructionCommandInfo reconstructionCommandInfo,
       ConcurrentHashMap.KeySetView<Object, Boolean>
-          inprogressReconstructionCoordinatorCounter) {
+          inprogressReconstructionCoordinatorCounter,
+      Clock clock) {
     this.reconstructionCoordinator = coordinator;
     this.reconstructionCommandInfo = reconstructionCommandInfo;
     this.inprogressCounter = inprogressReconstructionCoordinatorCounter;
+    this.clock = clock;
   }
 
   @Override
@@ -69,6 +74,15 @@ public class ECReconstructionCoordinatorTask implements 
Runnable {
           containerID);
     }
     try {
+      if (reconstructionCommandInfo.getDeadline() > 0
+          && clock.millis() > reconstructionCommandInfo.getDeadline()) {
+        LOG.info("Ignoring this reconstruct container command for container" +
+                " {} since the current time {}ms is past the deadline {}ms",
+            containerID, clock.millis(),
+            reconstructionCommandInfo.getDeadline());
+        return;
+      }
+
       SortedMap<Integer, DatanodeDetails> sourceNodeMap =
           reconstructionCommandInfo.getSources().stream().collect(Collectors
               .toMap(DatanodeDetailsAndReplicaIndex::getReplicaIndex,
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
index e2de7ac695..e36636d693 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
@@ -23,6 +23,7 @@ import 
org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Clock;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -39,6 +40,7 @@ public class ECReconstructionSupervisor implements Closeable {
   private final StateContext context;
   private final ExecutorService executor;
   private final ECReconstructionCoordinator reconstructionCoordinator;
+  private final Clock clock;
   /**
    * how many coordinator tasks currently being running.
    */
@@ -47,18 +49,19 @@ public class ECReconstructionSupervisor implements 
Closeable {
 
   public ECReconstructionSupervisor(ContainerSet containerSet,
       StateContext context, ExecutorService executor,
-      ECReconstructionCoordinator coordinator) {
+      ECReconstructionCoordinator coordinator, Clock clock) {
     this.containerSet = containerSet;
     this.context = context;
     this.executor = executor;
     this.reconstructionCoordinator = coordinator;
     this.inProgressReconstrucionCoordinatorCounter =
         ConcurrentHashMap.newKeySet();
+    this.clock = clock;
   }
 
   public ECReconstructionSupervisor(ContainerSet containerSet,
       StateContext context, int poolSize,
-      ECReconstructionCoordinator coordinator) {
+      ECReconstructionCoordinator coordinator, Clock clock) {
     // TODO: ReplicationSupervisor and this class can be refactored to have a
     //  common interface.
     this(containerSet, context,
@@ -66,7 +69,7 @@ public class ECReconstructionSupervisor implements Closeable {
             new LinkedBlockingQueue<>(),
             new ThreadFactoryBuilder().setDaemon(true)
                 .setNameFormat("ECContainerReconstructionThread-%d").build()),
-        coordinator);
+        coordinator, clock);
   }
 
   public void stop() {
@@ -86,7 +89,7 @@ public class ECReconstructionSupervisor implements Closeable {
         .add(taskInfo.getContainerID())) {
       executor.execute(
           new ECReconstructionCoordinatorTask(getReconstructionCoordinator(),
-              taskInfo, inProgressReconstrucionCoordinatorCounter));
+              taskInfo, inProgressReconstrucionCoordinatorCounter, clock));
     }
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 5432656e03..444ab303f5 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
+import java.time.Clock;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap.KeySetView;
 import java.util.concurrent.ExecutorService;
@@ -49,10 +50,12 @@ public class ReplicationSupervisor {
   private final ContainerReplicator replicator;
   private final ExecutorService executor;
   private final StateContext context;
+  private final Clock clock;
 
   private final AtomicLong requestCounter = new AtomicLong();
   private final AtomicLong successCounter = new AtomicLong();
   private final AtomicLong failureCounter = new AtomicLong();
+  private final AtomicLong timeoutCounter = new AtomicLong();
 
   /**
    * A set of container IDs that are currently being downloaded
@@ -64,35 +67,27 @@ public class ReplicationSupervisor {
   @VisibleForTesting
   ReplicationSupervisor(
       ContainerSet containerSet, StateContext context,
-      ContainerReplicator replicator, ExecutorService executor) {
+      ContainerReplicator replicator, ExecutorService executor,
+      Clock clock) {
     this.containerSet = containerSet;
     this.replicator = replicator;
     this.containersInFlight = ConcurrentHashMap.newKeySet();
     this.executor = executor;
     this.context = context;
+    this.clock = clock;
   }
 
   public ReplicationSupervisor(
       ContainerSet containerSet, StateContext context,
-      ContainerReplicator replicator, ReplicationConfig replicationConfig) {
-    this(containerSet, context, replicator,
-        replicationConfig.getReplicationMaxStreams());
-  }
-
-  public ReplicationSupervisor(
-      ContainerSet containerSet, StateContext context,
-      ContainerReplicator replicator, int poolSize) {
+      ContainerReplicator replicator, ReplicationConfig replicationConfig,
+      Clock clock) {
     this(containerSet, context, replicator, new ThreadPoolExecutor(
-        poolSize, poolSize, 60, TimeUnit.SECONDS,
+        replicationConfig.getReplicationMaxStreams(),
+        replicationConfig.getReplicationMaxStreams(), 60, TimeUnit.SECONDS,
         new LinkedBlockingQueue<>(),
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("ContainerReplicationThread-%d")
-            .build()));
-  }
-
-  public ReplicationSupervisor(ContainerSet containerSet,
-      ContainerReplicator replicator, int poolSize) {
-    this(containerSet, null, replicator, poolSize);
+            .build()), clock);
   }
 
   /**
@@ -148,6 +143,14 @@ public class ReplicationSupervisor {
       try {
         requestCounter.incrementAndGet();
 
+        if (task.getDeadline() > 0 && clock.millis() > task.getDeadline()) {
+          LOG.info("Ignoring this replicate container command for container" +
+              " {} since the current time {}ms is past the deadline {}ms",
+              containerId, clock.millis(), task.getDeadline());
+          timeoutCounter.incrementAndGet();
+          return;
+        }
+
         if (context != null) {
           DatanodeDetails dn = context.getParent().getDatanodeDetails();
           if (dn.getPersistedOpState() !=
@@ -206,4 +209,9 @@ public class ReplicationSupervisor {
   public long getReplicationFailureCount() {
     return failureCounter.get();
   }
+
+  public long getReplicationTimeoutCount() {
+    return timeoutCounter.get();
+  }
+
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
index df48abda4f..0576308bd2 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
@@ -66,6 +66,9 @@ public class ReplicationSupervisorMetrics implements 
MetricsSource {
             supervisor.getQueueSize())
         .addGauge(Interns.info("numRequestedReplications",
             "Number of requested replications"),
-            supervisor.getReplicationRequestCount());
+            supervisor.getReplicationRequestCount())
+        .addGauge(Interns.info("numTimeoutReplications",
+            "Number of replication requests timed out before being processed"),
+            supervisor.getReplicationTimeoutCount());
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
index e6e0d0526b..b194c2e207 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Objects;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 
 /**
  * The task to download a container from the sources.
@@ -32,21 +33,39 @@ public class ReplicationTask {
 
   private final long containerId;
 
-  private List<DatanodeDetails> sources;
+  private final List<DatanodeDetails> sources;
 
   private final Instant queued = Instant.now();
 
+  private long deadlineMsSinceEpoch = 0;
+
   /**
    * Counter for the transferred bytes.
    */
   private long transferredBytes;
 
-  public ReplicationTask(
+  public ReplicationTask(ReplicateContainerCommand cmd) {
+    this.containerId = cmd.getContainerID();
+    this.sources = cmd.getSourceDatanodes();
+    this.deadlineMsSinceEpoch = cmd.getDeadline();
+  }
+
+  /**
+   * Intended to only be used in tests.
+   */
+  protected ReplicationTask(
       long containerId,
       List<DatanodeDetails> sources
   ) {
-    this.containerId = containerId;
-    this.sources = sources;
+    this(new ReplicateContainerCommand(containerId, sources));
+  }
+
+  /**
+   * Returns any deadline set on this task, in milliseconds since the epoch.
+   * A returned value of zero indicates no deadline.
+   */
+  public long getDeadline() {
+    return deadlineMsSinceEpoch;
   }
 
   @Override
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 744118e301..ab214ef2f6 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -38,6 +38,8 @@ public abstract class SCMCommand<T extends Message> implements
 
   private String encodedToken = "";
 
+  private long deadlineMsSinceEpoch = 0;
+
   SCMCommand() {
     this.id = HddsIdFactory.getLongId();
   }
@@ -88,4 +90,37 @@ public abstract class SCMCommand<T extends Message> 
implements
   public void setEncodedToken(String encodedToken) {
     this.encodedToken = encodedToken;
   }
+
+  /**
+   * Allows a deadline to be set on the command. The deadline is set as the
+   * milliseconds since the epoch by which the command must have completed.
+   * It is up to the code processing the command to enforce the deadline by
+   * calling the hasExpired() method, and the code sending the command to set
+   * the deadline. The default deadline is zero, which means no deadline.
+   * @param deadlineMs The time in ms since the epoch by which the command
+   *                   must have completed.
+   */
+  public void setDeadline(long deadlineMs) {
+    this.deadlineMsSinceEpoch = deadlineMs;
+  }
+
+  /**
+   * @return The deadline set for this command, or zero if no deadline has
+   *         been set.
+   */
+  public long getDeadline() {
+    return deadlineMsSinceEpoch;
+  }
+
+  /**
+   * If a deadline has been set to a non-zero value, test whether the given
+   * current time is beyond the deadline.
+   * @param currentEpochMs current time in milliseconds since the epoch.
+   * @return false if there is no deadline, or it has not expired. True if the
+   *         set deadline has expired.
+   */
+  public boolean hasExpired(long currentEpochMs) {
+    return deadlineMsSinceEpoch > 0 &&
+        currentEpochMs > deadlineMsSinceEpoch;
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
new file mode 100644
index 0000000000..282e00f734
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.ozone.test.TestClock;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+
+import static 
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import static org.mockito.Mockito.times;
+
+/**
+ * Test for the DeleteContainerCommandHandler.
+ */
+public class TestDeleteContainerCommandHandler {
+
+  @Test
+  public void testExpiredCommandsAreNotProcessed() throws IOException {
+    TestClock clock = new TestClock(Instant.now(), ZoneId.systemDefault());
+    DeleteContainerCommandHandler handler =
+        new DeleteContainerCommandHandler(clock, newDirectExecutorService());
+    OzoneContainer ozoneContainer = Mockito.mock(OzoneContainer.class);
+    ContainerController controller = Mockito.mock(ContainerController.class);
+    Mockito.when(ozoneContainer.getController()).thenReturn(controller);
+
+    DeleteContainerCommand command1 = new DeleteContainerCommand(1L);
+    command1.setDeadline(clock.millis() + 10000);
+    DeleteContainerCommand command2 = new DeleteContainerCommand(2L);
+    command2.setDeadline(clock.millis() + 20000);
+    DeleteContainerCommand command3 = new DeleteContainerCommand(3L);
+    // No deadline on the 3rd command
+
+    clock.fastForward(15000);
+    handler.handle(command1, ozoneContainer, null, null);
+    Assertions.assertEquals(1, handler.getTimeoutCount());
+    handler.handle(command2, ozoneContainer, null, null);
+    handler.handle(command3, ozoneContainer, null, null);
+    Assertions.assertEquals(1, handler.getTimeoutCount());
+    Assertions.assertEquals(3, handler.getInvocationCount());
+    Mockito.verify(controller, times(0))
+        .deleteContainer(1L, false);
+    Mockito.verify(controller, times(1))
+        .deleteContainer(2L, false);
+    Mockito.verify(controller, times(1))
+        .deleteContainer(3L, false);
+  }
+
+}
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
index c40ceb2ea3..f0b6ff3c36 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
@@ -21,20 +21,39 @@ import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import 
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.TestClock;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.SortedMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
 
+import static 
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+
 /**
  * Tests the ECReconstructionSupervisor.
  */
 public class TestECReconstructionSupervisor {
 
+  private TestClock clock;
+
+  @BeforeEach
+  public void setup() {
+    clock = new TestClock(Instant.now(), ZoneId.systemDefault());
+  }
+
+
   @Test
   public void testAddTaskShouldExecuteTheGivenTask()
       throws InterruptedException, TimeoutException, IOException {
@@ -58,15 +77,61 @@ public class TestECReconstructionSupervisor {
                 super.reconstructECContainerGroup(containerID, repConfig,
                     sourceNodeMap, targetNodeMap);
               }
-            }) {
+            }, clock) {
         };
-    supervisor.addTask(
-        new ECReconstructionCommandInfo(1, new ECReplicationConfig(3, 2),
-            new byte[0], ImmutableList.of(), ImmutableList.of()));
+    ReconstructECContainersCommand command = new 
ReconstructECContainersCommand(
+        1L, ImmutableList.of(), ImmutableList.of(), new byte[0],
+        new ECReplicationConfig(3, 2));
+    supervisor.addTask(new ECReconstructionCommandInfo(command));
     runnableInvoked.await();
     Assertions.assertEquals(1, supervisor.getInFlightReplications());
     holdProcessing.countDown();
     GenericTestUtils
         .waitFor(() -> supervisor.getInFlightReplications() == 0, 100, 15000);
   }
+
+  @Test
+  public void testTasksWithDeadlineExceededAreNotRun() throws IOException {
+    ECReconstructionCoordinator coordinator =
+        Mockito.mock(ECReconstructionCoordinator.class);
+    ECReconstructionSupervisor supervisor =
+        new ECReconstructionSupervisor(null, null,
+            newDirectExecutorService(), coordinator, clock);
+
+    ReconstructECContainersCommand command = new 
ReconstructECContainersCommand(
+        1L, ImmutableList.of(), ImmutableList.of(), new byte[0],
+        new ECReplicationConfig(3, 2));
+    ECReconstructionCommandInfo task1 =
+        new ECReconstructionCommandInfo(command);
+
+    command = new ReconstructECContainersCommand(
+        2L, ImmutableList.of(), ImmutableList.of(), new byte[0],
+        new ECReplicationConfig(3, 2));
+    command.setDeadline(clock.millis() + 10000);
+    ECReconstructionCommandInfo task2 =
+        new ECReconstructionCommandInfo(command);
+
+    command = new ReconstructECContainersCommand(
+        3L, ImmutableList.of(), ImmutableList.of(), new byte[0],
+        new ECReplicationConfig(3, 2));
+    command.setDeadline(clock.millis() + 20000);
+    ECReconstructionCommandInfo task3 =
+        new ECReconstructionCommandInfo(command);
+
+    clock.fastForward(15000);
+    supervisor.addTask(task1);
+    supervisor.addTask(task2);
+    supervisor.addTask(task3);
+
+    // No deadline for container 1, it should run.
+    Mockito.verify(coordinator, times(1))
+        .reconstructECContainerGroup(eq(1L), any(), any(), any());
+    // Deadline passed for container 2, it should not run.
+    Mockito.verify(coordinator, times(0))
+        .reconstructECContainerGroup(eq(2L), any(), any(), any());
+    // Deadline not passed for container 3, it should run.
+    Mockito.verify(coordinator, times(1))
+        .reconstructECContainerGroup(eq(3L), any(), any(), any());
+  }
+
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
index 457f0a58ae..1ec6fecd40 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -24,8 +25,10 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.ozone.common.MonotonicClock;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 
 import org.junit.jupiter.api.Assertions;
@@ -43,6 +46,9 @@ public class ReplicationSupervisorScheduling {
 
   @Test
   public void test() throws InterruptedException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ReplicationServer.ReplicationConfig replicationConfig
+        = conf.getObject(ReplicationServer.ReplicationConfig.class);
     List<DatanodeDetails> datanodes = new ArrayList<>();
     datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
     datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
@@ -69,7 +75,7 @@ public class ReplicationSupervisorScheduling {
 
     ContainerSet cs = new ContainerSet(1000);
 
-    ReplicationSupervisor rs = new ReplicationSupervisor(cs,
+    ReplicationSupervisor rs = new ReplicationSupervisor(cs, null,
 
         //simplified executor emulating the current sequential download +
         //import.
@@ -107,7 +113,7 @@ public class ReplicationSupervisorScheduling {
             }
           }
 
-        }, 10);
+        }, replicationConfig, new MonotonicClock(ZoneId.systemDefault()));
 
     final long start = System.currentTimeMillis();
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index 51f78440bb..6ca5085478 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.container.replication;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.AbstractExecutorService;
@@ -38,7 +40,9 @@ import 
org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.TestClock;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -76,6 +80,7 @@ public class TestReplicationSupervisor {
   private ContainerSet set;
 
   private final ContainerLayoutVersion layout;
+  private TestClock clock;
 
   public TestReplicationSupervisor(ContainerLayoutVersion layout) {
     this.layout = layout;
@@ -88,6 +93,7 @@ public class TestReplicationSupervisor {
 
   @Before
   public void setUp() throws Exception {
+    clock = new TestClock(Instant.now(), ZoneId.systemDefault());
     set = new ContainerSet(1000);
   }
 
@@ -231,7 +237,7 @@ public class TestReplicationSupervisor {
   public void testDownloadAndImportReplicatorFailure() {
     ReplicationSupervisor supervisor =
         new ReplicationSupervisor(set, null, mutableReplicator,
-            newDirectExecutorService());
+            newDirectExecutorService(), clock);
 
     // Mock to fetch an exception in the importContainer method.
     SimpleContainerDownloader moc =
@@ -256,6 +262,39 @@ public class TestReplicationSupervisor {
         .contains("Container 1 replication was unsuccessful."));
   }
 
+  @Test
+  public void testTaskBeyondDeadline() {
+    ReplicationSupervisor supervisor =
+        supervisorWithReplicator(FakeReplicator::new);
+
+    ReplicateContainerCommand cmd = new ReplicateContainerCommand(1L,
+        emptyList());
+    cmd.setDeadline(clock.millis() + 10000);
+    ReplicationTask task1 = new ReplicationTask(cmd);
+    cmd = new ReplicateContainerCommand(2L, emptyList());
+    cmd.setDeadline(clock.millis() + 20000);
+    ReplicationTask task2 = new ReplicationTask(cmd);
+    cmd = new ReplicateContainerCommand(3L, emptyList());
+    // No deadline set
+    ReplicationTask task3 = new ReplicationTask(cmd);
+    // no deadline set
+
+    clock.fastForward(15000);
+
+    supervisor.addTask(task1);
+    supervisor.addTask(task2);
+    supervisor.addTask(task3);
+
+    Assert.assertEquals(3, supervisor.getReplicationRequestCount());
+    Assert.assertEquals(2, supervisor.getReplicationSuccessCount());
+    Assert.assertEquals(0, supervisor.getReplicationFailureCount());
+    Assert.assertEquals(0, supervisor.getInFlightReplications());
+    Assert.assertEquals(0, supervisor.getQueueSize());
+    Assert.assertEquals(1, supervisor.getReplicationTimeoutCount());
+    Assert.assertEquals(2, set.containerCount());
+
+  }
+
   private ReplicationSupervisor supervisorWithReplicator(
       Function<ReplicationSupervisor, ContainerReplicator> replicatorFactory) {
     return supervisorWith(replicatorFactory, newDirectExecutorService());
@@ -265,7 +304,8 @@ public class TestReplicationSupervisor {
       Function<ReplicationSupervisor, ContainerReplicator> replicatorFactory,
       ExecutorService executor) {
     ReplicationSupervisor supervisor =
-        new ReplicationSupervisor(set, null, mutableReplicator, executor);
+        new ReplicationSupervisor(set, null, mutableReplicator, executor,
+            clock);
     replicatorRef.set(replicatorFactory.apply(supervisor));
     return supervisor;
   }
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 6465eeb40b..7f8476d995 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -343,6 +343,7 @@ message SCMCommandProto {
   // SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM.
   optional int64 term = 15;
   optional string encodedToken = 16;
+  optional int64 deadlineMsSinceEpoch = 17;
 }
 
 /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index f78536912e..261803a3ba 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigType;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.PostConstruct;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -417,6 +418,8 @@ public class ReplicationManager implements SCMService {
     LOG.info("Sending command of type {} for container {} to {}",
         command.getType(), containerInfo, target);
     command.setTerm(getScmTerm());
+    command.setDeadline(clock.millis() +
+        Math.round(rmConf.eventTimeout * rmConf.commandDeadlineFactor));
     final CommandForDatanode<?> datanodeCommand =
         new CommandForDatanode<>(target.getUuid(), command);
     eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
@@ -765,6 +768,30 @@ public class ReplicationManager implements SCMService {
       this.eventTimeout = timeout.toMillis();
     }
 
+    /**
+     * Deadline which should be set on commands sent from ReplicationManager
+     * to the datanodes, as a fraction of the event.timeout. If the command
+     * has not been processed on the datanode by this time, it will be dropped
+     * by the datanode and Replication Manager will need to resend it.
+     */
+    @Config(key = "command.deadline.factor",
+        type = ConfigType.DOUBLE,
+        defaultValue = "0.9",
+        tags = {SCM, OZONE},
+        description = "Fraction of the hdds.scm.replication.event.timeout "
+            + "from the current time which should be set as a deadline for "
+            + "commands sent from ReplicationManager to datanodes. "
+            + "Commands which are not processed before this deadline will be "
+            + "dropped by the datanodes. Should be a value > 0 and <= 1.")
+    private double commandDeadlineFactor = 0.9;
+    public double getCommandDeadlineFactor() {
+      return commandDeadlineFactor;
+    }
+
+    public void setCommandDeadlineFactor(double val) {
+      commandDeadlineFactor = val;
+    }
+
     /**
      * The number of container replica which must be available for a node to
      * enter maintenance.
@@ -811,6 +838,15 @@ public class ReplicationManager implements SCMService {
     )
     private int maintenanceRemainingRedundancy = 1;
 
+    @PostConstruct
+    public void validate() {
+      if (!(commandDeadlineFactor > 0) || (commandDeadlineFactor > 1)) {
+        throw new IllegalArgumentException("command.deadline.factor is set to "
+            + commandDeadlineFactor
+            + " and must be greater than 0 and less than equal to 1");
+      }
+    }
+
     public void setMaintenanceRemainingRedundancy(int redundancy) {
       this.maintenanceRemainingRedundancy = redundancy;
     }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index f2654fac02..467adbe23a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -272,7 +272,7 @@ public class SCMDatanodeProtocolServer implements
       SCMHeartbeatRequestProto heartbeat) throws IOException, TimeoutException 
{
     List<SCMCommandProto> cmdResponses = new ArrayList<>();
     for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) {
-      cmdResponses.add(getCommandResponse(cmd));
+      cmdResponses.add(getCommandResponse(cmd, scm));
     }
     boolean auditSuccess = true;
     Map<String, String> auditMap = Maps.newHashMap();
@@ -305,14 +305,17 @@ public class SCMDatanodeProtocolServer implements
    * @throws IOException
    */
   @VisibleForTesting
-  public SCMCommandProto getCommandResponse(SCMCommand cmd)
-      throws IOException, TimeoutException {
+  public static SCMCommandProto getCommandResponse(SCMCommand cmd,
+      OzoneStorageContainerManager scm) throws IOException, TimeoutException {
     SCMCommandProto.Builder builder = SCMCommandProto.newBuilder()
         .setEncodedToken(cmd.getEncodedToken());
 
     // In HA mode, it is the term of current leader SCM.
     // In non-HA mode, it is the default value 0.
     builder.setTerm(cmd.getTerm());
+    // The default deadline is 0, which means no deadline. Individual commands
+    // may have a deadline set.
+    builder.setDeadlineMsSinceEpoch(cmd.getDeadline());
 
     switch (cmd.getType()) {
     case reregisterCommand:
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 82f971be79..92ec499570 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -477,6 +477,15 @@ public class TestReplicationManager {
 
     replicationManager.sendDatanodeCommand(command, containerInfo, target);
 
+    // Ensure that the command deadline is set to current time
+    // + eventTimeout * factor
+    ReplicationManager.ReplicationManagerConfiguration rmConf = configuration
+        .getObject(ReplicationManager.ReplicationManagerConfiguration.class);
+    long expectedDeadline = clock.millis() +
+        Math.round(rmConf.getEventTimeout() *
+            rmConf.getCommandDeadlineFactor());
+    Assert.assertEquals(expectedDeadline, command.getDeadline());
+
     List<ContainerReplicaOp> ops = containerReplicaPendingOps.getPendingOps(
         containerInfo.containerID());
     Mockito.verify(eventPublisher).fireEvent(any(), any());
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
new file mode 100644
index 0000000000..cfa34fc445
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test for SCMDatanodeProtocolServer.
+ */
+public class TestSCMDatanodeProtocolServer {
+
+  @Test
+  public void ensureTermAndDeadlineOnCommands()
+      throws IOException, TimeoutException {
+    OzoneStorageContainerManager scm =
+        Mockito.mock(OzoneStorageContainerManager.class);
+
+    ReplicateContainerCommand command = new ReplicateContainerCommand(1L,
+        Collections.emptyList());
+    command.setTerm(5L);
+    command.setDeadline(1234L);
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto proto =
+        SCMDatanodeProtocolServer.getCommandResponse(command, scm);
+
+    Assert.assertEquals(StorageContainerDatanodeProtocolProtos.SCMCommandProto
+        .Type.replicateContainerCommand, proto.getCommandType());
+    Assert.assertEquals(5L, proto.getTerm());
+    Assert.assertEquals(1234L, proto.getDeadlineMsSinceEpoch());
+  }
+}
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index a7d332e783..ebfa2d8637 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.common.MonotonicClock;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -37,9 +38,11 @@ import 
org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
 import 
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.ReplicationServer;
 import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask;
 import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.jetbrains.annotations.NotNull;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Option;
@@ -48,6 +51,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -123,8 +127,9 @@ public class ClosedContainerReplicator extends 
BaseFreonGenerator implements
         //if datanode is specified, replicate only container if it has a
         //replica.
         if (datanode.isEmpty() || datanodeUUIDs.contains(datanode)) {
-          replicationTasks.add(new ReplicationTask(container.getContainerID(),
-              datanodesWithContainer));
+          replicationTasks.add(new ReplicationTask(
+              new ReplicateContainerCommand(container.getContainerID(),
+                  datanodesWithContainer)));
         }
       }
 
@@ -203,7 +208,11 @@ public class ClosedContainerReplicator extends 
BaseFreonGenerator implements
             new SimpleContainerDownloader(conf, null),
             new TarContainerPacker());
 
-    supervisor = new ReplicationSupervisor(containerSet, replicator, 10);
+    ReplicationServer.ReplicationConfig replicationConfig
+        = conf.getObject(ReplicationServer.ReplicationConfig.class);
+    supervisor = new ReplicationSupervisor(containerSet, null,
+        replicator, replicationConfig,
+        new MonotonicClock(ZoneId.systemDefault()));
   }
 
   private void replicateContainer(long counter) throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to