This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new bb71b65dc6 HDDS-7882. Delete empty containers that are stuck in CLOSING state. (#4684)
bb71b65dc6 is described below
commit bb71b65dc64e4166ac1042dcd3e133c8ff8c493d
Author: Nandakumar <[email protected]>
AuthorDate: Fri Jun 2 16:30:05 2023 +0530
HDDS-7882. Delete empty containers that are stuck in CLOSING state. (#4684)
---
.../hadoop/hdds/scm/container/ContainerInfo.java | 60 +++++++++++---
.../hdds/scm/container/TestContainerInfo.java | 95 +++++++++++++++-------
.../replication/LegacyReplicationManager.java | 40 +++++++++
.../container/replication/ReplicationManager.java | 9 +-
.../health/ClosingContainerHandler.java | 57 +++++++++++--
.../scm/container/states/ContainerStateMap.java | 20 ++---
.../scm/container/TestContainerStateManager.java | 2 -
.../scm/container/balancer/TestMoveManager.java | 9 +-
.../container/replication/ReplicationTestUtil.java | 37 ++++-----
.../health/TestClosingContainerHandler.java | 60 +++++++++++++-
.../main/java/org/apache/ozone/test/TestClock.java | 5 ++
11 files changed, 295 insertions(+), 99 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
index 3f6c2a7df9..8bcc396142 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container;
+import java.time.Clock;
import java.time.Instant;
import java.util.Comparator;
@@ -28,13 +29,12 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.Proto2Codec;
-import org.apache.hadoop.util.Time;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.base.Preconditions;
import static java.lang.Math.max;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.ratis.util.Preconditions;
/**
* Class wraps ozone container info.
@@ -53,9 +53,17 @@ public final class ContainerInfo implements
Comparable<ContainerInfo> {
}
private HddsProtos.LifeCycleState state;
+ // The wall-clock ms since the epoch at which the current state enters.
+ private Instant stateEnterTime;
+ @JsonIgnore
+ private HddsProtos.LifeCycleState previousState;
+ @JsonIgnore
+ private Instant previousStateEnterTime;
@JsonIgnore
private final PipelineID pipelineID;
private final ReplicationConfig replicationConfig;
+ @JsonIgnore
+ private final Clock clock;
/*
usedBytes is a volatile field. Writes and Reads of volatile long are atomic
and each read of a volatile will see the last write to that volatile by any
@@ -65,8 +73,6 @@ public final class ContainerInfo implements
Comparable<ContainerInfo> {
private volatile long usedBytes;
private long numberOfKeys;
private Instant lastUsed;
- // The wall-clock ms since the epoch at which the current state enters.
- private final Instant stateEnterTime;
private String owner;
// This is JsonIgnored as originally this class held a long in instead of
// a containerID object. By emitting this in Json, it changes the JSON
output.
@@ -85,7 +91,7 @@ public final class ContainerInfo implements
Comparable<ContainerInfo> {
private long sequenceId;
@SuppressWarnings("parameternumber")
- ContainerInfo(
+ private ContainerInfo(
long containerID,
HddsProtos.LifeCycleState state,
PipelineID pipelineID,
@@ -95,18 +101,20 @@ public final class ContainerInfo implements
Comparable<ContainerInfo> {
String owner,
long deleteTransactionId,
long sequenceId,
- ReplicationConfig repConfig) {
+ ReplicationConfig repConfig,
+ Clock clock) {
this.containerID = ContainerID.valueOf(containerID);
this.pipelineID = pipelineID;
this.usedBytes = usedBytes;
this.numberOfKeys = numberOfKeys;
- this.lastUsed = Instant.ofEpochMilli(Time.now());
+ this.lastUsed = clock.instant();
this.state = state;
this.stateEnterTime = Instant.ofEpochMilli(stateEnterTime);
this.owner = owner;
this.deleteTransactionId = deleteTransactionId;
this.sequenceId = sequenceId;
this.replicationConfig = repConfig;
+ this.clock = clock;
}
public static ContainerInfo fromProtobuf(HddsProtos.ContainerInfoProto info)
{
@@ -146,7 +154,11 @@ public final class ContainerInfo implements
Comparable<ContainerInfo> {
}
public void setState(HddsProtos.LifeCycleState state) {
+ previousState = this.state;
+ previousStateEnterTime = this.stateEnterTime;
+
this.state = state;
+ this.stateEnterTime = clock.instant();
}
public Instant getStateEnterTime() {
@@ -240,7 +252,7 @@ public final class ContainerInfo implements
Comparable<ContainerInfo> {
}
public void updateLastUsedTime() {
- lastUsed = Instant.ofEpochMilli(Time.now());
+ lastUsed = clock.instant();
}
@JsonIgnore
@@ -286,8 +298,8 @@ public final class ContainerInfo implements
Comparable<ContainerInfo> {
return "ContainerInfo{"
+ "id=" + containerID
+ ", state=" + state
- + ", pipelineID=" + pipelineID
+ ", stateEnterTime=" + stateEnterTime
+ + ", pipelineID=" + pipelineID
+ ", owner=" + owner
+ '}';
}
@@ -342,6 +354,20 @@ public final class ContainerInfo implements
Comparable<ContainerInfo> {
return COMPARATOR.compare(this, o);
}
+ /**
+ * Restore previous state.
+ */
+ public void revertState() {
+ if (previousState == null || previousStateEnterTime == null) {
+ throw new IllegalStateException("previous state unknown");
+ }
+
+ state = previousState;
+ stateEnterTime = previousStateEnterTime;
+ previousState = null;
+ previousStateEnterTime = null;
+ }
+
/**
* Builder class for ContainerInfo.
*/
@@ -349,7 +375,8 @@ public final class ContainerInfo implements
Comparable<ContainerInfo> {
private HddsProtos.LifeCycleState state;
private long used;
private long keys;
- private long stateEnterTime;
+ private Clock clock = Clock.systemUTC();
+ private long stateEnterTime = clock.millis();
private String owner;
private long containerID;
private long deleteTransactionId;
@@ -368,7 +395,7 @@ public final class ContainerInfo implements
Comparable<ContainerInfo> {
}
public Builder setContainerID(long id) {
- Preconditions.checkState(id >= 0);
+ Preconditions.assertTrue(id >= 0, () -> id + " < 0");
this.containerID = id;
return this;
}
@@ -408,10 +435,19 @@ public final class ContainerInfo implements
Comparable<ContainerInfo> {
return this;
}
+ /**
+ * Also resets {@code stateEnterTime}, so make sure to set clock first.
+ */
+ public Builder setClock(Clock clock) {
+ this.clock = clock;
+ this.stateEnterTime = clock.millis();
+ return this;
+ }
+
public ContainerInfo build() {
return new ContainerInfo(containerID, state, pipelineID,
used, keys, stateEnterTime, owner, deleteTransactionId,
- sequenceId, replicationConfig);
+ sequenceId, replicationConfig, clock);
}
}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
index dfc5a0c7a9..c75bdc4f4a 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
@@ -19,17 +19,22 @@ package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.junit.jupiter.api.Assertions;
+import org.apache.ozone.test.TestClock;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Tests for the ContainerInfo class.
@@ -38,49 +43,79 @@ import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.R
public class TestContainerInfo {
@Test
- public void getProtobufMessageEC() throws IOException {
- ContainerInfo container =
- createContainerInfo(RatisReplicationConfig.getInstance(THREE));
+ void getProtobufRatis() {
+ ContainerInfo container = newBuilderForTest()
+ .setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
+ .build();
+
HddsProtos.ContainerInfoProto proto = container.getProtobuf();
// No EC Config
- Assertions.assertFalse(proto.hasEcReplicationConfig());
- Assertions.assertEquals(THREE, proto.getReplicationFactor());
- Assertions.assertEquals(RATIS, proto.getReplicationType());
+ assertFalse(proto.hasEcReplicationConfig());
+ assertEquals(THREE, proto.getReplicationFactor());
+ assertEquals(RATIS, proto.getReplicationType());
// Reconstruct object from Proto
ContainerInfo recovered = ContainerInfo.fromProtobuf(proto);
- Assertions.assertEquals(RATIS, recovered.getReplicationType());
- Assertions.assertTrue(
- recovered.getReplicationConfig() instanceof RatisReplicationConfig);
+ assertEquals(RATIS, recovered.getReplicationType());
+ assertEquals(RatisReplicationConfig.class,
+ recovered.getReplicationConfig().getClass());
+ assertEquals(THREE, recovered.getReplicationFactor());
+ }
+ @Test
+ void getProtobufEC() {
// EC Config
- container = createContainerInfo(new ECReplicationConfig(3, 2));
- proto = container.getProtobuf();
+ ContainerInfo container = newBuilderForTest()
+ .setReplicationConfig(new ECReplicationConfig(3, 2))
+ .build();
- Assertions.assertEquals(3, proto.getEcReplicationConfig().getData());
- Assertions.assertEquals(2, proto.getEcReplicationConfig().getParity());
- Assertions.assertFalse(proto.hasReplicationFactor());
- Assertions.assertEquals(EC, proto.getReplicationType());
+ HddsProtos.ContainerInfoProto proto = container.getProtobuf();
+
+ assertEquals(3, proto.getEcReplicationConfig().getData());
+ assertEquals(2, proto.getEcReplicationConfig().getParity());
+ assertFalse(proto.hasReplicationFactor());
+ assertEquals(EC, proto.getReplicationType());
// Reconstruct object from Proto
- recovered = ContainerInfo.fromProtobuf(proto);
- Assertions.assertEquals(EC, recovered.getReplicationType());
- Assertions.assertTrue(
- recovered.getReplicationConfig() instanceof ECReplicationConfig);
+ ContainerInfo recovered = ContainerInfo.fromProtobuf(proto);
+ assertEquals(EC, recovered.getReplicationType());
+ assertEquals(ECReplicationConfig.class,
+ recovered.getReplicationConfig().getClass());
ECReplicationConfig config =
(ECReplicationConfig)recovered.getReplicationConfig();
- Assertions.assertEquals(3, config.getData());
- Assertions.assertEquals(2, config.getParity());
+ assertEquals(3, config.getData());
+ assertEquals(2, config.getParity());
+ }
+
+ @Test
+ void restoreState() {
+ TestClock clock = TestClock.newInstance();
+ ContainerInfo subject = newBuilderForTest()
+ .setClock(clock)
+ .build();
+
+ final HddsProtos.LifeCycleState initialState = subject.getState();
+ final Instant initialStateEnterTime = subject.getStateEnterTime();
+
+ clock.fastForward(Duration.ofMinutes(1));
+ subject.setState(CLOSING);
+
+ assertEquals(CLOSING, subject.getState());
+ assertEquals(clock.instant(), subject.getStateEnterTime());
+
+ subject.revertState();
+ assertEquals(initialState, subject.getState());
+ assertEquals(initialStateEnterTime, subject.getStateEnterTime());
+
+ assertThrows(IllegalStateException.class, subject::revertState);
}
- private ContainerInfo createContainerInfo(ReplicationConfig repConfig) {
- ContainerInfo.Builder builder = new ContainerInfo.Builder();
- builder.setContainerID(1234)
- .setReplicationConfig(repConfig)
+ public static ContainerInfo.Builder newBuilderForTest() {
+ return new ContainerInfo.Builder()
+ .setContainerID(1234)
.setPipelineID(PipelineID.randomId())
- .setState(HddsProtos.LifeCycleState.OPEN)
+ .setState(OPEN)
.setOwner("scm");
- return builder.build();
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index c8d7186d07..ef762a976c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -72,6 +72,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -385,6 +387,18 @@ public class LegacyReplicationManager {
container, replica.getDatanodeDetails(), false);
}
}
+
+ /*
+ * Empty containers in CLOSING state should be CLOSED.
+ *
+ * These are containers that are allocated in SCM but never got
+ * created on Datanodes. Since these containers don't have any
+ * replica associated with them, they are stuck in CLOSING state
+ * forever as there is no replicas to CLOSE.
+ */
+ if (replicas.isEmpty() && (container.getNumberOfKeys() == 0)) {
+ closeEmptyContainer(container);
+ }
return;
}
@@ -2235,4 +2249,30 @@ public class LegacyReplicationManager {
container.getContainerID(), ex);
}
}
+
+ private void closeEmptyContainer(ContainerInfo containerInfo) {
+ /*
+ * We should wait for sometime before moving the container to CLOSED state.
+ * This will give enough time for Datanodes to report the container,
+ * in cases where the container creation was successful on Datanodes.
+ *
+ * Should we have a separate configuration for this wait time?
+ * For now, we are using ReplicationManagerThread Interval * 5 as the wait
+ * time.
+ */
+
+ final Duration waitTime = rmConf.getInterval().multipliedBy(5);
+ final Instant closingTime = containerInfo.getStateEnterTime();
+
+ try {
+ if (clock.instant().isAfter(closingTime.plus(waitTime))) {
+ containerManager.updateContainerState(containerInfo.containerID(),
+ HddsProtos.LifeCycleEvent.CLOSE);
+ }
+ } catch (IOException | InvalidStateTransitionException |
+ TimeoutException e) {
+ LOG.error("Failed to CLOSE the container {}",
+ containerInfo.containerID(), e);
+ }
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index a29e880660..aab18b1bc5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -278,7 +278,7 @@ public class ReplicationManager implements SCMService {
// containers when they are checked by RM.
containerCheckChain = new OpenContainerHandler(this);
containerCheckChain
- .addNext(new ClosingContainerHandler(this))
+ .addNext(new ClosingContainerHandler(this, clock))
.addNext(new QuasiClosedContainerHandler(this))
.addNext(new MismatchedReplicasHandler(this))
.addNext(new EmptyContainerHandler(this))
@@ -927,10 +927,11 @@ public class ReplicationManager implements SCMService {
* interval and processes all the containers in the system.
*/
private synchronized void run() {
+ final long interval = rmConf.getInterval().toMillis();
try {
while (running) {
processAll();
- wait(rmConf.getInterval());
+ wait(interval);
}
} catch (Throwable t) {
if (t instanceof InterruptedException) {
@@ -1296,8 +1297,8 @@ public class ReplicationManager implements SCMService {
return maintenanceRemainingRedundancy;
}
- public long getInterval() {
- return interval;
+ public Duration getInterval() {
+ return Duration.ofMillis(interval);
}
public long getUnderReplicatedInterval() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosingContainerHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosingContainerHandler.java
index e9db83c5f7..27ef6032a0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosingContainerHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosingContainerHandler.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hdds.scm.container.replication.health;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -28,6 +30,10 @@ import
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+
/**
* Class used in Replication Manager to close replicas of CLOSING containers.
*/
@@ -36,9 +42,11 @@ public class ClosingContainerHandler extends AbstractCheck {
LoggerFactory.getLogger(ClosingContainerHandler.class);
private final ReplicationManager replicationManager;
+ private final Clock clock;
- public ClosingContainerHandler(ReplicationManager replicationManager) {
- this.replicationManager = replicationManager;
+ public ClosingContainerHandler(ReplicationManager rm, Clock clock) {
+ replicationManager = rm;
+ this.clock = clock;
}
/**
@@ -53,16 +61,16 @@ public class ClosingContainerHandler extends AbstractCheck {
public boolean handle(ContainerCheckRequest request) {
ContainerInfo containerInfo = request.getContainerInfo();
- if (containerInfo.getState() != HddsProtos.LifeCycleState.CLOSING) {
+ if (containerInfo.getState() != LifeCycleState.CLOSING) {
return false;
}
LOG.debug("Checking container {} in ClosingContainerHandler",
containerInfo);
- boolean forceClose = request.getContainerInfo().getReplicationConfig()
- .getReplicationType() != HddsProtos.ReplicationType.RATIS;
+ boolean forceClose = containerInfo.getReplicationConfig()
+ .getReplicationType() != ReplicationType.RATIS;
- if (request.getContainerReplicas().size() == 0) {
+ if (request.getContainerReplicas().isEmpty()) {
request.getReport().incrementAndSample(
ReplicationManagerReport.HealthState.MISSING,
containerInfo.containerID());
@@ -74,6 +82,41 @@ public class ClosingContainerHandler extends AbstractCheck {
containerInfo, replica.getDatanodeDetails(), forceClose);
}
}
+
+ /*
+ * Empty containers in CLOSING state should be CLOSED.
+ *
+ * These are containers that are allocated in SCM but never got created
+ * on Datanodes. Since these containers don't have any replica associated
+ * with them, they are stuck in CLOSING state forever as there is no
+ * replicas to CLOSE.
+ *
+ * We should wait for sometime before moving the container to CLOSED state.
+ * This will give enough time for Datanodes to report the container,
+ * in cases where the container creation was successful on Datanodes.
+ *
+ * Should we have a separate configuration for this wait time?
+ * For now, we are using ReplicationManagerThread Interval * 5 as the wait
+ * time.
+ */
+ if (request.getContainerReplicas().isEmpty() &&
+ containerInfo.getNumberOfKeys() == 0 &&
+ hasWaitTimeElapsed(containerInfo)) {
+
+ LOG.debug("Container appears to be empty, has no replicas, and has been "
+ + "closing, so moving to closed state: {}", containerInfo);
+
+ replicationManager.updateContainerState(
+ containerInfo.containerID(), LifeCycleEvent.CLOSE);
+ }
+
return true;
}
+
+ private boolean hasWaitTimeElapsed(ContainerInfo containerInfo) {
+ Duration waitTime = replicationManager.getConfig().getInterval()
+ .multipliedBy(5);
+ Instant closingTime = containerInfo.getStateEnterTime();
+ return clock.instant().isAfter(closingTime.plus(waitTime));
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index d7a434b9d8..2d74160fc4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -168,9 +168,6 @@ public class ContainerStateMap {
/**
* Returns the latest list of DataNodes where replica for given containerId
* exist.
- *
- * @param containerID
- * @return Set<DatanodeDetails>
*/
public Set<ContainerReplica> getContainerReplicas(
final ContainerID containerID) {
@@ -182,9 +179,6 @@ public class ContainerStateMap {
* Adds given datanodes as nodes where replica for given containerId exist.
* Logs a debug entry if a datanode is already added as replica for given
* ContainerId.
- *
- * @param containerID
- * @param replica
*/
public void updateContainerReplica(final ContainerID containerID,
final ContainerReplica replica) {
@@ -199,10 +193,6 @@ public class ContainerStateMap {
/**
* Remove a container Replica for given DataNode.
- *
- * @param containerID
- * @param replica
- * @return True of dataNode is removed successfully else false.
*/
public void removeContainerReplica(final ContainerID containerID,
final ContainerReplica replica) {
@@ -291,7 +281,7 @@ public class ContainerStateMap {
"old state. Old = {}, Attempted state = {}", currentState,
newState);
- currentInfo.setState(currentState);
+ currentInfo.revertState();
// if this line throws, the state map can be in an inconsistent
// state, since we will have modified the attribute by the
@@ -444,16 +434,16 @@ public class ContainerStateMap {
* Sorts a list of Sets based on Size. This is useful when we are
* intersecting the sets.
*
- * @param sets - varagrs of sets
+ * @param sets - varargs of sets
* @return Returns a sorted array of sets based on the size of the set.
*/
- @SuppressWarnings("unchecked")
- private NavigableSet<ContainerID>[] sortBySize(
+ @SafeVarargs
+ private final NavigableSet<ContainerID>[] sortBySize(
final NavigableSet<ContainerID>... sets) {
for (int x = 0; x < sets.length - 1; x++) {
for (int y = 0; y < sets.length - x - 1; y++) {
if (sets[y].size() > sets[y + 1].size()) {
- final NavigableSet temp = sets[y];
+ final NavigableSet<ContainerID> temp = sets[y];
sets[y] = sets[y + 1];
sets[y + 1] = temp;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
index 0fbba14606..f7f2714119 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
-import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -172,7 +171,6 @@ public class TestContainerStateManager {
.setPipelineID(pipeline.getId())
.setUsedBytes(0)
.setNumberOfKeys(0)
- .setStateEnterTime(Time.now())
.setOwner("root")
.setContainerID(1)
.setDeleteTransactionId(0)
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java
index b97e2b4c63..35ca373dce 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java
@@ -42,8 +42,6 @@ import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;
-import java.time.Instant;
-import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -82,7 +80,7 @@ public class TestMoveManager {
@Before
public void setup() throws ContainerNotFoundException,
NodeNotFoundException {
- clock = new TestClock(Instant.now(), ZoneId.systemDefault());
+ clock = TestClock.newInstance();
containerInfo = ReplicationTestUtil.createContainerInfo(
RatisReplicationConfig.getInstance(THREE), 1,
HddsProtos.LifeCycleState.CLOSED);
@@ -278,8 +276,9 @@ public class TestMoveManager {
containerInfo.containerID());
// check for an under replicated EC container
- containerInfo = ReplicationTestUtil.createContainer(
- HddsProtos.LifeCycleState.CLOSED, new ECReplicationConfig(3, 2));
+ containerInfo = ReplicationTestUtil.createContainerInfo(
+ new ECReplicationConfig(3, 2), 1,
+ HddsProtos.LifeCycleState.CLOSED);
replicas.clear();
replicas.addAll(ReplicationTestUtil.createReplicas(
containerInfo.containerID(), ContainerReplicaProto.State.CLOSED,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index a0a6fe6f7c..d0516dd505 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -29,10 +29,10 @@ import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.TestContainerInfo;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -189,34 +189,31 @@ public final class ReplicationTestUtil {
public static ContainerInfo createContainerInfo(
ReplicationConfig replicationConfig, long containerID,
HddsProtos.LifeCycleState containerState) {
- ContainerInfo.Builder builder = new ContainerInfo.Builder();
- builder.setContainerID(containerID);
- builder.setOwner("Ozone");
- builder.setPipelineID(PipelineID.randomId());
- builder.setReplicationConfig(replicationConfig);
- builder.setState(containerState);
- return builder.build();
+ return TestContainerInfo.newBuilderForTest()
+ .setContainerID(containerID)
+ .setReplicationConfig(replicationConfig)
+ .setState(containerState)
+ .build();
}
public static ContainerInfo createContainerInfo(
ReplicationConfig replicationConfig, long containerID,
HddsProtos.LifeCycleState containerState, long keyCount, long bytesUsed)
{
- ContainerInfo.Builder builder = new ContainerInfo.Builder();
- builder.setContainerID(containerID);
- builder.setOwner("Ozone");
- builder.setPipelineID(PipelineID.randomId());
- builder.setReplicationConfig(replicationConfig);
- builder.setState(containerState);
- builder.setNumberOfKeys(keyCount);
- builder.setUsedBytes(bytesUsed);
- return builder.build();
+ return TestContainerInfo.newBuilderForTest()
+ .setContainerID(containerID)
+ .setReplicationConfig(replicationConfig)
+ .setState(containerState)
+ .setNumberOfKeys(keyCount)
+ .setUsedBytes(bytesUsed)
+ .build();
}
public static ContainerInfo createContainer(HddsProtos.LifeCycleState state,
ReplicationConfig replicationConfig) {
- return new ContainerInfo.Builder()
- .setContainerID(1).setState(state)
- .setReplicationConfig(replicationConfig).build();
+ return TestContainerInfo.newBuilderForTest()
+ .setState(state)
+ .setReplicationConfig(replicationConfig)
+ .build();
}
@SafeVarargs
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosingContainerHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosingContainerHandler.java
index 4b8219d5ac..b4f5a18bb9 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosingContainerHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosingContainerHandler.java
@@ -21,15 +21,18 @@ package
org.apache.hadoop.hdds.scm.container.replication.health;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.TestContainerInfo;
import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil;
+import org.apache.ozone.test.TestClock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -38,31 +41,42 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+import static org.mockito.Mockito.never;
/**
* Tests for {@link ClosingContainerHandler}.
*/
public class TestClosingContainerHandler {
+ private final ReplicationManager.ReplicationManagerConfiguration rmConf =
+ new OzoneConfiguration()
+ .getObject(ReplicationManager.ReplicationManagerConfiguration.class);
+
private ReplicationManager replicationManager;
- private ClosingContainerHandler closingContainerHandler;
+ private ClosingContainerHandler subject;
private static final ECReplicationConfig EC_REPLICATION_CONFIG =
new ECReplicationConfig(3, 2);
private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG =
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+ private final TestClock clock = TestClock.newInstance();
+
@BeforeEach
public void setup() {
replicationManager = Mockito.mock(ReplicationManager.class);
- closingContainerHandler = new ClosingContainerHandler(replicationManager);
+ Mockito.doReturn(rmConf).when(replicationManager)
+ .getConfig();
+ subject = new ClosingContainerHandler(replicationManager, clock);
}
private static Stream<ReplicationConfig> replicationConfigs() {
@@ -210,6 +224,44 @@ public class TestClosingContainerHandler {
});
}
+ @Test
+ public void testEmptyContainerInClosingState() {
+
+ /*
+ * An empty container stuck in CLOSING should be moved to CLOSED after
+ * a timeout of 5 x the ReplicationManager interval (driven by TestClock).
+ */
+ Duration replicationInterval = Duration.ofSeconds(1);
+ rmConf.setInterval(replicationInterval);
+
+ ContainerInfo containerInfo = TestContainerInfo.newBuilderForTest()
+ .setReplicationConfig(RATIS_REPLICATION_CONFIG)
+ .setContainerID(1)
+ .setState(CLOSING)
+ .setClock(clock)
+ .build();
+ ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+ .setPendingOps(Collections.emptyList())
+ .setReport(new ReplicationManagerReport())
+ .setContainerInfo(containerInfo)
+ .setContainerReplicas(new HashSet<>())
+ .build();
+
+ // elapsed = 3x interval + 1ms: below the 5x timeout, so no CLOSE yet
+ Duration notEnoughTime = replicationInterval.multipliedBy(3).plusMillis(1);
+ clock.fastForward(notEnoughTime);
+ assertAndVerify(request, true, 0);
+ Mockito.verify(replicationManager, never())
+ .updateContainerState(containerInfo.containerID(), CLOSE);
+
+ // elapsed = 5x interval + 1ms in total: timeout passed, CLOSE sent once
+ Duration moreTime = replicationInterval.multipliedBy(2);
+ clock.fastForward(moreTime);
+ assertAndVerify(request, true, 0);
+ Mockito.verify(replicationManager, Mockito.times(1))
+ .updateContainerState(containerInfo.containerID(), CLOSE);
+ }
+
/**
* Close commands should be sent for Open or Closing replicas.
*/
@@ -254,7 +306,7 @@ public class TestClosingContainerHandler {
ArgumentCaptor<Boolean> forceCaptor =
ArgumentCaptor.forClass(Boolean.class);
- Assertions.assertTrue(closingContainerHandler.handle(request));
+ Assertions.assertTrue(subject.handle(request));
Mockito.verify(replicationManager, Mockito.times(replicas))
.sendCloseContainerReplicaCommand(Mockito.any(ContainerInfo.class),
Mockito.any(DatanodeDetails.class), forceCaptor.capture());
@@ -264,7 +316,7 @@ public class TestClosingContainerHandler {
private void assertAndVerify(ContainerCheckRequest request,
boolean assertion, int times) {
- Assertions.assertEquals(assertion,
closingContainerHandler.handle(request));
+ Assertions.assertEquals(assertion, subject.handle(request));
Mockito.verify(replicationManager, Mockito.times(times))
.sendCloseContainerReplicaCommand(Mockito.any(ContainerInfo.class),
Mockito.any(DatanodeDetails.class), Mockito.anyBoolean());
diff --git
a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java
b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java
index 8cf0c7e548..483ef92fc8 100644
--- a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java
+++ b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java
@@ -21,6 +21,7 @@ package org.apache.ozone.test;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
+import java.time.ZoneOffset;
import java.time.temporal.TemporalAmount;
/**
@@ -33,6 +34,10 @@ public class TestClock extends Clock {
private Instant instant;
private final ZoneId zoneId;
+ public static TestClock newInstance() {
+ return new TestClock(Clock.systemUTC().instant(), ZoneOffset.UTC);
+ }
+
public TestClock(Instant instant, ZoneId zone) {
this.instant = instant;
this.zoneId = zone;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]