This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new d9a48cc95d HDDS-8155. SCM Block deleting service add transaction blocks for stale/dead/decommissioning DNs (#4403)
d9a48cc95d is described below
commit d9a48cc95d2ac22f54e20385b0ba9384e021a24f
Author: Sumit Agrawal <[email protected]>
AuthorDate: Thu Mar 23 13:21:03 2023 +0530
HDDS-8155. SCM Block deleting service add transaction blocks for stale/dead/decommissioning DNs (#4403)
---
.../apache/hadoop/hdds/scm/block/DeletedBlockLog.java | 7 ++++++-
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 14 ++++++++++----
.../hadoop/hdds/scm/block/SCMBlockDeletingService.java | 16 ++++++++--------
.../apache/hadoop/hdds/scm/node/DeadNodeHandler.java | 8 ++++++++
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 17 +++++++++++++++--
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 7 ++++++-
.../hadoop/ozone/container/TestECContainerRecovery.java | 7 +++++--
7 files changed, 58 insertions(+), 18 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index ea2013917d..20a68e5010 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdds.scm.block;
+import java.util.Set;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
.DeleteBlockTransactionResult;
@@ -43,11 +45,14 @@ public interface DeletedBlockLog extends Closeable {
* Scan entire log once and returns TXs to DatanodeDeletedBlockTransactions.
* Once DatanodeDeletedBlockTransactions is full, the scan behavior will
* stop.
+ *
* @param blockDeletionLimit Maximum number of blocks to fetch
+ * @param dnList set of healthy datanodes eligible to receive block deletion commands
* @return Mapping from containerId to latest transactionId for the container.
* @throws IOException
*/
- DatanodeDeletedBlockTransactions getTransactions(int blockDeletionLimit)
+ DatanodeDeletedBlockTransactions getTransactions(
+ int blockDeletionLimit, Set<DatanodeDetails> dnList)
throws IOException, TimeoutException;
/**
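
The interface change above threads a set of eligible datanodes through getTransactions. A minimal caller sketch, assuming the NodeManager lookup used elsewhere in SCM; nodeManager and blockDeletionLimit stand in for fields of the calling service and are not part of this patch:

    // Restrict this round of block deletion to healthy, in-service DNs.
    Set<DatanodeDetails> healthy =
        new HashSet<>(nodeManager.getNodes(NodeStatus.inServiceHealthy()));
    DatanodeDeletedBlockTransactions txns =
        deletedBlockLog.getTransactions(blockDeletionLimit, healthy);

SCMBlockDeletingService below does essentially this, with one extra filter on the DN command queue.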
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 589a9f3f76..52b8be9d64 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -401,8 +401,10 @@ public class DeletedBlockLogImpl
public void close() throws IOException {
}
- private void getTransaction(DeletedBlocksTransaction tx,
- DatanodeDeletedBlockTransactions transactions) {
+ private void getTransaction(
+ DeletedBlocksTransaction tx,
+ DatanodeDeletedBlockTransactions transactions,
+ Set<DatanodeDetails> dnList) {
try {
DeletedBlocksTransaction updatedTxn = DeletedBlocksTransaction
.newBuilder(tx)
@@ -413,6 +415,9 @@ public class DeletedBlockLogImpl
ContainerID.valueOf(updatedTxn.getContainerID()));
for (ContainerReplica replica : replicas) {
UUID dnID = replica.getDatanodeDetails().getUuid();
+ if (!dnList.contains(replica.getDatanodeDetails())) {
+ continue;
+ }
Set<UUID> dnsWithTransactionCommitted =
transactionToDNsCommitMap.get(updatedTxn.getTxID());
if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted
@@ -429,7 +434,8 @@ public class DeletedBlockLogImpl
@Override
public DatanodeDeletedBlockTransactions getTransactions(
- int blockDeletionLimit) throws IOException, TimeoutException {
+ int blockDeletionLimit, Set<DatanodeDetails> dnList)
+ throws IOException, TimeoutException {
lock.lock();
try {
DatanodeDeletedBlockTransactions transactions =
@@ -455,7 +461,7 @@ public class DeletedBlockLogImpl
txIDs.add(txn.getTxID());
} else if (txn.getCount() > -1 && txn.getCount() <= maxRetry
&& !containerManager.getContainer(id).isOpen()) {
- getTransaction(txn, transactions);
+ getTransaction(txn, transactions, dnList);
transactionToDNsCommitMap
.putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
}
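
The only behavioral change in getTransaction() is the per-replica guard: a delete transaction is recorded for a replica's datanode only if that datanode is in the supplied set. A self-contained sketch of the guard in isolation, using plain UUIDs as hypothetical stand-ins for the Ozone replica and datanode types:

    import java.util.*;

    public class ReplicaFilterSketch {
      public static void main(String[] args) {
        UUID dn1 = UUID.randomUUID();
        UUID dn2 = UUID.randomUUID();
        UUID dn3 = UUID.randomUUID();
        // Replica locations for one container, and the DNs currently eligible.
        List<UUID> replicaDns = Arrays.asList(dn1, dn2, dn3);
        Set<UUID> dnList = new HashSet<>(Arrays.asList(dn1, dn3));

        // Mirrors the new guard: replicas on DNs outside dnList are skipped,
        // so no delete transaction is queued for them in this round.
        List<UUID> targets = new ArrayList<>();
        for (UUID dn : replicaDns) {
          if (!dnList.contains(dn)) {
            continue;
          }
          targets.add(dn);
        }
        System.out.println(targets); // dn2 is absent
      }
    }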
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 1b587c7b4b..abb77abefd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -134,14 +135,18 @@ public class SCMBlockDeletingService extends BackgroundService
if (LOG.isDebugEnabled()) {
LOG.debug("Running DeletedBlockTransactionScanner");
}
- // TODO - DECOMM - should we be deleting blocks from decom nodes
- // and what about entering maintenance.
List<DatanodeDetails> datanodes =
nodeManager.getNodes(NodeStatus.inServiceHealthy());
if (datanodes != null) {
+ // A DN is considered in this iteration only if it is healthy and
+ // in service and all previously sent deleteBlocksCommand commands
+ // have been handled (its delete-command queue is empty)
+ final Set<DatanodeDetails> included = datanodes.stream().filter(
+ dn -> nodeManager.getCommandQueueCount(dn.getUuid(),
+ Type.deleteBlocksCommand) == 0).collect(Collectors.toSet());
try {
DatanodeDeletedBlockTransactions transactions =
- deletedBlockLog.getTransactions(blockDeleteLimitSize);
+ deletedBlockLog.getTransactions(blockDeleteLimitSize, included);
if (transactions.isEmpty()) {
return EmptyTaskResult.newResult();
@@ -156,10 +161,6 @@ public class SCMBlockDeletingService extends BackgroundService
processedTxIDs.addAll(dnTXs.stream()
.map(DeletedBlocksTransaction::getTxID)
.collect(Collectors.toSet()));
- // TODO commandQueue needs a cap.
- // We should stop caching new commands if num of un-processed
- // command is bigger than a limit, e.g 50. In case datanode goes
- // offline for sometime, the cached commands be flooded.
SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
command.setTerm(scmContext.getTermOfLeader());
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
@@ -176,7 +177,6 @@ public class SCMBlockDeletingService extends BackgroundService
}
}
}
- // TODO: Fix ME!!!
LOG.info("Totally added {} blocks to be deleted for"
+ " {} datanodes, task elapsed time: {}ms",
transactions.getBlocksDeleted(),
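
The service-side half of the change is a simple form of backpressure: even a healthy, in-service DN is excluded from the current round if it still has unacknowledged deleteBlocksCommand entries queued, so delete commands cannot pile up behind a slow node. A runnable sketch of that filter, with a plain map standing in for NodeManager's per-DN queue counts (hypothetical names; the real check is nodeManager.getCommandQueueCount as shown above):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;
    import java.util.stream.Collectors;

    public class IncludedDnFilter {
      public static void main(String[] args) {
        UUID busyDn = UUID.randomUUID();
        UUID idleDn = UUID.randomUUID();
        // Stand-in for getCommandQueueCount(dn, Type.deleteBlocksCommand).
        Map<UUID, Integer> pendingDeleteCmds = new HashMap<>();
        pendingDeleteCmds.put(busyDn, 3); // three unhandled delete commands

        // Only DNs with an empty delete-command queue join this iteration.
        Set<UUID> included = Arrays.asList(busyDn, idleDn).stream()
            .filter(dn -> pendingDeleteCmds.getOrDefault(dn, 0) == 0)
            .collect(Collectors.toSet());

        System.out.println(included.contains(idleDn)); // true
        System.out.println(included.contains(busyDn)); // false
      }
    }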
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 55ae6d1b0f..dd332913b2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.node;
import java.io.IOException;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +89,12 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
if (!nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
removeContainerReplicas(datanodeDetails);
}
+
+ // Remove commands queued for the dead DN; getCommandQueue returns
+ // the pending commands and clears the DN's queue as a side effect
+ final List<SCMCommand> cmdList = nodeManager.getCommandQueue(
+ datanodeDetails.getUuid());
+ LOG.info("Clearing command queue of size {} for DN {}",
+ cmdList.size(), datanodeDetails);
//move dead datanode out of ClusterNetworkTopology
NetworkTopology nt = nodeManager.getClusterNetworkTopologyMap();
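
The new dead-node code relies on a drain-on-read behavior: getCommandQueue returns the commands queued for the DN and removes them, so the log line above doubles as the clearing step (the new assertion in TestDeadNodeHandler below depends on exactly this). A self-contained sketch of the pattern with hypothetical types, not the Ozone classes:

    import java.util.*;

    public class DrainOnReadSketch {
      private final Map<UUID, List<String>> queues = new HashMap<>();

      void add(UUID dn, String cmd) {
        queues.computeIfAbsent(dn, k -> new ArrayList<>()).add(cmd);
      }

      // Returns all pending commands for the DN and clears its queue,
      // mirroring how the dead-node path empties a DN's backlog.
      List<String> drain(UUID dn) {
        List<String> cmds = queues.remove(dn);
        return cmds == null ? Collections.emptyList() : cmds;
      }

      public static void main(String[] args) {
        DrainOnReadSketch q = new DrainOnReadSketch();
        UUID dn = UUID.randomUUID();
        q.add(dn, "deleteBlocksCommand");
        System.out.println(q.drain(dn).size()); // 1
        System.out.println(q.drain(dn).size()); // 0, the queue is now empty
      }
    }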
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 033c1a0745..e519d7c56e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -296,7 +296,8 @@ public class TestDeletedBlockLog {
private List<DeletedBlocksTransaction> getTransactions(
int maximumAllowedBlocksNum) throws IOException, TimeoutException {
DatanodeDeletedBlockTransactions transactions =
- deletedBlockLog.getTransactions(maximumAllowedBlocksNum);
+ deletedBlockLog.getTransactions(maximumAllowedBlocksNum,
+ dnList.stream().collect(Collectors.toSet()));
List<DeletedBlocksTransaction> txns = new LinkedList<>();
for (DatanodeDetails dn : dnList) {
txns.addAll(Optional.ofNullable(
@@ -440,6 +441,17 @@ public class TestDeletedBlockLog {
Assertions.assertEquals(0, blocks.size());
}
+ @Test
+ public void testDNOnlyOneNodeHealthy() throws Exception {
+ Map<Long, List<Long>> deletedBlocks = generateData(50);
+ addTransactions(deletedBlocks, true);
+ DatanodeDeletedBlockTransactions transactions
+ = deletedBlockLog.getTransactions(
+ 30 * BLOCKS_PER_TXN * THREE,
+ dnList.subList(0, 1).stream().collect(Collectors.toSet()));
+ Assertions.assertEquals(1,
+ transactions.getDatanodeTransactionMap().size());
+ }
+
@Test
public void testInadequateReplicaCommit() throws Exception {
Map<Long, List<Long>> deletedBlocks = generateData(50);
@@ -461,7 +473,8 @@ public class TestDeletedBlockLog {
// For the rest txn, txn will be got from all dns.
// Committed txn will be: 1-40. 1-40. 31-40
commitTransactions(deletedBlockLog.getTransactions(
- 30 * BLOCKS_PER_TXN * THREE));
+ 30 * BLOCKS_PER_TXN * THREE,
+ dnList.stream().collect(Collectors.toSet())));
// The rest txn shall be: 41-50. 41-50. 41-50
List<DeletedBlocksTransaction> blocks = getAllTransactions();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 0b29853339..168fdd11a5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLU
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.security.authentication.client
.AuthenticationException;
import org.apache.ozone.test.GenericTestUtils;
@@ -244,9 +246,10 @@ public class TestDeadNodeHandler {
ContainerID.valueOf(container3.getContainerID()));
Assertions.assertEquals(1, container3Replicas.size());
-
// Now set the node to anything other than IN_MAINTENANCE and the relevant
// replicas should be removed
+ DeleteBlocksCommand cmd = new DeleteBlocksCommand(Collections.emptyList());
+ nodeManager.addDatanodeCommand(datanode1.getUuid(), cmd);
nodeManager.setNodeOperationalState(datanode1,
HddsProtos.NodeOperationalState.IN_SERVICE);
deadNodeHandler.onMessage(datanode1, publisher);
@@ -254,6 +257,8 @@ public class TestDeadNodeHandler {
//deadNodeHandler.onMessage call will not change this
Assertions.assertFalse(
nodeManager.getClusterNetworkTopologyMap().contains(datanode1));
+ Assertions.assertEquals(0,
+ nodeManager.getCommandQueueCount(datanode1.getUuid(), cmd.getType()));
container1Replicas = containerManager
.getContainerReplicas(ContainerID.valueOf(container1.getContainerID()));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
index 5cccdf716a..73d9252e57 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
@@ -248,12 +248,15 @@ public class TestECContainerRecovery {
cluster.restartHddsDatanode(pipeline.getFirstNode(), true);
// Check container is over replicated.
waitForContainerCount(6, container.containerID(), scm);
+
+ // Resume the ReplicationManager and wait for the over-replicated
+ // replica to be deleted; RM reconciles the container replica state
+ // when it differs from the expected state
+ scm.getReplicationManager().start();
+
// Wait for all the replicas to be closed.
container = scm.getContainerInfo(container.getContainerID());
waitForDNContainerState(container, scm);
- // Resume RM and wait the over replicated replica deleted.
- scm.getReplicationManager().start();
waitForContainerCount(5, container.containerID(), scm);
}
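
The reordering matters because both waits poll for a condition that only the ReplicationManager can bring about; with RM still stopped, waiting for the replica count to drop back to 5 could never converge. Helpers such as waitForContainerCount typically wrap GenericTestUtils.waitFor; a hedged sketch of that polling idiom (getReplicaCount is a hypothetical helper, not the test's actual method):

    // Poll every 500 ms, give up after 100 s, until the count drops to 5.
    GenericTestUtils.waitFor(
        () -> getReplicaCount(container.containerID(), scm) == 5,
        500, 100_000);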