This is an automated email from the ASF dual-hosted git repository. danhaywood pushed a commit to branch CAUSEWAY-3738 in repository https://gitbox.apache.org/repos/asf/causeway.git
commit 1bab78e31aa9406f67f0293764b20f62c60636c6 Author: danhaywood <[email protected]> AuthorDate: Wed May 8 22:53:46 2024 +0100 CAUSEWAY-3738: factors out some helper methods is all --- .../applib/job/RunBackgroundCommandsJob.java | 80 +++++++++++++++------- 1 file changed, 57 insertions(+), 23 deletions(-) diff --git a/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java b/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java index 16c9371d95..4dec5bfeb9 100644 --- a/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java +++ b/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java @@ -25,12 +25,15 @@ import java.util.stream.Collectors; import javax.inject.Inject; +import org.apache.causeway.commons.functional.Try; import org.apache.causeway.extensions.commandlog.applib.dom.CommandLogEntryRepository; import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.PersistJobDataAfterExecution; + +import org.springframework.dao.DeadlockLoserDataAccessException; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Propagation; @@ -67,6 +70,9 @@ import lombok.extern.log4j.Log4j2; @Log4j2 public class RunBackgroundCommandsJob implements Job { + final static int RETRY_COUNT = 3; + final static long RETRY_INTERVAL_MILLIS = 1000; + @Inject InteractionService interactionService; @Inject TransactionService transactionService; @Inject ClockService clockService; @@ -83,8 +89,8 @@ public class RunBackgroundCommandsJob implements Job { return; } - UserMemento user = UserMemento.ofNameAndRoleNames("scheduler_user", "admin_role"); - InteractionContext interactionContext = InteractionContext.builder().user(user).build(); + val userMemento = UserMemento.ofNameAndRoleNames("scheduler_user", "admin_role"); + val interactionContext = InteractionContext.builder().user(userMemento).build(); // we obtain the list of Commands first; we use their CommandDto as it is serializable across transactions final Optional<List<CommandDto>> commandDtosIfAny = pendingCommandDtos(interactionContext); @@ -92,35 +98,46 @@ public class RunBackgroundCommandsJob implements Job { // for each command, we execute within its own transaction. Failure of one should not impact the next. commandDtosIfAny.ifPresent(commandDtos -> { for (val commandDto : commandDtos) { - executeWithinOwnTransaction(commandDto, interactionContext); + executeCommandWithinOwnTransaction(commandDto, interactionContext); } }); } - private Optional<List<CommandDto>> pendingCommandDtos(InteractionContext interactionContext) { - final Optional<List<CommandDto>> commandDtosIfAny = - interactionService.callAndCatch(interactionContext, () -> - transactionService.callTransactional(Propagation.REQUIRES_NEW, () -> - commandLogEntryRepository.findBackgroundAndNotYetStarted() - .stream() - .map(CommandLogEntry::getCommandDto) - .collect(Collectors.toList()) - ) - .ifFailureFail() - .valueAsNonNullElseFail() - ) - .ifFailureFail() // we give up if unable to find these - .getValue(); - return commandDtosIfAny; + private Optional<List<CommandDto>> pendingCommandDtos(final InteractionContext interactionContext) { + return interactionService.callAndCatch(interactionContext, () -> + transactionService.callTransactional(Propagation.REQUIRES_NEW, () -> + commandLogEntryRepository.findBackgroundAndNotYetStarted() + .stream() + .map(CommandLogEntry::getCommandDto) + .collect(Collectors.toList()) + ) + .ifFailureFail() + .valueAsNonNullElseFail() + ) + .ifFailureFail() // we give up if unable to find these + .getValue(); } - private void executeWithinOwnTransaction(CommandDto commandDto, InteractionContext interactionContext) { - interactionService.runAndCatch(interactionContext, () -> { - executeCommandWithinOwnTransactionElseFail(commandDto); - }) - .ifFailure(throwable -> { + private void executeCommandWithinOwnTransaction( + final CommandDto commandDto, + final InteractionContext interactionContext + ) { + int retryCount = RETRY_COUNT; + while(retryCount > 0) { + Try<?> result = interactionService.runAndCatch(interactionContext, () -> { + executeCommandWithinOwnTransactionElseFail(commandDto); + }); + if (isEncounteredDeadlock(result)) { + retryCount--; + log.debug("Deadlock occurred, retrying command: " + CommandDtoUtils.dtoMapper().toString(commandDto)); + sleep(RETRY_INTERVAL_MILLIS); + } else { + retryCount=0; // ie break + result.ifFailure(throwable -> { logAndCaptureFailure(throwable, commandDto, interactionContext); }); + } + } } private void executeCommandWithinOwnTransactionElseFail(CommandDto commandDto) { @@ -157,4 +174,21 @@ public class RunBackgroundCommandsJob implements Job { }); } + private static boolean isEncounteredDeadlock(Try<?> result) { + if (!result.isFailure()) { + return false; + } + return result.getFailure() + .map(throwable -> throwable instanceof DeadlockLoserDataAccessException) + .orElse(false); + } + + private static void sleep(long retryIntervalMs) { + try { + Thread.sleep(retryIntervalMs); + } catch (InterruptedException e) { + // do nothing - continue + } + } + }
