This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e4a1993e79 HDDS-7618. Replication Commands should timeout if not
processed on datanodes in time (#4069)
e4a1993e79 is described below
commit e4a1993e798bc843a1f124e0e348a99735b3e998
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Tue Dec 13 09:41:42 2022 +0000
HDDS-7618. Replication Commands should timeout if not processed on
datanodes in time (#4069)
---
.../common/statemachine/DatanodeStateMachine.java | 10 ++-
.../DeleteContainerCommandHandler.java | 26 +++++++-
.../ReconstructECContainersCommandHandler.java | 6 +-
.../ReplicateContainerCommandHandler.java | 3 +-
.../states/endpoint/HeartbeatEndpointTask.java | 4 ++
.../ECReconstructionCommandInfo.java | 22 ++++---
.../ECReconstructionCoordinatorTask.java | 20 +++++-
.../reconstruction/ECReconstructionSupervisor.java | 11 ++--
.../replication/ReplicationSupervisor.java | 40 +++++++-----
.../replication/ReplicationSupervisorMetrics.java | 5 +-
.../container/replication/ReplicationTask.java | 27 ++++++--
.../hadoop/ozone/protocol/commands/SCMCommand.java | 35 +++++++++++
.../TestDeleteContainerCommandHandler.java | 71 +++++++++++++++++++++
.../TestECReconstructionSupervisor.java | 73 ++++++++++++++++++++--
.../ReplicationSupervisorScheduling.java | 10 ++-
.../replication/TestReplicationSupervisor.java | 44 ++++++++++++-
.../proto/ScmServerDatanodeHeartbeatProtocol.proto | 1 +
.../container/replication/ReplicationManager.java | 36 +++++++++++
.../hdds/scm/server/SCMDatanodeProtocolServer.java | 9 ++-
.../replication/TestReplicationManager.java | 9 +++
.../hdds/scm/TestSCMDatanodeProtocolServer.java | 54 ++++++++++++++++
.../ozone/freon/ClosedContainerReplicator.java | 15 ++++-
22 files changed, 468 insertions(+), 63 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 4d72bb317f..107dba186c 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.container.common.statemachine;
import java.io.Closeable;
import java.io.IOException;
+import java.time.Clock;
+import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -40,6 +42,7 @@ import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ozone.HddsDatanodeStopService;
+import org.apache.hadoop.ozone.common.MonotonicClock;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.report.ReportManager;
import
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
@@ -139,6 +142,7 @@ public class DatanodeStateMachine implements Closeable {
this.conf = conf;
this.datanodeDetails = datanodeDetails;
+ Clock clock = new MonotonicClock(ZoneId.systemDefault());
// Expected to be initialized already.
layoutStorage = new DatanodeLayoutStorage(conf,
datanodeDetails.getUuidString());
@@ -180,7 +184,7 @@ public class DatanodeStateMachine implements Closeable {
conf.getObject(ReplicationConfig.class);
supervisor =
new ReplicationSupervisor(container.getContainerSet(), context,
- replicatorMetrics, replicationConfig);
+ replicatorMetrics, replicationConfig, clock);
replicationSupervisorMetrics =
ReplicationSupervisorMetrics.create(supervisor);
@@ -193,7 +197,7 @@ public class DatanodeStateMachine implements Closeable {
ecReconstructionSupervisor =
new ECReconstructionSupervisor(container.getContainerSet(), context,
replicationConfig.getReplicationMaxStreams(),
- ecReconstructionCoordinator);
+ ecReconstructionCoordinator, clock);
// When we add new handlers just adding a new handler here should do the
@@ -207,7 +211,7 @@ public class DatanodeStateMachine implements Closeable {
.addHandler(new ReconstructECContainersCommandHandler(conf,
ecReconstructionSupervisor))
.addHandler(new DeleteContainerCommandHandler(
- dnConf.getContainerDeleteThreads()))
+ dnConf.getContainerDeleteThreads(), clock))
.addHandler(new ClosePipelineCommandHandler())
.addHandler(new CreatePipelineCommandHandler(conf))
.addHandler(new SetNodeOperationalStateCommandHandler(conf))
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
index 58ad2d18e4..b209e6ee34 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Clock;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@@ -48,15 +49,22 @@ public class DeleteContainerCommandHandler implements
CommandHandler {
LoggerFactory.getLogger(DeleteContainerCommandHandler.class);
private final AtomicInteger invocationCount = new AtomicInteger(0);
+ private final AtomicInteger timeoutCount = new AtomicInteger(0);
private final AtomicLong totalTime = new AtomicLong(0);
private final ExecutorService executor;
+ private final Clock clock;
- public DeleteContainerCommandHandler(int threadPoolSize) {
- this.executor = Executors.newFixedThreadPool(
+ public DeleteContainerCommandHandler(int threadPoolSize, Clock clock) {
+ this(clock, Executors.newFixedThreadPool(
threadPoolSize, new ThreadFactoryBuilder()
- .setNameFormat("DeleteContainerThread-%d").build());
+ .setNameFormat("DeleteContainerThread-%d").build()));
}
+ protected DeleteContainerCommandHandler(Clock clock,
+ ExecutorService executor) {
+ this.executor = executor;
+ this.clock = clock;
+ }
@Override
public void handle(final SCMCommand command,
final OzoneContainer ozoneContainer,
@@ -69,6 +77,14 @@ public class DeleteContainerCommandHandler implements
CommandHandler {
final long startTime = Time.monotonicNow();
invocationCount.incrementAndGet();
try {
+ if (command.hasExpired(clock.millis())) {
+ LOG.info("Not processing the delete container command for " +
+ "container {} as the current time {}ms is after the command " +
+ "deadline {}ms", deleteContainerCommand.getContainerID(),
+ clock.millis(), command.getDeadline());
+ timeoutCount.incrementAndGet();
+ return;
+ }
controller.deleteContainer(deleteContainerCommand.getContainerID(),
deleteContainerCommand.isForce());
} catch (IOException e) {
@@ -94,6 +110,10 @@ public class DeleteContainerCommandHandler implements
CommandHandler {
return this.invocationCount.get();
}
+ public int getTimeoutCount() {
+ return this.timeoutCount.get();
+ }
+
@Override
public long getAverageRunTime() {
final int invocations = invocationCount.get();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
index 57d4d16f8a..c6abfc27c3 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
@@ -47,11 +47,7 @@ public class ReconstructECContainersCommandHandler
implements CommandHandler {
ReconstructECContainersCommand ecContainersCommand =
(ReconstructECContainersCommand) command;
ECReconstructionCommandInfo reconstructionCommandInfo =
- new ECReconstructionCommandInfo(ecContainersCommand.getContainerID(),
- ecContainersCommand.getEcReplicationConfig(),
- ecContainersCommand.getMissingContainerIndexes(),
- ecContainersCommand.getSources(),
- ecContainersCommand.getTargetDatanodes());
+ new ECReconstructionCommandInfo(ecContainersCommand);
this.supervisor.addTask(reconstructionCommandInfo);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index 44c783846a..df589e287d 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -71,7 +71,8 @@ public class ReplicateContainerCommandHandler implements
CommandHandler {
"Replication command is received for container %s "
+ "without source datanodes.", containerID);
- supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes));
+ ReplicationTask task = new ReplicationTask(replicateCommand);
+ supervisor.addTask(task);
}
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index ccb0e8b7d7..a694ba00be 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -426,6 +426,7 @@ public class HeartbeatEndpointTask
* Common processing for SCM commands.
* - set term
* - set encoded token
+ * - any deadline which is relevant to the command
* - add to context's queue
*/
private void processCommonCommand(
@@ -436,6 +437,9 @@ public class HeartbeatEndpointTask
if (response.hasEncodedToken()) {
cmd.setEncodedToken(response.getEncodedToken());
}
+ if (response.hasDeadlineMsSinceEpoch()) {
+ cmd.setDeadline(response.getDeadlineMsSinceEpoch());
+ }
context.addCommand(cmd);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java
index c95f9646f8..8a4d26b55c 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java
@@ -35,17 +35,21 @@ public class ECReconstructionCommandInfo {
private List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
sources;
private List<DatanodeDetails> targetDatanodes;
+ private long deadlineMsSinceEpoch = 0;
- public ECReconstructionCommandInfo(long containerID,
- ECReplicationConfig ecReplicationConfig, byte[] missingContainerIndexes,
- List<DatanodeDetailsAndReplicaIndex> sources,
- List<DatanodeDetails> targetDatanodes) {
- this.containerID = containerID;
- this.ecReplicationConfig = ecReplicationConfig;
+ public ECReconstructionCommandInfo(ReconstructECContainersCommand cmd) {
+ this.containerID = cmd.getContainerID();
+ this.ecReplicationConfig = cmd.getEcReplicationConfig();
this.missingContainerIndexes =
- Arrays.copyOf(missingContainerIndexes, missingContainerIndexes.length);
- this.sources = sources;
- this.targetDatanodes = targetDatanodes;
+ Arrays.copyOf(cmd.getMissingContainerIndexes(),
+ cmd.getMissingContainerIndexes().length);
+ this.sources = cmd.getSources();
+ this.targetDatanodes = cmd.getTargetDatanodes();
+ this.deadlineMsSinceEpoch = cmd.getDeadline();
+ }
+
+ public long getDeadline() {
+ return deadlineMsSinceEpoch;
}
public long getContainerID() {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
index e0aa14419a..4e46860c88 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Clock;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -36,17 +37,21 @@ public class ECReconstructionCoordinatorTask implements
Runnable {
static final Logger LOG =
LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class);
private final ConcurrentHashMap.KeySetView<Object, Boolean>
inprogressCounter;
- private ECReconstructionCoordinator reconstructionCoordinator;
- private ECReconstructionCommandInfo reconstructionCommandInfo;
+ private final ECReconstructionCoordinator reconstructionCoordinator;
+ private final ECReconstructionCommandInfo reconstructionCommandInfo;
+ private long deadlineMsSinceEpoch = 0;
+ private final Clock clock;
public ECReconstructionCoordinatorTask(
ECReconstructionCoordinator coordinator,
ECReconstructionCommandInfo reconstructionCommandInfo,
ConcurrentHashMap.KeySetView<Object, Boolean>
- inprogressReconstructionCoordinatorCounter) {
+ inprogressReconstructionCoordinatorCounter,
+ Clock clock) {
this.reconstructionCoordinator = coordinator;
this.reconstructionCommandInfo = reconstructionCommandInfo;
this.inprogressCounter = inprogressReconstructionCoordinatorCounter;
+ this.clock = clock;
}
@Override
@@ -69,6 +74,15 @@ public class ECReconstructionCoordinatorTask implements
Runnable {
containerID);
}
try {
+ if (reconstructionCommandInfo.getDeadline() > 0
+ && clock.millis() > reconstructionCommandInfo.getDeadline()) {
+ LOG.info("Ignoring this reconstruct container command for container" +
+ " {} since the current time {}ms is past the deadline {}ms",
+ containerID, clock.millis(),
+ reconstructionCommandInfo.getDeadline());
+ return;
+ }
+
SortedMap<Integer, DatanodeDetails> sourceNodeMap =
reconstructionCommandInfo.getSources().stream().collect(Collectors
.toMap(DatanodeDetailsAndReplicaIndex::getReplicaIndex,
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
index e2de7ac695..e36636d693 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
@@ -23,6 +23,7 @@ import
org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import java.io.Closeable;
import java.io.IOException;
+import java.time.Clock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -39,6 +40,7 @@ public class ECReconstructionSupervisor implements Closeable {
private final StateContext context;
private final ExecutorService executor;
private final ECReconstructionCoordinator reconstructionCoordinator;
+ private final Clock clock;
/**
* how many coordinator tasks currently being running.
*/
@@ -47,18 +49,19 @@ public class ECReconstructionSupervisor implements
Closeable {
public ECReconstructionSupervisor(ContainerSet containerSet,
StateContext context, ExecutorService executor,
- ECReconstructionCoordinator coordinator) {
+ ECReconstructionCoordinator coordinator, Clock clock) {
this.containerSet = containerSet;
this.context = context;
this.executor = executor;
this.reconstructionCoordinator = coordinator;
this.inProgressReconstrucionCoordinatorCounter =
ConcurrentHashMap.newKeySet();
+ this.clock = clock;
}
public ECReconstructionSupervisor(ContainerSet containerSet,
StateContext context, int poolSize,
- ECReconstructionCoordinator coordinator) {
+ ECReconstructionCoordinator coordinator, Clock clock) {
// TODO: ReplicationSupervisor and this class can be refactored to have a
// common interface.
this(containerSet, context,
@@ -66,7 +69,7 @@ public class ECReconstructionSupervisor implements Closeable {
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ECContainerReconstructionThread-%d").build()),
- coordinator);
+ coordinator, clock);
}
public void stop() {
@@ -86,7 +89,7 @@ public class ECReconstructionSupervisor implements Closeable {
.add(taskInfo.getContainerID())) {
executor.execute(
new ECReconstructionCoordinatorTask(getReconstructionCoordinator(),
- taskInfo, inProgressReconstrucionCoordinatorCounter));
+ taskInfo, inProgressReconstrucionCoordinatorCounter, clock));
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 5432656e03..444ab303f5 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.container.replication;
+import java.time.Clock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.ExecutorService;
@@ -49,10 +50,12 @@ public class ReplicationSupervisor {
private final ContainerReplicator replicator;
private final ExecutorService executor;
private final StateContext context;
+ private final Clock clock;
private final AtomicLong requestCounter = new AtomicLong();
private final AtomicLong successCounter = new AtomicLong();
private final AtomicLong failureCounter = new AtomicLong();
+ private final AtomicLong timeoutCounter = new AtomicLong();
/**
* A set of container IDs that are currently being downloaded
@@ -64,35 +67,27 @@ public class ReplicationSupervisor {
@VisibleForTesting
ReplicationSupervisor(
ContainerSet containerSet, StateContext context,
- ContainerReplicator replicator, ExecutorService executor) {
+ ContainerReplicator replicator, ExecutorService executor,
+ Clock clock) {
this.containerSet = containerSet;
this.replicator = replicator;
this.containersInFlight = ConcurrentHashMap.newKeySet();
this.executor = executor;
this.context = context;
+ this.clock = clock;
}
public ReplicationSupervisor(
ContainerSet containerSet, StateContext context,
- ContainerReplicator replicator, ReplicationConfig replicationConfig) {
- this(containerSet, context, replicator,
- replicationConfig.getReplicationMaxStreams());
- }
-
- public ReplicationSupervisor(
- ContainerSet containerSet, StateContext context,
- ContainerReplicator replicator, int poolSize) {
+ ContainerReplicator replicator, ReplicationConfig replicationConfig,
+ Clock clock) {
this(containerSet, context, replicator, new ThreadPoolExecutor(
- poolSize, poolSize, 60, TimeUnit.SECONDS,
+ replicationConfig.getReplicationMaxStreams(),
+ replicationConfig.getReplicationMaxStreams(), 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ContainerReplicationThread-%d")
- .build()));
- }
-
- public ReplicationSupervisor(ContainerSet containerSet,
- ContainerReplicator replicator, int poolSize) {
- this(containerSet, null, replicator, poolSize);
+ .build()), clock);
}
/**
@@ -148,6 +143,14 @@ public class ReplicationSupervisor {
try {
requestCounter.incrementAndGet();
+ if (task.getDeadline() > 0 && clock.millis() > task.getDeadline()) {
+ LOG.info("Ignoring this replicate container command for container" +
+ " {} since the current time {}ms is past the deadline {}ms",
+ containerId, clock.millis(), task.getDeadline());
+ timeoutCounter.incrementAndGet();
+ return;
+ }
+
if (context != null) {
DatanodeDetails dn = context.getParent().getDatanodeDetails();
if (dn.getPersistedOpState() !=
@@ -206,4 +209,9 @@ public class ReplicationSupervisor {
public long getReplicationFailureCount() {
return failureCounter.get();
}
+
+ public long getReplicationTimeoutCount() {
+ return timeoutCounter.get();
+ }
+
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
index df48abda4f..0576308bd2 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
@@ -66,6 +66,9 @@ public class ReplicationSupervisorMetrics implements
MetricsSource {
supervisor.getQueueSize())
.addGauge(Interns.info("numRequestedReplications",
"Number of requested replications"),
- supervisor.getReplicationRequestCount());
+ supervisor.getReplicationRequestCount())
+ .addGauge(Interns.info("numTimeoutReplications",
+ "Number of replication requests timed out before being processed"),
+ supervisor.getReplicationTimeoutCount());
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
index e6e0d0526b..b194c2e207 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Objects;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
/**
* The task to download a container from the sources.
@@ -32,21 +33,39 @@ public class ReplicationTask {
private final long containerId;
- private List<DatanodeDetails> sources;
+ private final List<DatanodeDetails> sources;
private final Instant queued = Instant.now();
+ private long deadlineMsSinceEpoch = 0;
+
/**
* Counter for the transferred bytes.
*/
private long transferredBytes;
- public ReplicationTask(
+ public ReplicationTask(ReplicateContainerCommand cmd) {
+ this.containerId = cmd.getContainerID();
+ this.sources = cmd.getSourceDatanodes();
+ this.deadlineMsSinceEpoch = cmd.getDeadline();
+ }
+
+ /**
+ * Intended to only be used in tests.
+ */
+ protected ReplicationTask(
long containerId,
List<DatanodeDetails> sources
) {
- this.containerId = containerId;
- this.sources = sources;
+ this(new ReplicateContainerCommand(containerId, sources));
+ }
+
+ /**
+ * Returns any deadline set on this task, in milliseconds since the epoch.
+ * A returned value of zero indicates no deadline.
+ */
+ public long getDeadline() {
+ return deadlineMsSinceEpoch;
}
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 744118e301..ab214ef2f6 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -38,6 +38,8 @@ public abstract class SCMCommand<T extends Message> implements
private String encodedToken = "";
+ private long deadlineMsSinceEpoch = 0;
+
SCMCommand() {
this.id = HddsIdFactory.getLongId();
}
@@ -88,4 +90,37 @@ public abstract class SCMCommand<T extends Message>
implements
public void setEncodedToken(String encodedToken) {
this.encodedToken = encodedToken;
}
+
+ /**
+ * Allows a deadline to be set on the command. The deadline is set as the
+ * milliseconds since the epoch when the command must have been completed by.
+ * It is up to the code processing the command to enforce the deadline by
+ * calling the hasExpired() method, and the code sending the command to set
+ * the deadline. The default deadline is zero, which means no deadline.
+ * @param deadlineMs The ms since epoch when the command must have completed
+ * by.
+ */
+ public void setDeadline(long deadlineMs) {
+ this.deadlineMsSinceEpoch = deadlineMs;
+ }
+
+ /**
+ * @return The deadline set for this command, or zero if no command has been
+ * set.
+ */
+ public long getDeadline() {
+ return deadlineMsSinceEpoch;
+ }
+
+ /**
+ * If a deadline has been set to a non zero value, test if the current time
+ * passed is beyond the deadline or not.
+ * @param currentEpochMs current time in milliseconds since the epoch.
+ * @return false if there is no deadline, or it has not expired. True if the
+ * set deadline has expired.
+ */
+ public boolean hasExpired(long currentEpochMs) {
+ return deadlineMsSinceEpoch > 0 &&
+ currentEpochMs > deadlineMsSinceEpoch;
+ }
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
new file mode 100644
index 0000000000..282e00f734
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.ozone.test.TestClock;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+
+import static
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import static org.mockito.Mockito.times;
+
+/**
+ * Test for the DeleteContainerCommandHandler.
+ */
+public class TestDeleteContainerCommandHandler {
+
+ @Test
+ public void testExpiredCommandsAreNotProcessed() throws IOException {
+ TestClock clock = new TestClock(Instant.now(), ZoneId.systemDefault());
+ DeleteContainerCommandHandler handler =
+ new DeleteContainerCommandHandler(clock, newDirectExecutorService());
+ OzoneContainer ozoneContainer = Mockito.mock(OzoneContainer.class);
+ ContainerController controller = Mockito.mock(ContainerController.class);
+ Mockito.when(ozoneContainer.getController()).thenReturn(controller);
+
+ DeleteContainerCommand command1 = new DeleteContainerCommand(1L);
+ command1.setDeadline(clock.millis() + 10000);
+ DeleteContainerCommand command2 = new DeleteContainerCommand(2L);
+ command2.setDeadline(clock.millis() + 20000);
+ DeleteContainerCommand command3 = new DeleteContainerCommand(3L);
+ // No deadline on the 3rd command
+
+ clock.fastForward(15000);
+ handler.handle(command1, ozoneContainer, null, null);
+ Assertions.assertEquals(1, handler.getTimeoutCount());
+ handler.handle(command2, ozoneContainer, null, null);
+ handler.handle(command3, ozoneContainer, null, null);
+ Assertions.assertEquals(1, handler.getTimeoutCount());
+ Assertions.assertEquals(3, handler.getInvocationCount());
+ Mockito.verify(controller, times(0))
+ .deleteContainer(1L, false);
+ Mockito.verify(controller, times(1))
+ .deleteContainer(2L, false);
+ Mockito.verify(controller, times(1))
+ .deleteContainer(3L, false);
+ }
+
+}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
index c40ceb2ea3..f0b6ff3c36 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
@@ -21,20 +21,39 @@ import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.TestClock;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
import java.util.SortedMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
+import static
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+
/**
* Tests the ECReconstructionSupervisor.
*/
public class TestECReconstructionSupervisor {
+ private TestClock clock;
+
+ @BeforeEach
+ public void setup() {
+ clock = new TestClock(Instant.now(), ZoneId.systemDefault());
+ }
+
+
@Test
public void testAddTaskShouldExecuteTheGivenTask()
throws InterruptedException, TimeoutException, IOException {
@@ -58,15 +77,61 @@ public class TestECReconstructionSupervisor {
super.reconstructECContainerGroup(containerID, repConfig,
sourceNodeMap, targetNodeMap);
}
- }) {
+ }, clock) {
};
- supervisor.addTask(
- new ECReconstructionCommandInfo(1, new ECReplicationConfig(3, 2),
- new byte[0], ImmutableList.of(), ImmutableList.of()));
+ ReconstructECContainersCommand command = new
ReconstructECContainersCommand(
+ 1L, ImmutableList.of(), ImmutableList.of(), new byte[0],
+ new ECReplicationConfig(3, 2));
+ supervisor.addTask(new ECReconstructionCommandInfo(command));
runnableInvoked.await();
Assertions.assertEquals(1, supervisor.getInFlightReplications());
holdProcessing.countDown();
GenericTestUtils
.waitFor(() -> supervisor.getInFlightReplications() == 0, 100, 15000);
}
+
+ @Test
+ public void testTasksWithDeadlineExceededAreNotRun() throws IOException {
+ ECReconstructionCoordinator coordinator =
+ Mockito.mock(ECReconstructionCoordinator.class);
+ ECReconstructionSupervisor supervisor =
+ new ECReconstructionSupervisor(null, null,
+ newDirectExecutorService(), coordinator, clock);
+
+ ReconstructECContainersCommand command = new
ReconstructECContainersCommand(
+ 1L, ImmutableList.of(), ImmutableList.of(), new byte[0],
+ new ECReplicationConfig(3, 2));
+ ECReconstructionCommandInfo task1 =
+ new ECReconstructionCommandInfo(command);
+
+ command = new ReconstructECContainersCommand(
+ 2L, ImmutableList.of(), ImmutableList.of(), new byte[0],
+ new ECReplicationConfig(3, 2));
+ command.setDeadline(clock.millis() + 10000);
+ ECReconstructionCommandInfo task2 =
+ new ECReconstructionCommandInfo(command);
+
+ command = new ReconstructECContainersCommand(
+ 3L, ImmutableList.of(), ImmutableList.of(), new byte[0],
+ new ECReplicationConfig(3, 2));
+ command.setDeadline(clock.millis() + 20000);
+ ECReconstructionCommandInfo task3 =
+ new ECReconstructionCommandInfo(command);
+
+ clock.fastForward(15000);
+ supervisor.addTask(task1);
+ supervisor.addTask(task2);
+ supervisor.addTask(task3);
+
+ // No deadline for container 1, it should run.
+ Mockito.verify(coordinator, times(1))
+ .reconstructECContainerGroup(eq(1L), any(), any(), any());
+ // Deadline passed for container 2, it should not run.
+ Mockito.verify(coordinator, times(0))
+ .reconstructECContainerGroup(eq(2L), any(), any(), any());
+ // Deadline not passed for container 3, it should run.
+ Mockito.verify(coordinator, times(1))
+ .reconstructECContainerGroup(eq(3L), any(), any(), any());
+ }
+
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
index 457f0a58ae..1ec6fecd40 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.container.replication;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -24,8 +25,10 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.ozone.common.MonotonicClock;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.junit.jupiter.api.Assertions;
@@ -43,6 +46,9 @@ public class ReplicationSupervisorScheduling {
@Test
public void test() throws InterruptedException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ ReplicationServer.ReplicationConfig replicationConfig
+ = conf.getObject(ReplicationServer.ReplicationConfig.class);
List<DatanodeDetails> datanodes = new ArrayList<>();
datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
@@ -69,7 +75,7 @@ public class ReplicationSupervisorScheduling {
ContainerSet cs = new ContainerSet(1000);
- ReplicationSupervisor rs = new ReplicationSupervisor(cs,
+ ReplicationSupervisor rs = new ReplicationSupervisor(cs, null,
//simplified executor emulating the current sequential download +
//import.
@@ -107,7 +113,7 @@ public class ReplicationSupervisorScheduling {
}
}
- }, 10);
+ }, replicationConfig, new MonotonicClock(ZoneId.systemDefault()));
final long start = System.currentTimeMillis();
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index 51f78440bb..6ca5085478 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.container.replication;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.ZoneId;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.AbstractExecutorService;
@@ -38,7 +40,9 @@ import
org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.TestClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -76,6 +80,7 @@ public class TestReplicationSupervisor {
private ContainerSet set;
private final ContainerLayoutVersion layout;
+ private TestClock clock;
public TestReplicationSupervisor(ContainerLayoutVersion layout) {
this.layout = layout;
@@ -88,6 +93,7 @@ public class TestReplicationSupervisor {
@Before
public void setUp() throws Exception {
+ clock = new TestClock(Instant.now(), ZoneId.systemDefault());
set = new ContainerSet(1000);
}
@@ -231,7 +237,7 @@ public class TestReplicationSupervisor {
public void testDownloadAndImportReplicatorFailure() {
ReplicationSupervisor supervisor =
new ReplicationSupervisor(set, null, mutableReplicator,
- newDirectExecutorService());
+ newDirectExecutorService(), clock);
// Mock to fetch an exception in the importContainer method.
SimpleContainerDownloader moc =
@@ -256,6 +262,39 @@ public class TestReplicationSupervisor {
.contains("Container 1 replication was unsuccessful."));
}
+ @Test
+ public void testTaskBeyondDeadline() {
+ ReplicationSupervisor supervisor =
+ supervisorWithReplicator(FakeReplicator::new);
+
+ ReplicateContainerCommand cmd = new ReplicateContainerCommand(1L,
+ emptyList());
+ cmd.setDeadline(clock.millis() + 10000);
+ ReplicationTask task1 = new ReplicationTask(cmd);
+ cmd = new ReplicateContainerCommand(2L, emptyList());
+ cmd.setDeadline(clock.millis() + 20000);
+ ReplicationTask task2 = new ReplicationTask(cmd);
+ cmd = new ReplicateContainerCommand(3L, emptyList());
+ // No deadline set on task3, so it should never be dropped as expired.
+ ReplicationTask task3 = new ReplicationTask(cmd);
+
+ clock.fastForward(15000);
+
+ supervisor.addTask(task1);
+ supervisor.addTask(task2);
+ supervisor.addTask(task3);
+
+ Assert.assertEquals(3, supervisor.getReplicationRequestCount());
+ Assert.assertEquals(2, supervisor.getReplicationSuccessCount());
+ Assert.assertEquals(0, supervisor.getReplicationFailureCount());
+ Assert.assertEquals(0, supervisor.getInFlightReplications());
+ Assert.assertEquals(0, supervisor.getQueueSize());
+ Assert.assertEquals(1, supervisor.getReplicationTimeoutCount());
+ Assert.assertEquals(2, set.containerCount());
+
+ }
+
private ReplicationSupervisor supervisorWithReplicator(
Function<ReplicationSupervisor, ContainerReplicator> replicatorFactory) {
return supervisorWith(replicatorFactory, newDirectExecutorService());
@@ -265,7 +304,8 @@ public class TestReplicationSupervisor {
Function<ReplicationSupervisor, ContainerReplicator> replicatorFactory,
ExecutorService executor) {
ReplicationSupervisor supervisor =
- new ReplicationSupervisor(set, null, mutableReplicator, executor);
+ new ReplicationSupervisor(set, null, mutableReplicator, executor,
+ clock);
replicatorRef.set(replicatorFactory.apply(supervisor));
return supervisor;
}
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 6465eeb40b..7f8476d995 100644
---
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -343,6 +343,7 @@ message SCMCommandProto {
// SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM.
optional int64 term = 15;
optional string encodedToken = 16;
+ optional int64 deadlineMsSinceEpoch = 17;
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index f78536912e..261803a3ba 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.PostConstruct;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -417,6 +418,8 @@ public class ReplicationManager implements SCMService {
LOG.info("Sending command of type {} for container {} to {}",
command.getType(), containerInfo, target);
command.setTerm(getScmTerm());
+ command.setDeadline(clock.millis() +
+ Math.round(rmConf.eventTimeout * rmConf.commandDeadlineFactor));
final CommandForDatanode<?> datanodeCommand =
new CommandForDatanode<>(target.getUuid(), command);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
@@ -765,6 +768,30 @@ public class ReplicationManager implements SCMService {
this.eventTimeout = timeout.toMillis();
}
+ /**
+ * Deadline which should be set on commands sent from ReplicationManager
+ * to the datanodes, as a percentage of the event.timeout. If the command
+ * has not been processed on the datanode by this time, it will be dropped
+ * by the datanode and Replication Manager will need to resend it.
+ */
+ @Config(key = "command.deadline.factor",
+ type = ConfigType.DOUBLE,
+ defaultValue = "0.9",
+ tags = {SCM, OZONE},
+ description = "Fraction of the hdds.scm.replication.event.timeout "
+ + "from the current time which should be set as a deadline for "
+ + "commands sent from ReplicationManager to datanodes. "
+ + "Commands which are not processed before this deadline will be "
+ + "dropped by the datanodes. Should be a value > 0 and <= 1.")
+ private double commandDeadlineFactor = 0.9;
+ public double getCommandDeadlineFactor() {
+ return commandDeadlineFactor;
+ }
+
+ public void setCommandDeadlineFactor(double val) {
+ commandDeadlineFactor = val;
+ }
+
/**
* The number of container replica which must be available for a node to
* enter maintenance.
@@ -811,6 +838,15 @@ public class ReplicationManager implements SCMService {
)
private int maintenanceRemainingRedundancy = 1;
+ @PostConstruct
+ public void validate() {
+ if (!(commandDeadlineFactor > 0) || (commandDeadlineFactor > 1)) {
+ throw new IllegalArgumentException("command.deadline.factor is set to "
+ + commandDeadlineFactor
+ + " and must be greater than 0 and less than or equal to 1");
+ }
+ }
+
public void setMaintenanceRemainingRedundancy(int redundancy) {
this.maintenanceRemainingRedundancy = redundancy;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index f2654fac02..467adbe23a 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -272,7 +272,7 @@ public class SCMDatanodeProtocolServer implements
SCMHeartbeatRequestProto heartbeat) throws IOException, TimeoutException
{
List<SCMCommandProto> cmdResponses = new ArrayList<>();
for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) {
- cmdResponses.add(getCommandResponse(cmd));
+ cmdResponses.add(getCommandResponse(cmd, scm));
}
boolean auditSuccess = true;
Map<String, String> auditMap = Maps.newHashMap();
@@ -305,14 +305,17 @@ public class SCMDatanodeProtocolServer implements
* @throws IOException
*/
@VisibleForTesting
- public SCMCommandProto getCommandResponse(SCMCommand cmd)
- throws IOException, TimeoutException {
+ public static SCMCommandProto getCommandResponse(SCMCommand cmd,
+ OzoneStorageContainerManager scm) throws IOException, TimeoutException {
SCMCommandProto.Builder builder = SCMCommandProto.newBuilder()
.setEncodedToken(cmd.getEncodedToken());
// In HA mode, it is the term of current leader SCM.
// In non-HA mode, it is the default value 0.
builder.setTerm(cmd.getTerm());
+ // The default deadline is 0, which means no deadline. Individual commands
+ // may have a deadline set.
+ builder.setDeadlineMsSinceEpoch(cmd.getDeadline());
switch (cmd.getType()) {
case reregisterCommand:
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 82f971be79..92ec499570 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -477,6 +477,15 @@ public class TestReplicationManager {
replicationManager.sendDatanodeCommand(command, containerInfo, target);
+ // Ensure that the command deadline is set to current time
+ // + eventTimeout * factor
+ ReplicationManager.ReplicationManagerConfiguration rmConf = configuration
+ .getObject(ReplicationManager.ReplicationManagerConfiguration.class);
+ long expectedDeadline = clock.millis() +
+ Math.round(rmConf.getEventTimeout() *
+ rmConf.getCommandDeadlineFactor());
+ Assert.assertEquals(expectedDeadline, command.getDeadline());
+
List<ContainerReplicaOp> ops = containerReplicaPendingOps.getPendingOps(
containerInfo.containerID());
Mockito.verify(eventPublisher).fireEvent(any(), any());
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
new file mode 100644
index 0000000000..cfa34fc445
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test for SCMDatanodeProtocolServer command response handling.
+ */
+public class TestSCMDatanodeProtocolServer {
+
+ @Test
+ public void ensureTermAndDeadlineOnCommands()
+ throws IOException, TimeoutException {
+ OzoneStorageContainerManager scm =
+ Mockito.mock(OzoneStorageContainerManager.class);
+
+ ReplicateContainerCommand command = new ReplicateContainerCommand(1L,
+ Collections.emptyList());
+ command.setTerm(5L);
+ command.setDeadline(1234L);
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto proto =
+ SCMDatanodeProtocolServer.getCommandResponse(command, scm);
+
+ Assert.assertEquals(StorageContainerDatanodeProtocolProtos.SCMCommandProto
+ .Type.replicateContainerCommand, proto.getCommandType());
+ Assert.assertEquals(5L, proto.getTerm());
+ Assert.assertEquals(1234L, proto.getDeadlineMsSinceEpoch());
+ }
+}
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index a7d332e783..ebfa2d8637 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.common.MonotonicClock;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -37,9 +38,11 @@ import
org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
import
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.ReplicationServer;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.ozone.container.replication.ReplicationTask;
import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.jetbrains.annotations.NotNull;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
@@ -48,6 +51,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -123,8 +127,9 @@ public class ClosedContainerReplicator extends
BaseFreonGenerator implements
//if datanode is specified, replicate only container if it has a
//replica.
if (datanode.isEmpty() || datanodeUUIDs.contains(datanode)) {
- replicationTasks.add(new ReplicationTask(container.getContainerID(),
- datanodesWithContainer));
+ replicationTasks.add(new ReplicationTask(
+ new ReplicateContainerCommand(container.getContainerID(),
+ datanodesWithContainer)));
}
}
@@ -203,7 +208,11 @@ public class ClosedContainerReplicator extends
BaseFreonGenerator implements
new SimpleContainerDownloader(conf, null),
new TarContainerPacker());
- supervisor = new ReplicationSupervisor(containerSet, replicator, 10);
+ ReplicationServer.ReplicationConfig replicationConfig
+ = conf.getObject(ReplicationServer.ReplicationConfig.class);
+ supervisor = new ReplicationSupervisor(containerSet, null,
+ replicator, replicationConfig,
+ new MonotonicClock(ZoneId.systemDefault()));
}
private void replicateContainer(long counter) throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]