This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8bb0587d37 HDDS-11712. Process other DeletedBlocksTransaction before
retrying failed one. (#7532)
8bb0587d37 is described below
commit 8bb0587d37435b32177f20b842063e80a378f296
Author: Ashish Kumar <[email protected]>
AuthorDate: Tue Dec 17 12:31:46 2024 +0530
HDDS-11712. Process other DeletedBlocksTransaction before retrying failed
one. (#7532)
---
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 49 ++++++++++++++-
.../scm/block/DeletedBlockLogStateManagerImpl.java | 3 +-
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 69 ++++++++++++++++++++++
3 files changed, 119 insertions(+), 2 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 45d6a02493..226489482f 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -93,6 +93,7 @@ public class DeletedBlockLogImpl
private long scmCommandTimeoutMs = Duration.ofSeconds(300).toMillis();
private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
+ private long lastProcessedTransactionId = -1;
public DeletedBlockLogImpl(ConfigurationSource conf,
StorageContainerManager scm,
@@ -344,6 +345,34 @@ public class DeletedBlockLogImpl
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
deletedBlockLogStateManager.getReadOnlyIterator()) {
+ if (lastProcessedTransactionId != -1) {
+ iter.seek(lastProcessedTransactionId);
+ /*
+ * We should start from (lastProcessedTransactionId + 1) transaction.
+ * Now the iterator (iter.next call) is pointing at
+ * lastProcessedTransactionId, read the current value to move
+ * the cursor.
+ */
+ if (iter.hasNext()) {
+ /*
+ * There is a possibility that the lastProcessedTransactionId got
+ * deleted from the table, in that case we have to set
+ * lastProcessedTransactionId to next available transaction in the table.
+ *
+ * By doing this there is a chance that we will skip processing the new
+ * lastProcessedTransactionId, that should be ok. We can get to it in the
+ * next run.
+ */
+ lastProcessedTransactionId = iter.next().getKey();
+ }
+
+ // If we have reached the end, go to beginning.
+ if (!iter.hasNext()) {
+ iter.seekToFirst();
+ lastProcessedTransactionId = -1;
+ }
+ }
+
// Get the CmdStatus status of the aggregation, so that the current
// status of the specified transaction can be found faster
Map<UUID, Map<Long, CmdStatus>> commandStatus =
@@ -352,13 +381,14 @@ public class DeletedBlockLogImpl
map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
ArrayList<Long> txIDs = new ArrayList<>();
metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
+ Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
// Here takes block replica count as the threshold to avoid the case
// that part of replicas committed the TXN and recorded in the
// SCMDeletedBlockTransactionStatusManager, while they are counted
// in the threshold.
while (iter.hasNext() &&
transactions.getBlocksDeleted() < blockDeletionLimit) {
- Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = iter.next();
+ keyValue = iter.next();
DeletedBlocksTransaction txn = keyValue.getValue();
final ContainerID id = ContainerID.valueOf(txn.getContainerID());
try {
@@ -386,7 +416,24 @@ public class DeletedBlockLogImpl
LOG.warn("Container: {} was not found for the transaction: {}.", id, txn);
txIDs.add(txn.getTxID());
}
+
+ if (lastProcessedTransactionId == keyValue.getKey()) {
+ // We have circled back to the last transaction.
+ break;
+ }
+
+ if (!iter.hasNext() && lastProcessedTransactionId != -1) {
+ /*
+ * We started from in-between and reached end of the table,
+ * now we should go to the start of the table and process
+ * the transactions.
+ */
+ iter.seekToFirst();
+ }
}
+
+ lastProcessedTransactionId = keyValue != null ? keyValue.getKey() : -1;
+
if (!txIDs.isEmpty()) {
deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
metrics.incrBlockDeletionTransactionCompleted(txIDs.size());
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
index 6e6440c324..43809acf4b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
@@ -133,7 +133,8 @@ public class DeletedBlockLogStateManagerImpl
@Override
public void seekToFirst() {
- throw new UnsupportedOperationException("seekToFirst");
+ iter.seekToFirst();
+ findNext();
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index c8e2f267af..2a012cbe18 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -441,6 +441,75 @@ public class TestDeletedBlockLog {
assertEquals(30 * THREE, blocks.size());
}
+
+ @Test
+ public void testSCMDelIteratorProgress() throws Exception {
+ int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
+
+ // CASE1: When all transactions are valid and available
+ // Create 8 TXs in the log.
+ int noOfTransactions = 8;
+ addTransactions(generateData(noOfTransactions), true);
+ mockContainerHealthResult(true);
+ List<DeletedBlocksTransaction> blocks;
+
+ List<Long> txIDs = new ArrayList<>();
+ int i = 1;
+ while (i < noOfTransactions) {
+ // In each iteration read two transaction, API returns all the transactions in order.
+ // 1st iteration: {1, 2}
+ // 2nd iteration: {3, 4}
+ // 3rd iteration: {5, 6}
+ // 4th iteration: {7, 8}
+ blocks = getTransactions(2 * BLOCKS_PER_TXN * THREE);
+ assertEquals(blocks.get(0).getTxID(), i++);
+ assertEquals(blocks.get(1).getTxID(), i++);
+ }
+
+ // CASE2: When some transactions are not available for delete in the current iteration,
+ // either due to max retry reach or some other issue.
+ // New transactions Id is { 9, 10, 11, 12, 13, 14, 15, 16}
+ addTransactions(generateData(noOfTransactions), true);
+ mockContainerHealthResult(true);
+
+ // Mark transaction Id 11 as reached max retry count so that it will be ignored
+ // by scm deleting service while fetching transaction for delete
+ int ignoreTransactionId = 11;
+ txIDs.add((long) ignoreTransactionId);
+ for (i = 0; i < maxRetry; i++) {
+ incrementCount(txIDs);
+ }
+ incrementCount(txIDs);
+
+ i = 9;
+ while (true) {
+ // In each iteration read two transaction.
+ // If any transaction which is not available for delete in the current iteration,
+ // it will be ignored and will be re-checked again only after complete table is read.
+ // 1st iteration: {9, 10}
+ // 2nd iteration: {12, 13} Transaction 11 is ignored here
+ // 3rd iteration: {14, 15} Transaction 11 is available here,
+ // but it will be read only when all db records are read till the end.
+ // 4th iteration: {16, 11} Since iterator reached at the end of table after reading transaction 16,
+ // Iterator starts from beginning again, and it returns transaction 11 as well
+ blocks = getTransactions(2 * BLOCKS_PER_TXN * THREE);
+ if (i == ignoreTransactionId) {
+ i++;
+ }
+ assertEquals(blocks.get(0).getTxID(), i++);
+ if (i == 17) {
+ assertEquals(blocks.get(1).getTxID(), ignoreTransactionId);
+ break;
+ }
+ assertEquals(blocks.get(1).getTxID(), i++);
+
+ if (i == 14) {
+ // Reset transaction 11 so that it will be available in scm key deleting service in the subsequent iterations.
+ resetCount(txIDs);
+ }
+ }
+ }
+
@Test
public void testCommitTransactions() throws Exception {
deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]