xichen01 commented on code in PR #4913:
URL: https://github.com/apache/ozone/pull/4913#discussion_r1333345614
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java:
##########
@@ -188,6 +188,24 @@ public class DatanodeConfiguration {
private long recoveringContainerScrubInterval =
Duration.ofMinutes(10).toMillis();
+ /**
+ * Timeout for the thread used to process the delete block command
+ * to wait for the container lock.
+ * It takes about 200ms to open a RocksDB with HDD media.
+ * Set the default value to 100ms, so that after one times retry
+ * after waiting timeout, the hold time spent waiting for the lock
+ * is not greater than the time spent operating RocksDB
+ */
+ @Config(key = "block.delete.command.handle.lock.timeout",
Review Comment:
This comment may have caused some misunderstanding; I'll modify it.
The retry does not happen immediately: it waits until all
`DeleteBlockTransactions` have been processed, so the retry may happen
a few seconds or tens of seconds later.
A slow RocksDB open (Schema V2 only), the rebalancer, `BlockDeletingService`,
etc. may prevent the lock from being acquired here, so we retry once to
increase the chance of success.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java:
##########
@@ -346,10 +370,75 @@ private void processCmd(DeleteCmdInfo cmd) {
}
}
+ @VisibleForTesting
+ public List<DeleteBlockTransactionResult> executeCmdWithRetry(
+ List<DeletedBlocksTransaction> transactions) {
+ List<DeleteBlockTransactionResult> results =
+ new ArrayList<>(transactions.size());
+ Map<Long, DeletedBlocksTransaction> idToTransaction =
+ new HashMap<>(transactions.size());
+ transactions.forEach(tx -> idToTransaction.put(tx.getTxID(), tx));
+ List<DeletedBlocksTransaction> retryTransaction = new ArrayList<>();
+
+ List<Future<DeleteBlockTransactionExecutionResult>> futures =
+ submitTasks(transactions);
+ // Wait for tasks to finish
+ handleTasksResults(futures, result -> {
+ if (result.isLockAcquisitionFailed()) {
+
retryTransaction.add(idToTransaction.get(result.getResult().getTxID()));
+ } else {
+ results.add(result.getResult());
+ }
+ });
+
+ idToTransaction.clear();
+ // Wait for all tasks to complete before retrying, usually it takes
+ // some time for all tasks to complete, then the retry may be successful.
+ // We will only retry once
+ if (!retryTransaction.isEmpty()) {
+ futures = submitTasks(retryTransaction);
+ handleTasksResults(futures, result -> {
+ if (result.isLockAcquisitionFailed()) {
+ blockDeleteMetrics.incrTotalLockTimeoutTransactionCount();
+ }
+ results.add(result.getResult());
+ });
+ }
+ return results;
+ }
+
+ @VisibleForTesting
+ public List<Future<DeleteBlockTransactionExecutionResult>> submitTasks(
+ List<DeletedBlocksTransaction> deletedBlocksTransactions) {
+ List<Future<DeleteBlockTransactionExecutionResult>> futures =
+ new ArrayList<>(deletedBlocksTransactions.size());
+
+ for (DeletedBlocksTransaction tx : deletedBlocksTransactions) {
+ Future<DeleteBlockTransactionExecutionResult> future =
+ executor.submit(new ProcessTransactionTask(tx));
+ futures.add(future);
+ }
+ return futures;
+ }
+
+ public void handleTasksResults(
+ List<Future<DeleteBlockTransactionExecutionResult>> futures,
+ Consumer<DeleteBlockTransactionExecutionResult> handler) {
+ futures.forEach(f -> {
+ try {
+ DeleteBlockTransactionExecutionResult result = f.get();
+ handler.accept(result);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("task failed.", e);
+ Thread.currentThread().interrupt();
+ }
+ });
+ }
+
private void markBlocksForDeletionSchemaV3(
- KeyValueContainerData containerData, DeletedBlocksTransaction delTX,
- int newDeletionBlocks, long txnID)
+ KeyValueContainerData containerData, DeletedBlocksTransaction delTX)
throws IOException {
+ int newDeletionBlocks = 0;
Review Comment:
Already modified.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java:
##########
@@ -232,40 +267,44 @@ public DeleteBlockTransactionResult call() {
ContainerType containerType = cont.getContainerType();
switch (containerType) {
case KeyValueContainer:
+ KeyValueContainer keyValueContainer = (KeyValueContainer)cont;
KeyValueContainerData containerData = (KeyValueContainerData)
cont.getContainerData();
- cont.writeLock();
- try {
- if (containerData.hasSchema(SCHEMA_V1)) {
- markBlocksForDeletionSchemaV1(containerData, tx);
- } else if (containerData.hasSchema(SCHEMA_V2)) {
- markBlocksForDeletionSchemaV2(containerData, tx,
- newDeletionBlocks, tx.getTxID());
- } else if (containerData.hasSchema(SCHEMA_V3)) {
- markBlocksForDeletionSchemaV3(containerData, tx,
- newDeletionBlocks, tx.getTxID());
- } else {
- throw new UnsupportedOperationException(
- "Only schema version 1,2,3 are supported.");
+ if (keyValueContainer.
+ writeLockTryLock(tryLockTimeoutMs, TimeUnit.MILLISECONDS)) {
+ try {
+ String schemaVersion = containerData
+ .getSupportedSchemaVersionOrDefault();
+ if (getSchemaHandlers().containsKey(schemaVersion)) {
+ schemaHandlers.get(schemaVersion).handle(containerData, tx);
+ } else {
+ throw new UnsupportedOperationException(
+ "Only schema version 1,2,3 are supported.");
+ }
+ } finally {
+ cont.writeUnlock();
}
- } finally {
- cont.writeUnlock();
+ txResultBuilder.setContainerID(containerId)
+ .setSuccess(true);
+ } else {
+ lockAcquisitionFailed = true;
+ txResultBuilder.setContainerID(containerId)
+ .setSuccess(false);
}
- txResultBuilder.setContainerID(containerId)
- .setSuccess(true);
break;
default:
LOG.error(
"Delete Blocks Command Handler is not implemented for " +
"containerType {}", containerType);
}
- } catch (IOException e) {
+ } catch (IOException | InterruptedException e) {
Review Comment:
Done.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java:
##########
@@ -74,6 +74,10 @@ public final class BlockDeletingServiceMetrics {
@Metric(about = "The total number of Container chosen to be deleted.")
private MutableGaugeLong totalContainerChosenCount;
+ @Metric(about = "The total number of transactions which was failed " +
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]