This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e41ef2b11c HDDS-4368. SCM should avoid sending delete transactions for under-replicated containers (#5293)
e41ef2b11c is described below

commit e41ef2b11cba2bf16307ad69318e64384c1b70b1
Author: Aryan Gupta <[email protected]>
AuthorDate: Tue Oct 10 17:12:05 2023 +0530

    HDDS-4368. SCM should avoid sending delete transactions for under-replicated containers (#5293)
---
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 65 +++++++++++++---------
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 63 +++++++++++++++++----
 .../dev-support/findbugsExcludeFile.xml            |  2 +-
 ...llower.java => TestDeleteWithInAdequateDN.java} | 50 ++++-------------
 4 files changed, 102 insertions(+), 78 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 99075acee6..7db51a203b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
@@ -400,37 +402,40 @@ public class DeletedBlockLogImpl
   public void close() throws IOException {
   }
 
-  private void getTransaction(
-      DeletedBlocksTransaction tx,
+  private void getTransaction(DeletedBlocksTransaction tx,
       DatanodeDeletedBlockTransactions transactions,
-      Set<DatanodeDetails> dnList) {
-    try {
-      DeletedBlocksTransaction updatedTxn = DeletedBlocksTransaction
-          .newBuilder(tx)
-          .setCount(transactionToRetryCountMap.getOrDefault(tx.getTxID(), 0))
-          .build();
-      Set<ContainerReplica> replicas = containerManager
-          .getContainerReplicas(
-              ContainerID.valueOf(updatedTxn.getContainerID()));
-      for (ContainerReplica replica : replicas) {
-        UUID dnID = replica.getDatanodeDetails().getUuid();
-        if (!dnList.contains(replica.getDatanodeDetails())) {
-          continue;
-        }
-        Set<UUID> dnsWithTransactionCommitted =
-            transactionToDNsCommitMap.get(updatedTxn.getTxID());
-        if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted
-            .contains(dnID)) {
-          // Transaction need not be sent to dns which have
-          // already committed it
-          transactions.addTransactionToDN(dnID, updatedTxn);
-        }
+      Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas) {
+    DeletedBlocksTransaction updatedTxn =
+        DeletedBlocksTransaction.newBuilder(tx)
+            .setCount(transactionToRetryCountMap.getOrDefault(tx.getTxID(), 0))
+            .build();
+    for (ContainerReplica replica : replicas) {
+      UUID dnID = replica.getDatanodeDetails().getUuid();
+      if (!dnList.contains(replica.getDatanodeDetails())) {
+        continue;
+      }
+      Set<UUID> dnsWithTransactionCommitted =
+          transactionToDNsCommitMap.get(updatedTxn.getTxID());
+      if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted
+          .contains(dnID)) {
+        // Transaction need not be sent to dns which have
+        // already committed it
+        transactions.addTransactionToDN(dnID, updatedTxn);
       }
-    } catch (IOException e) {
-      LOG.warn("Got container info error.", e);
     }
   }
 
+  private Boolean checkInadequateReplica(Set<ContainerReplica> replicas,
+      DeletedBlocksTransaction txn) throws ContainerNotFoundException {
+    ContainerInfo containerInfo = containerManager
+        .getContainer(ContainerID.valueOf(txn.getContainerID()));
+    ReplicationManager replicationManager =
+        scmContext.getScm().getReplicationManager();
+    ContainerHealthResult result = replicationManager
+        .getContainerReplicationHealth(containerInfo, replicas);
+    return result.getHealthState() != ContainerHealthResult.HealthState.HEALTHY;
+  }
+
   @Override
   public DatanodeDeletedBlockTransactions getTransactions(
       int blockDeletionLimit, Set<DatanodeDetails> dnList)
@@ -460,7 +465,13 @@ public class DeletedBlockLogImpl
               txIDs.add(txn.getTxID());
             } else if (txn.getCount() > -1 && txn.getCount() <= maxRetry
                 && !containerManager.getContainer(id).isOpen()) {
-              getTransaction(txn, transactions, dnList);
+              Set<ContainerReplica> replicas = containerManager
+                  .getContainerReplicas(
+                      ContainerID.valueOf(txn.getContainerID()));
+              if (checkInadequateReplica(replicas, txn)) {
+                continue;
+              }
+              getTransaction(txn, transactions, dnList, replicas);
               transactionToDNsCommitMap
                   .putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
             }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index e519d7c56e..e1d15146d0 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -30,10 +30,14 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBufferStub;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -71,6 +75,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.when;
@@ -95,6 +100,8 @@ public class TestDeletedBlockLog {
   private static final int THREE = ReplicationFactor.THREE_VALUE;
   private static final int ONE = ReplicationFactor.ONE_VALUE;
 
+  private ReplicationManager replicationManager;
+
   @BeforeEach
   public void setup() throws Exception {
     testDir = GenericTestUtils.getTestDir(
@@ -103,7 +110,11 @@ public class TestDeletedBlockLog {
     conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
     conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
-    scm = HddsTestUtils.getScm(conf);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    SCMConfigurator configurator = new SCMConfigurator();
+    configurator.setSCMHAManager(SCMHAManagerStub.getInstance(true));
+    configurator.setReplicationManager(replicationManager);
+    scm = HddsTestUtils.getScm(conf, configurator);
     containerManager = Mockito.mock(ContainerManager.class);
     containerTable = scm.getScmMetadataStore().getContainerTable();
     scmHADBTransactionBuffer =
@@ -326,6 +337,7 @@ public class TestDeletedBlockLog {
     scmHADBTransactionBuffer.flush();
     // After flush there should be 30 transactions in deleteTable
     // All containers should have positive deleteTransactionId
+    mockContainerHealthResult(true);
     Assertions.assertEquals(30 * THREE, getAllTransactions().size());
     for (ContainerInfo containerInfo : containerManager.getContainers()) {
       Assertions.assertTrue(containerInfo.getDeleteTransactionId() > 0);
@@ -338,6 +350,7 @@ public class TestDeletedBlockLog {
 
     // Create 30 TXs in the log.
     addTransactions(generateData(30), true);
+    mockContainerHealthResult(true);
 
     // This will return all TXs, total num 30.
     List<DeletedBlocksTransaction> blocks = getAllTransactions();
@@ -370,12 +383,24 @@ public class TestDeletedBlockLog {
     Assertions.assertEquals(0, blocks.size());
   }
 
+  private void mockContainerHealthResult(Boolean healthy) {
+    ContainerInfo containerInfo = Mockito.mock(ContainerInfo.class);
+    ContainerHealthResult healthResult =
+        new ContainerHealthResult.HealthyResult(containerInfo);
+    if (!healthy) {
+      healthResult = new ContainerHealthResult.UnHealthyResult(containerInfo);
+    }
+    Mockito.doReturn(healthResult).when(replicationManager)
+        .getContainerReplicationHealth(any(), any());
+  }
+
   @Test
   public void testResetCount() throws Exception {
     int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
 
     // Create 30 TXs in the log.
     addTransactions(generateData(30), true);
+    mockContainerHealthResult(true);
 
     // This will return all TXs, total num 30.
     List<DeletedBlocksTransaction> blocks = getAllTransactions();
@@ -418,6 +443,7 @@ public class TestDeletedBlockLog {
   @Test
   public void testCommitTransactions() throws Exception {
     addTransactions(generateData(50), true);
+    mockContainerHealthResult(true);
     List<DeletedBlocksTransaction> blocks =
         getTransactions(20 * BLOCKS_PER_TXN * THREE);
     // Add an invalid txn.
@@ -445,11 +471,12 @@ public class TestDeletedBlockLog {
   public void testDNOnlyOneNodeHealthy() throws Exception {
     Map<Long, List<Long>> deletedBlocks = generateData(50);
     addTransactions(deletedBlocks, true);
+    mockContainerHealthResult(false);
     DatanodeDeletedBlockTransactions transactions
         = deletedBlockLog.getTransactions(
         30 * BLOCKS_PER_TXN * THREE,
         dnList.subList(0, 1).stream().collect(Collectors.toSet()));
-    Assertions.assertEquals(1, transactions.getDatanodeTransactionMap().size());
+    Assertions.assertEquals(0, transactions.getDatanodeTransactionMap().size());
   }
 
   @Test
@@ -458,14 +485,11 @@ public class TestDeletedBlockLog {
     addTransactions(deletedBlocks, true);
     long containerID;
     // let the first 30 container only consisting of only two unhealthy replicas
-    int count = 30;
-    for (Map.Entry<Long, List<Long>> entry :deletedBlocks.entrySet()) {
-      if (count <= 0) {
-        break;
-      }
+    int count = 0;
+    for (Map.Entry<Long, List<Long>> entry : deletedBlocks.entrySet()) {
       containerID = entry.getKey();
-      mockInadequateReplicaUnhealthyContainerInfo(containerID);
-      count -= 1;
+      mockInadequateReplicaUnhealthyContainerInfo(containerID, count);
+      count += 1;
     }
     // getTransactions will get existing container replicas then add transaction
     // to DN.
@@ -478,11 +502,16 @@ public class TestDeletedBlockLog {
 
     // The rest txn shall be: 41-50. 41-50. 41-50
     List<DeletedBlocksTransaction> blocks = getAllTransactions();
-    Assertions.assertEquals(30, blocks.size());
+    // First 30 txns aren't considered for deletion as they don't have required
+    // container replica's so getAllTransactions() won't be able to fetch them
+    // and rest 20 txns are already committed and removed so in total
+    // getAllTransactions() will fetch 0 txns.
+    Assertions.assertEquals(0, blocks.size());
   }
 
   @Test
   public void testRandomOperateTransactions() throws Exception {
+    mockContainerHealthResult(true);
     Random random = new Random();
     int added = 0, committed = 0;
     List<DeletedBlocksTransaction> blocks = new ArrayList<>();
@@ -522,6 +551,7 @@ public class TestDeletedBlockLog {
   @Test
   public void testPersistence() throws Exception {
     addTransactions(generateData(50), true);
+    mockContainerHealthResult(true);
     // close db and reopen it again to make sure
     // transactions are stored persistently.
     deletedBlockLog.close();
@@ -560,6 +590,7 @@ public class TestDeletedBlockLog {
   @Test
   public void testDeletedBlockTransactions()
       throws IOException, TimeoutException {
+    mockContainerHealthResult(true);
     int txNum = 10;
     List<DeletedBlocksTransaction> blocks;
     DatanodeDetails dnId1 = dnList.get(0), dnId2 = dnList.get(1);
@@ -647,8 +678,8 @@ public class TestDeletedBlockLog {
         .thenReturn(replicaSet);
   }
 
-  private void mockInadequateReplicaUnhealthyContainerInfo(long containerID)
-      throws IOException {
+  private void mockInadequateReplicaUnhealthyContainerInfo(long containerID,
+      int count) throws IOException {
     List<DatanodeDetails> dns = dnList.subList(0, 2);
     Pipeline pipeline = Pipeline.newBuilder()
         .setReplicationConfig(
@@ -674,6 +705,14 @@ public class TestDeletedBlockLog {
             .setDatanodeDetails(datanodeDetails)
             .build())
         .collect(Collectors.toSet());
+    ContainerHealthResult healthResult;
+    if (count < 30) {
+      healthResult = new ContainerHealthResult.UnHealthyResult(containerInfo);
+    } else {
+      healthResult = new ContainerHealthResult.HealthyResult(containerInfo);
+    }
+    Mockito.doReturn(healthResult).when(replicationManager)
+        .getContainerReplicationHealth(containerInfo, replicaSet);
     when(containerManager.getContainerReplicas(
         ContainerID.valueOf(containerID)))
         .thenReturn(replicaSet);
diff --git a/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml 
b/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
index 9436840c9c..aa8b0991db 100644
--- a/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
@@ -134,7 +134,7 @@
     <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
   </Match>
   <Match>
-    <Class 
name="org.apache.hadoop.ozone.client.rpc.TestDeleteWithSlowFollower"/>
+    <Class 
name="org.apache.hadoop.ozone.client.rpc.TestDeleteWithInAdequateDN"/>
     <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
   </Match>
   <Match>
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithInAdequateDN.java
similarity index 87%
rename from 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
rename to 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithInAdequateDN.java
index ac4e11dd83..131ce70553 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithInAdequateDN.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.HddsDatanodeService;
@@ -55,7 +54,6 @@ import 
org.apache.hadoop.ozone.container.common.interfaces.Container;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -77,10 +75,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
- * Tests delete key operation with a slow follower in the datanode
- * pipeline.
+ * Tests delete key operation with inadequate datanodes.
  */
-public class TestDeleteWithSlowFollower {
+public class TestDeleteWithInAdequateDN {
 
   private static MiniOzoneCluster cluster;
   private static OzoneConfiguration conf;
@@ -124,7 +121,6 @@ public class TestDeleteWithSlowFollower {
 
     DatanodeRatisServerConfig ratisServerConfig =
         conf.getObject(DatanodeRatisServerConfig.class);
-    ratisServerConfig.setFollowerSlownessTimeout(Duration.ofSeconds(1000));
     ratisServerConfig.setNoLeaderTimeout(Duration.ofSeconds(1000));
     ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
     ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(3));
@@ -189,19 +185,17 @@ public class TestDeleteWithSlowFollower {
   }
 
   /**
-   * The test simulates a slow follower by first writing key thereby creating a
-   * a container on 3 dns of the cluster. Then, a dn is shutdown and a close
-   * container cmd gets issued so that in the leader and the alive follower,
-   * container gets closed. And then, key is deleted and
-   * the node is started up again so that it
-   * rejoins the ring and starts applying the transaction from where it left
-   * by fetching the entries from the leader. Until and unless this follower
-   * catches up and its replica gets closed,
-   * the data is not deleted from any of the nodes which have the
-   * closed replica.
+   * The test simulates an inadequate DN scenario by first writing key thereby
+   * creating a container on 3 dns of the cluster. Then, a dn is shutdown and a
+   * close container cmd gets issued so that in the leader and the alive
+   * follower, container gets closed. And then, key is deleted and the node is
+   * started up again so that it rejoins the ring and starts applying the
+   * transaction from where it left by fetching the entries from the leader.
+   * Until and unless this follower catches up and its replica gets closed, the
+   * data is not deleted from any of the nodes which have the closed replica.
    */
   @Test
-  public void testDeleteKeyWithSlowFollower() throws Exception {
+  public void testDeleteKeyWithInAdequateDN() throws Exception {
     String keyName = "ratis";
     OzoneOutputStream key =
         objectStore.getVolume(volumeName).getBucket(bucketName)
@@ -220,7 +214,7 @@ public class TestDeleteWithSlowFollower {
     OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
     long containerID = omKeyLocationInfo.getContainerID();
     // A container is created on the datanode. Now figure out a follower node 
to
-    // kill/slow down.
+    // kill.
     HddsDatanodeService follower = null;
     HddsDatanodeService leader = null;
 
@@ -279,31 +273,11 @@ public class TestDeleteWithSlowFollower {
             .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
     Container container =
         
ozoneContainer.getContainerSet().getContainer(blockID.getContainerID());
-    KeyValueContainerData containerData =
-        ((KeyValueContainerData) container.getContainerData());
-    long delTrxId = containerData.getDeleteTransactionId();
-    long numPendingDeletionBlocks = 
containerData.getNumPendingDeletionBlocks();
     BlockData blockData =
         keyValueHandler.getBlockManager().getBlock(container, blockID);
     //cluster.getOzoneManager().deleteKey(keyArgs);
     client.getObjectStore().getVolume(volumeName).getBucket(bucketName).
             deleteKey("ratis");
-    GenericTestUtils.waitFor(() -> {
-      try {
-        if (SCMHAUtils.isSCMHAEnabled(cluster.getConf())) {
-          cluster.getStorageContainerManager().getScmHAManager()
-              .asSCMHADBTransactionBuffer().flush();
-        }
-        return
-            dnStateMachine.getCommandDispatcher()
-                .getDeleteBlocksCommandHandler().getInvocationCount() >= 1;
-      } catch (IOException e) {
-        return false;
-      }
-    }, 500, 100000);
-    Assert.assertTrue(containerData.getDeleteTransactionId() > delTrxId);
-    Assert.assertTrue(
-        containerData.getNumPendingDeletionBlocks() > 
numPendingDeletionBlocks);
     // make sure the chunk was never deleted on the leader even though
     // deleteBlock handler is invoked
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to