This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e41ef2b11c HDDS-4368. SCM should avoid sending delete transactions for
under-replicated containers (#5293)
e41ef2b11c is described below
commit e41ef2b11cba2bf16307ad69318e64384c1b70b1
Author: Aryan Gupta <[email protected]>
AuthorDate: Tue Oct 10 17:12:05 2023 +0530
HDDS-4368. SCM should avoid sending delete transactions for
under-replicated containers (#5293)
---
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 65 +++++++++++++---------
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 63 +++++++++++++++++----
.../dev-support/findbugsExcludeFile.xml | 2 +-
...llower.java => TestDeleteWithInAdequateDN.java} | 50 ++++-------------
4 files changed, 102 insertions(+), 78 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 99075acee6..7db51a203b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
@@ -400,37 +402,40 @@ public class DeletedBlockLogImpl
public void close() throws IOException {
}
- private void getTransaction(
- DeletedBlocksTransaction tx,
+ private void getTransaction(DeletedBlocksTransaction tx,
DatanodeDeletedBlockTransactions transactions,
- Set<DatanodeDetails> dnList) {
- try {
- DeletedBlocksTransaction updatedTxn = DeletedBlocksTransaction
- .newBuilder(tx)
- .setCount(transactionToRetryCountMap.getOrDefault(tx.getTxID(), 0))
- .build();
- Set<ContainerReplica> replicas = containerManager
- .getContainerReplicas(
- ContainerID.valueOf(updatedTxn.getContainerID()));
- for (ContainerReplica replica : replicas) {
- UUID dnID = replica.getDatanodeDetails().getUuid();
- if (!dnList.contains(replica.getDatanodeDetails())) {
- continue;
- }
- Set<UUID> dnsWithTransactionCommitted =
- transactionToDNsCommitMap.get(updatedTxn.getTxID());
- if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted
- .contains(dnID)) {
- // Transaction need not be sent to dns which have
- // already committed it
- transactions.addTransactionToDN(dnID, updatedTxn);
- }
+ Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas) {
+ DeletedBlocksTransaction updatedTxn =
+ DeletedBlocksTransaction.newBuilder(tx)
+ .setCount(transactionToRetryCountMap.getOrDefault(tx.getTxID(), 0))
+ .build();
+ for (ContainerReplica replica : replicas) {
+ UUID dnID = replica.getDatanodeDetails().getUuid();
+ if (!dnList.contains(replica.getDatanodeDetails())) {
+ continue;
+ }
+ Set<UUID> dnsWithTransactionCommitted =
+ transactionToDNsCommitMap.get(updatedTxn.getTxID());
+ if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted
+ .contains(dnID)) {
+ // Transaction need not be sent to dns which have
+ // already committed it
+ transactions.addTransactionToDN(dnID, updatedTxn);
}
- } catch (IOException e) {
- LOG.warn("Got container info error.", e);
}
}
+ private Boolean checkInadequateReplica(Set<ContainerReplica> replicas,
+ DeletedBlocksTransaction txn) throws ContainerNotFoundException {
+ ContainerInfo containerInfo = containerManager
+ .getContainer(ContainerID.valueOf(txn.getContainerID()));
+ ReplicationManager replicationManager =
+ scmContext.getScm().getReplicationManager();
+ ContainerHealthResult result = replicationManager
+ .getContainerReplicationHealth(containerInfo, replicas);
+ return result.getHealthState() != ContainerHealthResult.HealthState.HEALTHY;
+ }
+
@Override
public DatanodeDeletedBlockTransactions getTransactions(
int blockDeletionLimit, Set<DatanodeDetails> dnList)
@@ -460,7 +465,13 @@ public class DeletedBlockLogImpl
txIDs.add(txn.getTxID());
} else if (txn.getCount() > -1 && txn.getCount() <= maxRetry
&& !containerManager.getContainer(id).isOpen()) {
- getTransaction(txn, transactions, dnList);
+ Set<ContainerReplica> replicas = containerManager
+ .getContainerReplicas(
+ ContainerID.valueOf(txn.getContainerID()));
+ if (checkInadequateReplica(replicas, txn)) {
+ continue;
+ }
+ getTransaction(txn, transactions, dnList, replicas);
transactionToDNsCommitMap
.putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index e519d7c56e..e1d15146d0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -30,10 +30,14 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBufferStub;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -71,6 +75,7 @@ import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@@ -95,6 +100,8 @@ public class TestDeletedBlockLog {
private static final int THREE = ReplicationFactor.THREE_VALUE;
private static final int ONE = ReplicationFactor.ONE_VALUE;
+ private ReplicationManager replicationManager;
+
@BeforeEach
public void setup() throws Exception {
testDir = GenericTestUtils.getTestDir(
@@ -103,7 +110,11 @@ public class TestDeletedBlockLog {
conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
- scm = HddsTestUtils.getScm(conf);
+ replicationManager = Mockito.mock(ReplicationManager.class);
+ SCMConfigurator configurator = new SCMConfigurator();
+ configurator.setSCMHAManager(SCMHAManagerStub.getInstance(true));
+ configurator.setReplicationManager(replicationManager);
+ scm = HddsTestUtils.getScm(conf, configurator);
containerManager = Mockito.mock(ContainerManager.class);
containerTable = scm.getScmMetadataStore().getContainerTable();
scmHADBTransactionBuffer =
@@ -326,6 +337,7 @@ public class TestDeletedBlockLog {
scmHADBTransactionBuffer.flush();
// After flush there should be 30 transactions in deleteTable
// All containers should have positive deleteTransactionId
+ mockContainerHealthResult(true);
Assertions.assertEquals(30 * THREE, getAllTransactions().size());
for (ContainerInfo containerInfo : containerManager.getContainers()) {
Assertions.assertTrue(containerInfo.getDeleteTransactionId() > 0);
@@ -338,6 +350,7 @@ public class TestDeletedBlockLog {
// Create 30 TXs in the log.
addTransactions(generateData(30), true);
+ mockContainerHealthResult(true);
// This will return all TXs, total num 30.
List<DeletedBlocksTransaction> blocks = getAllTransactions();
@@ -370,12 +383,24 @@ public class TestDeletedBlockLog {
Assertions.assertEquals(0, blocks.size());
}
+ private void mockContainerHealthResult(Boolean healthy) {
+ ContainerInfo containerInfo = Mockito.mock(ContainerInfo.class);
+ ContainerHealthResult healthResult =
+ new ContainerHealthResult.HealthyResult(containerInfo);
+ if (!healthy) {
+ healthResult = new ContainerHealthResult.UnHealthyResult(containerInfo);
+ }
+ Mockito.doReturn(healthResult).when(replicationManager)
+ .getContainerReplicationHealth(any(), any());
+ }
+
@Test
public void testResetCount() throws Exception {
int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
// Create 30 TXs in the log.
addTransactions(generateData(30), true);
+ mockContainerHealthResult(true);
// This will return all TXs, total num 30.
List<DeletedBlocksTransaction> blocks = getAllTransactions();
@@ -418,6 +443,7 @@ public class TestDeletedBlockLog {
@Test
public void testCommitTransactions() throws Exception {
addTransactions(generateData(50), true);
+ mockContainerHealthResult(true);
List<DeletedBlocksTransaction> blocks =
getTransactions(20 * BLOCKS_PER_TXN * THREE);
// Add an invalid txn.
@@ -445,11 +471,12 @@ public class TestDeletedBlockLog {
public void testDNOnlyOneNodeHealthy() throws Exception {
Map<Long, List<Long>> deletedBlocks = generateData(50);
addTransactions(deletedBlocks, true);
+ mockContainerHealthResult(false);
DatanodeDeletedBlockTransactions transactions
= deletedBlockLog.getTransactions(
30 * BLOCKS_PER_TXN * THREE,
dnList.subList(0, 1).stream().collect(Collectors.toSet()));
- Assertions.assertEquals(1, transactions.getDatanodeTransactionMap().size());
+ Assertions.assertEquals(0, transactions.getDatanodeTransactionMap().size());
}
@Test
@@ -458,14 +485,11 @@ public class TestDeletedBlockLog {
addTransactions(deletedBlocks, true);
long containerID;
// let the first 30 container only consisting of only two unhealthy replicas
- int count = 30;
- for (Map.Entry<Long, List<Long>> entry :deletedBlocks.entrySet()) {
- if (count <= 0) {
- break;
- }
+ int count = 0;
+ for (Map.Entry<Long, List<Long>> entry : deletedBlocks.entrySet()) {
containerID = entry.getKey();
- mockInadequateReplicaUnhealthyContainerInfo(containerID);
- count -= 1;
+ mockInadequateReplicaUnhealthyContainerInfo(containerID, count);
+ count += 1;
}
// getTransactions will get existing container replicas then add transaction
// to DN.
@@ -478,11 +502,16 @@ public class TestDeletedBlockLog {
// The rest txn shall be: 41-50. 41-50. 41-50
List<DeletedBlocksTransaction> blocks = getAllTransactions();
- Assertions.assertEquals(30, blocks.size());
+ // First 30 txns aren't considered for deletion as they don't have required
+ // container replica's so getAllTransactions() won't be able to fetch them
+ // and rest 20 txns are already committed and removed so in total
+ // getAllTransactions() will fetch 0 txns.
+ Assertions.assertEquals(0, blocks.size());
}
@Test
public void testRandomOperateTransactions() throws Exception {
+ mockContainerHealthResult(true);
Random random = new Random();
int added = 0, committed = 0;
List<DeletedBlocksTransaction> blocks = new ArrayList<>();
@@ -522,6 +551,7 @@ public class TestDeletedBlockLog {
@Test
public void testPersistence() throws Exception {
addTransactions(generateData(50), true);
+ mockContainerHealthResult(true);
// close db and reopen it again to make sure
// transactions are stored persistently.
deletedBlockLog.close();
@@ -560,6 +590,7 @@ public class TestDeletedBlockLog {
@Test
public void testDeletedBlockTransactions()
throws IOException, TimeoutException {
+ mockContainerHealthResult(true);
int txNum = 10;
List<DeletedBlocksTransaction> blocks;
DatanodeDetails dnId1 = dnList.get(0), dnId2 = dnList.get(1);
@@ -647,8 +678,8 @@ public class TestDeletedBlockLog {
.thenReturn(replicaSet);
}
- private void mockInadequateReplicaUnhealthyContainerInfo(long containerID)
- throws IOException {
+ private void mockInadequateReplicaUnhealthyContainerInfo(long containerID,
+ int count) throws IOException {
List<DatanodeDetails> dns = dnList.subList(0, 2);
Pipeline pipeline = Pipeline.newBuilder()
.setReplicationConfig(
@@ -674,6 +705,14 @@ public class TestDeletedBlockLog {
.setDatanodeDetails(datanodeDetails)
.build())
.collect(Collectors.toSet());
+ ContainerHealthResult healthResult;
+ if (count < 30) {
+ healthResult = new ContainerHealthResult.UnHealthyResult(containerInfo);
+ } else {
+ healthResult = new ContainerHealthResult.HealthyResult(containerInfo);
+ }
+ Mockito.doReturn(healthResult).when(replicationManager)
+ .getContainerReplicationHealth(containerInfo, replicaSet);
when(containerManager.getContainerReplicas(
ContainerID.valueOf(containerID)))
.thenReturn(replicaSet);
diff --git a/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml b/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
index 9436840c9c..aa8b0991db 100644
--- a/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
@@ -134,7 +134,7 @@
<Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
</Match>
<Match>
- <Class name="org.apache.hadoop.ozone.client.rpc.TestDeleteWithSlowFollower"/>
+ <Class name="org.apache.hadoop.ozone.client.rpc.TestDeleteWithInAdequateDN"/>
<Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
</Match>
<Match>
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithInAdequateDN.java
similarity index 87%
rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithInAdequateDN.java
index ac4e11dd83..131ce70553 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithInAdequateDN.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.HddsDatanodeService;
@@ -55,7 +54,6 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -77,10 +75,9 @@ import org.junit.BeforeClass;
import org.junit.Test;
/**
- * Tests delete key operation with a slow follower in the datanode
- * pipeline.
+ * Tests delete key operation with inadequate datanodes.
*/
-public class TestDeleteWithSlowFollower {
+public class TestDeleteWithInAdequateDN {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
@@ -124,7 +121,6 @@ public class TestDeleteWithSlowFollower {
DatanodeRatisServerConfig ratisServerConfig =
conf.getObject(DatanodeRatisServerConfig.class);
- ratisServerConfig.setFollowerSlownessTimeout(Duration.ofSeconds(1000));
ratisServerConfig.setNoLeaderTimeout(Duration.ofSeconds(1000));
ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(3));
@@ -189,19 +185,17 @@ public class TestDeleteWithSlowFollower {
}
/**
- * The test simulates a slow follower by first writing key thereby creating a
- * a container on 3 dns of the cluster. Then, a dn is shutdown and a close
- * container cmd gets issued so that in the leader and the alive follower,
- * container gets closed. And then, key is deleted and
- * the node is started up again so that it
- * rejoins the ring and starts applying the transaction from where it left
- * by fetching the entries from the leader. Until and unless this follower
- * catches up and its replica gets closed,
- * the data is not deleted from any of the nodes which have the
- * closed replica.
+ * The test simulates an inadequate DN scenario by first writing key thereby
+ * creating a container on 3 dns of the cluster. Then, a dn is shutdown and a
+ * close container cmd gets issued so that in the leader and the alive
+ * follower, container gets closed. And then, key is deleted and the node is
+ * started up again so that it rejoins the ring and starts applying the
+ * transaction from where it left by fetching the entries from the leader.
+ * Until and unless this follower catches up and its replica gets closed, the
+ * data is not deleted from any of the nodes which have the closed replica.
*/
@Test
- public void testDeleteKeyWithSlowFollower() throws Exception {
+ public void testDeleteKeyWithInAdequateDN() throws Exception {
String keyName = "ratis";
OzoneOutputStream key =
objectStore.getVolume(volumeName).getBucket(bucketName)
@@ -220,7 +214,7 @@ public class TestDeleteWithSlowFollower {
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
long containerID = omKeyLocationInfo.getContainerID();
// A container is created on the datanode. Now figure out a follower node to
- // kill/slow down.
+ // kill.
HddsDatanodeService follower = null;
HddsDatanodeService leader = null;
@@ -279,31 +273,11 @@ public class TestDeleteWithSlowFollower {
.getHandler(ContainerProtos.ContainerType.KeyValueContainer);
Container container =
ozoneContainer.getContainerSet().getContainer(blockID.getContainerID());
- KeyValueContainerData containerData =
- ((KeyValueContainerData) container.getContainerData());
- long delTrxId = containerData.getDeleteTransactionId();
- long numPendingDeletionBlocks = containerData.getNumPendingDeletionBlocks();
BlockData blockData =
keyValueHandler.getBlockManager().getBlock(container, blockID);
//cluster.getOzoneManager().deleteKey(keyArgs);
client.getObjectStore().getVolume(volumeName).getBucket(bucketName).
deleteKey("ratis");
- GenericTestUtils.waitFor(() -> {
- try {
- if (SCMHAUtils.isSCMHAEnabled(cluster.getConf())) {
- cluster.getStorageContainerManager().getScmHAManager()
- .asSCMHADBTransactionBuffer().flush();
- }
- return
- dnStateMachine.getCommandDispatcher()
- .getDeleteBlocksCommandHandler().getInvocationCount() >= 1;
- } catch (IOException e) {
- return false;
- }
- }, 500, 100000);
- Assert.assertTrue(containerData.getDeleteTransactionId() > delTrxId);
- Assert.assertTrue(
- containerData.getNumPendingDeletionBlocks() > numPendingDeletionBlocks);
// make sure the chunk was never deleted on the leader even though
// deleteBlock handler is invoked
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]