This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 9680edf HDDS-6384. EC: Ensure EC container usage is updated correctly
when handling reports (#3147)
9680edf is described below
commit 9680edf850b88aa4598d6feb455618d0d3d31db7
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Fri Mar 4 19:30:56 2022 +0000
HDDS-6384. EC: Ensure EC container usage is updated correctly when handling
reports (#3147)
---
.../hadoop/hdds/scm/container/ContainerInfo.java | 22 +++
.../container/AbstractContainerReportHandler.java | 86 +++++++---
.../org/apache/hadoop/hdds/scm/HddsTestUtils.java | 10 ++
.../scm/container/TestContainerReportHandler.java | 181 ++++++++++++++++++++-
4 files changed, 276 insertions(+), 23 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
index 7715341..1e2264b 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
@@ -168,6 +168,28 @@ public class ContainerInfo implements
Comparator<ContainerInfo>,
return pipelineID;
}
+ /**
+ * Returns the usedBytes for the container. The value returned is derived
+ * from the replicas reported from the datanodes for the container.
+ *
+ * The size of a container can change over time. An open container is
+ * generally still growing, and the value will be the minimum of the values
+ * reported from its replicas.
+ *
+ * A closed container can only reduce in size as its blocks are removed. For
+ * a closed container, the value will be the maximum of the values reported
+ * from its replicas.
+ *
+ * An EC container is made from a group of data and parity containers where
+ * the first data and all parity containers should be the same size. The other
+ * data containers can be smaller or the same size. When calculating the EC
+ * container size, we use the min / max of the first data and parity
+ * containers, ignoring the others. For EC containers, this value actually
+ * represents the size of the largest container in the container group, rather
+ * than the total space used by all containers in the group.
+ *
+ * @return bytes used in the container.
+ */
public long getUsedBytes() {
return usedBytes;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index b40fd73..b46bc79 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.container;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
@@ -121,30 +122,77 @@ public class AbstractContainerReportHandler {
containerInfo.updateSequenceId(
replicaProto.getBlockCommitSequenceId());
}
+ if (containerInfo.getReplicationConfig().getReplicationType()
+ == HddsProtos.ReplicationType.EC) {
+ updateECContainerStats(containerInfo, replicaProto, datanodeDetails);
+ } else {
+ updateRatisContainerStats(containerInfo, replicaProto,
datanodeDetails);
+ }
+ }
+ }
+
+ private void updateRatisContainerStats(ContainerInfo containerInfo,
+ ContainerReplicaProto newReplica, DatanodeDetails newSource)
+ throws ContainerNotFoundException {
+ List<ContainerReplica> otherReplicas =
+ getOtherReplicas(containerInfo.containerID(), newSource);
+ long usedBytes = newReplica.getUsed();
+ long keyCount = newReplica.getKeyCount();
+ for (ContainerReplica r : otherReplicas) {
+ usedBytes = calculateUsage(containerInfo, usedBytes, r.getBytesUsed());
+ keyCount = calculateUsage(containerInfo, keyCount, r.getKeyCount());
+ }
+ updateContainerUsedAndKeys(containerInfo, usedBytes, keyCount);
+ }
+
+ private void updateECContainerStats(ContainerInfo containerInfo,
+ ContainerReplicaProto newReplica, DatanodeDetails newSource)
+ throws ContainerNotFoundException {
+ int dataNum =
+ ((ECReplicationConfig)containerInfo.getReplicationConfig()).getData();
+ // The first EC index and the parity indexes must all be the same size
+ // while the other data indexes may be smaller due to partial stripes.
+ // When calculating the stats, we only use the first data and parity and
+ // ignore the others. We only need to run the check if we are processing
+ // the first data or parity replicas.
+ if (newReplica.getReplicaIndex() == 1
+ || newReplica.getReplicaIndex() > dataNum) {
List<ContainerReplica> otherReplicas =
- getOtherReplicas(containerId, datanodeDetails);
- long usedBytes = replicaProto.getUsed();
- long keyCount = replicaProto.getKeyCount();
+ getOtherReplicas(containerInfo.containerID(), newSource);
+ long usedBytes = newReplica.getUsed();
+ long keyCount = newReplica.getKeyCount();
for (ContainerReplica r : otherReplicas) {
- // Open containers are generally growing in key count and size, the
- // overall size should be the min of all reported replicas.
- if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
- usedBytes = Math.min(usedBytes, r.getBytesUsed());
- keyCount = Math.min(keyCount, r.getKeyCount());
- } else {
- // Containers which are not open can only shrink in size, so use the
- // largest values reported.
- usedBytes = Math.max(usedBytes, r.getBytesUsed());
- keyCount = Math.max(keyCount, r.getKeyCount());
+ if (r.getReplicaIndex() > 1 && r.getReplicaIndex() <= dataNum) {
+ // Ignore all data replicas except the first for stats
+ continue;
}
+ usedBytes = calculateUsage(containerInfo, usedBytes, r.getBytesUsed());
+ keyCount = calculateUsage(containerInfo, keyCount, r.getKeyCount());
}
+ updateContainerUsedAndKeys(containerInfo, usedBytes, keyCount);
+ }
+ }
- if (containerInfo.getUsedBytes() != usedBytes) {
- containerInfo.setUsedBytes(usedBytes);
- }
- if (containerInfo.getNumberOfKeys() != keyCount) {
- containerInfo.setNumberOfKeys(keyCount);
- }
+ private long calculateUsage(ContainerInfo containerInfo, long lastValue,
+ long thisValue) {
+ if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
+ // Open containers are generally growing in key count and size, the
+ // overall size should be the min of all reported replicas.
+ return Math.min(lastValue, thisValue);
+ } else {
+ // Containers which are not open can only shrink in size, so use the
+ // largest values reported.
+ return Math.max(lastValue, thisValue);
+ }
+ }
+
+ private void updateContainerUsedAndKeys(ContainerInfo containerInfo,
+ long usedBytes, long keyCount) {
+ if (containerInfo.getUsedBytes() != usedBytes) {
+ containerInfo.setUsedBytes(usedBytes);
+ }
+ if (containerInfo.getNumberOfKeys() != keyCount) {
+ containerInfo.setNumberOfKeys(keyCount);
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
index feb58fc..fb45095 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
@@ -649,6 +650,15 @@ public final class HddsTestUtils {
.build();
}
+ public static ContainerInfo getECContainer(
+ final HddsProtos.LifeCycleState state, PipelineID pipelineID,
+ ECReplicationConfig replicationConfig) {
+ return getDefaultContainerInfoBuilder(state)
+ .setReplicationConfig(replicationConfig)
+ .setPipelineID(pipelineID)
+ .build();
+ }
+
public static Set<ContainerReplica> getReplicas(
final ContainerID containerId,
final ContainerReplicaProto.State state,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index b4cbf27..db86a54 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -53,13 +54,16 @@ import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static junit.framework.TestCase.assertEquals;
+import static org.apache.hadoop.hdds.scm.HddsTestUtils.getECContainer;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
@@ -632,6 +636,158 @@ public class TestContainerReportHandler {
}
@Test
+ public void openECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
+ throws IOException {
+ final ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+ final ContainerReportHandler reportHandler = new ContainerReportHandler(
+ nodeManager, containerManager);
+
+ Pipeline pipeline = pipelineManager.createPipeline(repConfig);
+ Map<Integer, DatanodeDetails> dns = new HashMap<>();
+ final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+ NodeStatus.inServiceHealthy()).iterator();
+ for (int i = 1; i <= repConfig.getRequiredNodes(); i++) {
+ dns.put(i, nodeIterator.next());
+ }
+ final ContainerReplicaProto.State replicaState
+ = ContainerReplicaProto.State.OPEN;
+ final ContainerInfo containerOne =
+ getECContainer(LifeCycleState.OPEN, pipeline.getId(), repConfig);
+
+ containerStateManager.addContainer(containerOne.getProtobuf());
+ // Container loaded, no replicas reported from DNs. Expect zeros for
+ // usage values.
+ assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+
+ // Report from data index 2 - should not update stats
+ reportHandler.onMessage(getContainerReportFromDatanode(
+ containerOne.containerID(), replicaState,
+ dns.get(2), 50L, 60L, 2), publisher);
+ assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+
+ // Report from replica 1, it should update
+ reportHandler.onMessage(getContainerReportFromDatanode(
+ containerOne.containerID(), replicaState,
+ dns.get(1), 50L, 60L, 1), publisher);
+ assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+
+ // Parity 1 reports a greater value, but as the container is open the stats
+ // should be the min value.
+ // Report from replica 4, it should not update
+ reportHandler.onMessage(getContainerReportFromDatanode(
+ containerOne.containerID(), replicaState,
+ dns.get(4), 80L, 90L, 4), publisher);
+ assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+
+ // Parity 2 reports a lesser value, so the stored values should update
+ reportHandler.onMessage(getContainerReportFromDatanode(
+ containerOne.containerID(), replicaState,
+ dns.get(5), 40, 30, 5), publisher);
+ assertEquals(40L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(30L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+
+ // Report from data index 2 - should not update stats even though it has
+ // lesser values
+ reportHandler.onMessage(getContainerReportFromDatanode(
+ containerOne.containerID(), replicaState,
+ dns.get(2), 10L, 10L, 2), publisher);
+ assertEquals(40L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(30L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+ }
+
+ @Test
+ public void closedECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
+ throws IOException {
+ final ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+ final ContainerReportHandler reportHandler = new ContainerReportHandler(
+ nodeManager, containerManager);
+
+ Pipeline pipeline = pipelineManager.createPipeline(repConfig);
+ Map<Integer, DatanodeDetails> dns = new HashMap<>();
+ final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+ NodeStatus.inServiceHealthy()).iterator();
+ for (int i = 1; i <= repConfig.getRequiredNodes(); i++) {
+ dns.put(i, nodeIterator.next());
+ }
+ final ContainerReplicaProto.State replicaState
+ = ContainerReplicaProto.State.OPEN;
+ final ContainerInfo containerOne =
+ getECContainer(LifeCycleState.CLOSED, pipeline.getId(), repConfig);
+
+ containerStateManager.addContainer(containerOne.getProtobuf());
+ // Container loaded, no replicas reported from DNs. Expect zeros for
+ // usage values.
+ assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+
+ // Report from data index 2 - should not update stats
+ reportHandler.onMessage(getContainerReportFromDatanode(
+ containerOne.containerID(), replicaState,
+ dns.get(2), 50L, 60L, 2), publisher);
+ assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+
+ // Report from replica 1, it should update
+ reportHandler.onMessage(getContainerReportFromDatanode(
+ containerOne.containerID(), replicaState,
+ dns.get(1), 50L, 60L, 1), publisher);
+ assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+
+ // Parity 1 reports a greater value, as the container is closed the stats
+ // should be the max value.
+ // Report from replica 4, it should update
+ reportHandler.onMessage(getContainerReportFromDatanode(
+ containerOne.containerID(), replicaState,
+ dns.get(4), 80L, 90L, 4), publisher);
+ assertEquals(80L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(90L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+
+ // Parity 2 reports a lesser value, so the stored values should not update
+ reportHandler.onMessage(getContainerReportFromDatanode(
+ containerOne.containerID(), replicaState,
+ dns.get(5), 40, 30, 5), publisher);
+ assertEquals(80L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(90L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+
+ // Report from data index 2 - should not update stats even though it has
+ // greater values
+ reportHandler.onMessage(getContainerReportFromDatanode(
+ containerOne.containerID(), replicaState,
+ dns.get(2), 110L, 120L, 2), publisher);
+ assertEquals(80L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(90L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
+ }
+
+ @Test
public void testStaleReplicaOfDeletedContainer() throws
NodeNotFoundException,
IOException {
@@ -653,7 +809,7 @@ public class TestContainerReportHandler {
// Expects the replica will be deleted.
final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
- datanodeOne.getUuidString());
+ datanodeOne.getUuidString(), 0);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
@@ -668,8 +824,16 @@ public class TestContainerReportHandler {
private ContainerReportFromDatanode getContainerReportFromDatanode(
ContainerID containerId, ContainerReplicaProto.State state,
DatanodeDetails dn, long bytesUsed, long keyCount) {
+ return getContainerReportFromDatanode(containerId, state, dn, bytesUsed,
+ keyCount, 0);
+ }
+
+ private ContainerReportFromDatanode getContainerReportFromDatanode(
+ ContainerID containerId, ContainerReplicaProto.State state,
+ DatanodeDetails dn, long bytesUsed, long keyCount, int replicaIndex) {
ContainerReportsProto containerReport = getContainerReportsProto(
- containerId, state, dn.getUuidString(), bytesUsed, keyCount);
+ containerId, state, dn.getUuidString(), bytesUsed, keyCount,
+ replicaIndex);
return new ContainerReportFromDatanode(dn, containerReport);
}
@@ -678,12 +842,20 @@ public class TestContainerReportHandler {
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId) {
return getContainerReportsProto(containerId, state, originNodeId,
- 2000000000L, 100000000L);
+ 2000000000L, 100000000L, 0);
+ }
+
+ protected static ContainerReportsProto getContainerReportsProto(
+ final ContainerID containerId, final ContainerReplicaProto.State state,
+ final String originNodeId, int replicaIndex) {
+ return getContainerReportsProto(containerId, state, originNodeId,
+ 2000000000L, 100000000L, replicaIndex);
}
protected static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
- final String originNodeId, final long usedBytes, final long keyCount) {
+ final String originNodeId, final long usedBytes, final long keyCount,
+ final int replicaIndex) {
final ContainerReportsProto.Builder crBuilder =
ContainerReportsProto.newBuilder();
final ContainerReplicaProto replicaProto =
@@ -701,6 +873,7 @@ public class TestContainerReportHandler {
.setWriteBytes(2000000000L)
.setBlockCommitSequenceId(10000L)
.setDeleteTransactionId(0)
+ .setReplicaIndex(replicaIndex)
.build();
return crBuilder.addReports(replicaProto).build();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]