This is an automated email from the ASF dual-hosted git repository. ahuber pushed a commit to branch v3 in repository https://gitbox.apache.org/repos/asf/causeway.git
commit c649d5752d5bd8de7ba6b95721c4f9929c8b5f0e Merge: 2853647ae1 f4096a41f4 Author: Andi Huber <[email protected]> AuthorDate: Mon May 13 09:26:48 2024 +0200 merge 'master' .../applib/job/RunBackgroundCommandsJob.java | 142 ++++++++++++++------- .../CausewayExtSecmanAdminRoleAndPermissions.java | 1 + ...ewayExtSecmanRegularUserRoleAndPermissions.java | 17 --- 3 files changed, 97 insertions(+), 63 deletions(-) diff --cc extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java index e1ef0d175f,4dec5bfeb9..ab1ea19417 --- a/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java +++ b/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java @@@ -40,8 -44,7 +41,9 @@@ import org.apache.causeway.applib.servi import org.apache.causeway.applib.services.user.UserMemento; import org.apache.causeway.applib.services.xactn.TransactionService; import org.apache.causeway.applib.util.schema.CommandDtoUtils; ++import org.apache.causeway.commons.functional.Try; import org.apache.causeway.extensions.commandlog.applib.dom.CommandLogEntry; +import org.apache.causeway.extensions.commandlog.applib.dom.CommandLogEntryRepository; import org.apache.causeway.schema.cmd.v2.CommandDto; import lombok.val; @@@ -104,40 -98,97 +97,97 @@@ public class RunBackgroundCommandsJob i // for each command, we execute within its own transaction. Failure of one should not impact the next. commandDtosIfAny.ifPresent(commandDtos -> { for (val commandDto : commandDtos) { - interactionService.runAndCatch(interactionContext, () -> { - transactionService.runTransactional(Propagation.REQUIRES_NEW, () -> { - // look up the CommandLogEntry again because we are within a new transaction. - val commandLogEntryIfAny = commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId())); - - // finally, we execute - commandLogEntryIfAny.ifPresent(commandLogEntry -> - { - commandExecutorService.executeCommand( - CommandExecutorService.InteractionContextPolicy.NO_SWITCH, commandDto); - commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp()); - }); - }) - .ifFailureFail(); - }) - .ifFailure(throwable -> { - log.error("Failed to execute command: " + CommandDtoUtils.dtoMapper().toString(commandDto), throwable); - // update this command as having failed. - interactionService.runAndCatch(interactionContext, () -> { - transactionService.runTransactional(Propagation.REQUIRES_NEW, () -> { - // look up the CommandLogEntry again because we are within a new transaction. - val commandLogEntryIfAny = commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId())); - - // capture the error - commandLogEntryIfAny.ifPresent(commandLogEntry -> - { - commandLogEntry.setException(throwable); - commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp()); - }); - }); - }); + executeCommandWithinOwnTransaction(commandDto, interactionContext); + } + }); + } + + private Optional<List<CommandDto>> pendingCommandDtos(final InteractionContext interactionContext) { + return interactionService.callAndCatch(interactionContext, () -> + transactionService.callTransactional(Propagation.REQUIRES_NEW, () -> + commandLogEntryRepository.findBackgroundAndNotYetStarted() + .stream() + .map(CommandLogEntry::getCommandDto) + .collect(Collectors.toList()) + ) + .ifFailureFail() + .valueAsNonNullElseFail() + ) + .ifFailureFail() // we give up if unable to find these + .getValue(); + } + + private void executeCommandWithinOwnTransaction( + final CommandDto commandDto, + final InteractionContext interactionContext + ) { + int retryCount = RETRY_COUNT; + while(retryCount > 0) { + Try<?> result = interactionService.runAndCatch(interactionContext, () -> { + executeCommandWithinOwnTransactionElseFail(commandDto); + }); + if (isEncounteredDeadlock(result)) { + retryCount--; + log.debug("Deadlock occurred, retrying command: " + CommandDtoUtils.dtoMapper().toString(commandDto)); + sleep(RETRY_INTERVAL_MILLIS); + } else { + retryCount=0; // ie break + result.ifFailure(throwable -> { + logAndCaptureFailure(throwable, commandDto, interactionContext); }); } + } + } + - private void executeCommandWithinOwnTransactionElseFail(CommandDto commandDto) { ++ private void executeCommandWithinOwnTransactionElseFail(final CommandDto commandDto) { + transactionService.runTransactional(Propagation.REQUIRES_NEW, () -> { + // look up the CommandLogEntry again because we are within a new transaction. + val commandLogEntryIfAny = commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId())); + + // finally, we execute + commandLogEntryIfAny.ifPresent(commandLogEntry -> + { + commandExecutorService.executeCommand( + CommandExecutorService.InteractionContextPolicy.NO_SWITCH, commandDto); + commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp()); + }); + }) + .ifFailureFail(); + } + - private void logAndCaptureFailure(Throwable throwable, CommandDto commandDto, InteractionContext interactionContext) { ++ private void logAndCaptureFailure(final Throwable throwable, final CommandDto commandDto, final InteractionContext interactionContext) { + log.error("Failed to execute command: " + CommandDtoUtils.dtoMapper().toString(commandDto), throwable); + // update this command as having failed. + interactionService.runAndCatch(interactionContext, () -> { + transactionService.runTransactional(Propagation.REQUIRES_NEW, () -> { + // look up the CommandLogEntry again because we are within a new transaction. + val commandLogEntryIfAny = commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId())); + + // capture the error + commandLogEntryIfAny.ifPresent(commandLogEntry -> + { + commandLogEntry.setException(throwable); + commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp()); + }); + }); }); } - private static boolean isEncounteredDeadlock(Try<?> result) { ++ private static boolean isEncounteredDeadlock(final Try<?> result) { + if (!result.isFailure()) { + return false; + } + return result.getFailure() + .map(throwable -> throwable instanceof DeadlockLoserDataAccessException) + .orElse(false); + } + - private static void sleep(long retryIntervalMs) { ++ private static void sleep(final long retryIntervalMs) { + try { + Thread.sleep(retryIntervalMs); + } catch (InterruptedException e) { + // do nothing - continue + } + } + }
