This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 9680edf  HDDS-6384. EC: Ensure EC container usage is updated correctly 
when handling reports (#3147)
9680edf is described below

commit 9680edf850b88aa4598d6feb455618d0d3d31db7
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Fri Mar 4 19:30:56 2022 +0000

    HDDS-6384. EC: Ensure EC container usage is updated correctly when handling 
reports (#3147)
---
 .../hadoop/hdds/scm/container/ContainerInfo.java   |  22 +++
 .../container/AbstractContainerReportHandler.java  |  86 +++++++---
 .../org/apache/hadoop/hdds/scm/HddsTestUtils.java  |  10 ++
 .../scm/container/TestContainerReportHandler.java  | 181 ++++++++++++++++++++-
 4 files changed, 276 insertions(+), 23 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
index 7715341..1e2264b 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
@@ -168,6 +168,28 @@ public class ContainerInfo implements 
Comparator<ContainerInfo>,
     return pipelineID;
   }
 
+  /**
+   * Returns the usedBytes for the container. The value returned is derived
+   * from the replicas reported from the datanodes for the container.
+   *
+   * The size of a container can change over time. For an open container we
+   * assume the size of the container will grow, and hence the value will be
+   * the minimum of the values reported from its replicas.
+   *
+   * A container that is not open can only reduce in size as its blocks are
+   * removed. For such a container, the value will be the maximum of the
+   * values reported from its replicas.
+   *
+   * An EC container is made from a group of data and parity containers where
+   * the first data and all parity containers should be the same size. The other
+   * data containers can be smaller or the same size. When calculating the EC
+   * container size, we use the min / max of the first data and parity
+   * containers, ignoring the others. For EC containers, this value actually
+   * represents the size of the largest container in the container group, rather
+   * than the total space used by all containers in the group.
+   *
+   * @return bytes used in the container.
+   */
   public long getUsedBytes() {
     return usedBytes;
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index b40fd73..b46bc79 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
@@ -121,30 +122,77 @@ public class AbstractContainerReportHandler {
         containerInfo.updateSequenceId(
             replicaProto.getBlockCommitSequenceId());
       }
+      if (containerInfo.getReplicationConfig().getReplicationType()
+          == HddsProtos.ReplicationType.EC) {
+        updateECContainerStats(containerInfo, replicaProto, datanodeDetails);
+      } else {
+        updateRatisContainerStats(containerInfo, replicaProto, 
datanodeDetails);
+      }
+    }
+  }
+
+  private void updateRatisContainerStats(ContainerInfo containerInfo,
+      ContainerReplicaProto newReplica, DatanodeDetails newSource)
+      throws ContainerNotFoundException {
+    List<ContainerReplica> otherReplicas =
+        getOtherReplicas(containerInfo.containerID(), newSource);
+    long usedBytes = newReplica.getUsed();
+    long keyCount = newReplica.getKeyCount();
+    for (ContainerReplica r : otherReplicas) {
+      usedBytes = calculateUsage(containerInfo, usedBytes, r.getBytesUsed());
+      keyCount = calculateUsage(containerInfo, keyCount, r.getKeyCount());
+    }
+    updateContainerUsedAndKeys(containerInfo, usedBytes, keyCount);
+  }
+
+  private void updateECContainerStats(ContainerInfo containerInfo,
+      ContainerReplicaProto newReplica, DatanodeDetails newSource)
+      throws ContainerNotFoundException {
+    int dataNum =
+        ((ECReplicationConfig)containerInfo.getReplicationConfig()).getData();
+    // The first EC index and the parity indexes must all be the same size
+    // while the other data indexes may be smaller due to partial stripes.
+    // When calculating the stats, we only use the first data and parity and
+    // ignore the others. We only need to run the check if we are processing
+    // the first data or parity replicas.
+    if (newReplica.getReplicaIndex() == 1
+        || newReplica.getReplicaIndex() > dataNum) {
       List<ContainerReplica> otherReplicas =
-          getOtherReplicas(containerId, datanodeDetails);
-      long usedBytes = replicaProto.getUsed();
-      long keyCount = replicaProto.getKeyCount();
+          getOtherReplicas(containerInfo.containerID(), newSource);
+      long usedBytes = newReplica.getUsed();
+      long keyCount = newReplica.getKeyCount();
       for (ContainerReplica r : otherReplicas) {
-        // Open containers are generally growing in key count and size, the
-        // overall size should be the min of all reported replicas.
-        if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
-          usedBytes = Math.min(usedBytes, r.getBytesUsed());
-          keyCount = Math.min(keyCount, r.getKeyCount());
-        } else {
-          // Containers which are not open can only shrink in size, so use the
-          // largest values reported.
-          usedBytes = Math.max(usedBytes, r.getBytesUsed());
-          keyCount = Math.max(keyCount, r.getKeyCount());
+        if (r.getReplicaIndex() > 1 && r.getReplicaIndex() <= dataNum) {
+          // Ignore all data replicas except the first for stats
+          continue;
         }
+        usedBytes = calculateUsage(containerInfo, usedBytes, r.getBytesUsed());
+        keyCount = calculateUsage(containerInfo, keyCount, r.getKeyCount());
       }
+      updateContainerUsedAndKeys(containerInfo, usedBytes, keyCount);
+    }
+  }
 
-      if (containerInfo.getUsedBytes() != usedBytes) {
-        containerInfo.setUsedBytes(usedBytes);
-      }
-      if (containerInfo.getNumberOfKeys() != keyCount) {
-        containerInfo.setNumberOfKeys(keyCount);
-      }
+  private long calculateUsage(ContainerInfo containerInfo, long lastValue,
+      long thisValue) {
+    if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
+      // Open containers are generally growing in key count and size, the
+      // overall size should be the min of all reported replicas.
+      return Math.min(lastValue, thisValue);
+    } else {
+      // Containers which are not open can only shrink in size, so use the
+      // largest values reported.
+      return Math.max(lastValue, thisValue);
+    }
+  }
+
+  private void updateContainerUsedAndKeys(ContainerInfo containerInfo,
+      long usedBytes, long keyCount) {
+    if (containerInfo.getUsedBytes() != usedBytes) {
+      containerInfo.setUsedBytes(usedBytes);
+    }
+    if (containerInfo.getNumberOfKeys() != keyCount) {
+      containerInfo.setNumberOfKeys(keyCount);
     }
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
index feb58fc..fb45095 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
@@ -649,6 +650,15 @@ public final class HddsTestUtils {
         .build();
   }
 
+  public static ContainerInfo getECContainer(
+      final HddsProtos.LifeCycleState state, PipelineID pipelineID,
+      ECReplicationConfig replicationConfig) {
+    return getDefaultContainerInfoBuilder(state)
+        .setReplicationConfig(replicationConfig)
+        .setPipelineID(pipelineID)
+        .build();
+  }
+
   public static Set<ContainerReplica> getReplicas(
       final ContainerID containerId,
       final ContainerReplicaProto.State state,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index b4cbf27..db86a54 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -53,13 +54,16 @@ import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static junit.framework.TestCase.assertEquals;
+import static org.apache.hadoop.hdds.scm.HddsTestUtils.getECContainer;
 import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
 import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
 
@@ -632,6 +636,158 @@ public class TestContainerReportHandler {
   }
 
   @Test
+  public void openECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
+      throws IOException {
+    final ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+    final ContainerReportHandler reportHandler = new ContainerReportHandler(
+        nodeManager, containerManager);
+
+    Pipeline pipeline = pipelineManager.createPipeline(repConfig);
+    Map<Integer, DatanodeDetails> dns = new HashMap<>();
+    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+        NodeStatus.inServiceHealthy()).iterator();
+    for (int i = 1; i <= repConfig.getRequiredNodes(); i++) {
+      dns.put(i, nodeIterator.next());
+    }
+    final ContainerReplicaProto.State replicaState
+        = ContainerReplicaProto.State.OPEN;
+    final ContainerInfo containerOne =
+        getECContainer(LifeCycleState.OPEN, pipeline.getId(), repConfig);
+
+    containerStateManager.addContainer(containerOne.getProtobuf());
+    // Container loaded, no replicas reported from DNs. Expect zeros for
+    // usage values.
+    assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+
+    // Report from data index 2 - should not update stats
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        dns.get(2), 50L, 60L, 2), publisher);
+    assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+
+    // Report from replica 1, it should update
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        dns.get(1), 50L, 60L, 1), publisher);
+    assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+
+    // Parity 1 reports a greater value, but as the container is open the
+    // stats should be the min value.
+    // Report from replica 4 (parity 1), the stored values should not change
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        dns.get(4), 80L, 90L, 4), publisher);
+    assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+
+    // Parity 2 reports a lesser value, so the stored values should update
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        dns.get(5), 40, 30, 5), publisher);
+    assertEquals(40L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(30L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+
+    // Report from data index 2 - should not update stats even though it has
+    // lesser values
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        dns.get(2), 10L, 10L, 2), publisher);
+    assertEquals(40L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(30L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+  }
+
+  @Test
+  public void closedECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
+      throws IOException {
+    final ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+    final ContainerReportHandler reportHandler = new ContainerReportHandler(
+        nodeManager, containerManager);
+
+    Pipeline pipeline = pipelineManager.createPipeline(repConfig);
+    Map<Integer, DatanodeDetails> dns = new HashMap<>();
+    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+        NodeStatus.inServiceHealthy()).iterator();
+    for (int i = 1; i <= repConfig.getRequiredNodes(); i++) {
+      dns.put(i, nodeIterator.next());
+    }
+    final ContainerReplicaProto.State replicaState
+        = ContainerReplicaProto.State.OPEN;
+    final ContainerInfo containerOne =
+        getECContainer(LifeCycleState.CLOSED, pipeline.getId(), repConfig);
+
+    containerStateManager.addContainer(containerOne.getProtobuf());
+    // Container loaded, no replicas reported from DNs. Expect zeros for
+    // usage values.
+    assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+
+    // Report from data index 2 - should not update stats
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        dns.get(2), 50L, 60L, 2), publisher);
+    assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+
+    // Report from replica 1, it should update
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        dns.get(1), 50L, 60L, 1), publisher);
+    assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+
+    // Parity 1 reports a greater value, as the container is closed the stats
+    // should be the max value.
+    // Report from replica 4 (parity 1), it should update
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        dns.get(4), 80L, 90L, 4), publisher);
+    assertEquals(80L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(90L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+
+    // Parity 2 reports a lesser value, so the stored values should not update
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        dns.get(5), 40, 30, 5), publisher);
+    assertEquals(80L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(90L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+
+    // Report from data index 2 - should not update stats even though it has
+    // greater values
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        dns.get(2), 110L, 120L, 2), publisher);
+    assertEquals(80L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(90L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
+  }
+
+  @Test
   public void testStaleReplicaOfDeletedContainer() throws 
NodeNotFoundException,
       IOException {
 
@@ -653,7 +809,7 @@ public class TestContainerReportHandler {
     // Expects the replica will be deleted.
     final ContainerReportsProto containerReport = getContainerReportsProto(
         containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
-        datanodeOne.getUuidString());
+        datanodeOne.getUuidString(), 0);
     final ContainerReportFromDatanode containerReportFromDatanode =
         new ContainerReportFromDatanode(datanodeOne, containerReport);
     reportHandler.onMessage(containerReportFromDatanode, publisher);
@@ -668,8 +824,16 @@ public class TestContainerReportHandler {
   private ContainerReportFromDatanode getContainerReportFromDatanode(
       ContainerID containerId, ContainerReplicaProto.State state,
       DatanodeDetails dn, long bytesUsed, long keyCount) {
+    return getContainerReportFromDatanode(containerId, state, dn, bytesUsed,
+        keyCount, 0);
+  }
+
+  private ContainerReportFromDatanode getContainerReportFromDatanode(
+      ContainerID containerId, ContainerReplicaProto.State state,
+      DatanodeDetails dn, long bytesUsed, long keyCount, int replicaIndex) {
     ContainerReportsProto containerReport = getContainerReportsProto(
-        containerId, state, dn.getUuidString(), bytesUsed, keyCount);
+        containerId, state, dn.getUuidString(), bytesUsed, keyCount,
+        replicaIndex);
 
     return new ContainerReportFromDatanode(dn, containerReport);
   }
@@ -678,12 +842,20 @@ public class TestContainerReportHandler {
       final ContainerID containerId, final ContainerReplicaProto.State state,
       final String originNodeId) {
     return getContainerReportsProto(containerId, state, originNodeId,
-        2000000000L, 100000000L);
+        2000000000L, 100000000L, 0);
+  }
+
+  protected static ContainerReportsProto getContainerReportsProto(
+      final ContainerID containerId, final ContainerReplicaProto.State state,
+      final String originNodeId, int replicaIndex) {
+    return getContainerReportsProto(containerId, state, originNodeId,
+        2000000000L, 100000000L, replicaIndex);
   }
 
   protected static ContainerReportsProto getContainerReportsProto(
       final ContainerID containerId, final ContainerReplicaProto.State state,
-      final String originNodeId, final long usedBytes, final long keyCount) {
+      final String originNodeId, final long usedBytes, final long keyCount,
+      final int replicaIndex) {
     final ContainerReportsProto.Builder crBuilder =
         ContainerReportsProto.newBuilder();
     final ContainerReplicaProto replicaProto =
@@ -701,6 +873,7 @@ public class TestContainerReportHandler {
             .setWriteBytes(2000000000L)
             .setBlockCommitSequenceId(10000L)
             .setDeleteTransactionId(0)
+            .setReplicaIndex(replicaIndex)
             .build();
     return crBuilder.addReports(replicaProto).build();
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to