aryangupta1998 commented on code in PR #7665:
URL: https://github.com/apache/ozone/pull/7665#discussion_r1911496529
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -121,6 +120,33 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
containerManager, this.scmContext, metrics, scmCommandTimeoutMs);
}
+ /**
+  * Collects at most {@code batchSize} transactions whose count is -1,
+  * which this method treats as failed, reading from a read-only iterator
+  * that is first positioned with {@code seek(startTxId)}.
+  *
+  * <p>The scan runs while {@code lock} is held; the iterator is closed
+  * before the lock is released. A {@code batchSize} of zero or less
+  * yields an empty list.
+  *
+  * @param batchSize upper bound on the number of transactions returned
+  * @param startTxId transaction ID handed to the iterator's {@code seek}
+  * @return the matching transactions in iteration order, possibly empty
+  * @throws IOException declared by this method; presumably raised by the
+  *     underlying iterator (not visible in this hunk)
+  */
+ @Override
+ public List<DeletedBlocksTransaction> getFailedTransactionsBatch(
+     int batchSize, long startTxId) throws IOException {
+   final List<DeletedBlocksTransaction> collected = new ArrayList<>();
+
+   lock.lock();
+   // Resources of a try-with-resources are closed before its finally block
+   // runs, so the iterator is still released ahead of the unlock.
+   try (TableIterator<Long,
+       ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> txCursor =
+       deletedBlockLogStateManager.getReadOnlyIterator()) {
+     txCursor.seek(startTxId);
+
+     while (txCursor.hasNext()) {
+       if (collected.size() >= batchSize) {
+         // Batch is full; stop without consuming another entry.
+         break;
+       }
+       final DeletedBlocksTransaction candidate = txCursor.next().getValue();
+       if (candidate.getCount() == -1) {
+         collected.add(candidate);
+       }
+     }
+   } finally {
+     lock.unlock();
+   }
+
+   return collected;
+ }
+
Review Comment:
Done
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java:
##########
@@ -176,19 +202,72 @@ public void incrementCount(List<Long> txIDs)
*/
@Override
public int resetCount(List<Long> txIDs) throws IOException {
- lock.lock();
+ final int batchSize = 100;
+ int totalProcessed = 0;
+ long startTxId = 0;
+
try {
+ // If txIDs are not provided, fetch all failed transactions in batches
if (txIDs == null || txIDs.isEmpty()) {
- txIDs = getFailedTransactions(LIST_ALL_FAILED_TRANSACTIONS, 0).stream()
- .map(DeletedBlocksTransaction::getTxID)
- .collect(Collectors.toList());
+ List<DeletedBlocksTransaction> batch;
+ do {
+ // Fetch the batch of failed transactions
+ batch = getFailedTransactionsBatch(batchSize, startTxId);
+
+ // If the batch is empty, skip further processing
+ if (!batch.isEmpty()) {
+ List<Long> batchTxIDs = batch.stream()
+ .map(DeletedBlocksTransaction::getTxID)
+ .collect(Collectors.toList());
+
+ lock.lock();
+ try {
+ transactionStatusManager.resetRetryCount(batchTxIDs);
+ int batchProcessed =
deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
+ new ArrayList<>(batchTxIDs));
+
+ totalProcessed += batchProcessed;
+ } finally {
+ lock.unlock();
+ }
+
+ // Update startTxId to continue from the last processed transaction in the next iteration
+ startTxId = batch.get(batch.size() - 1).getTxID() + 1;
+ }
+
+ } while (!batch.isEmpty());
+ } else {
+ // Process txIDs provided by the user in batches
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]