This is an automated email from the ASF dual-hosted git repository.

ahuber pushed a commit to branch v3
in repository https://gitbox.apache.org/repos/asf/causeway.git

commit c649d5752d5bd8de7ba6b95721c4f9929c8b5f0e
Merge: 2853647ae1 f4096a41f4
Author: Andi Huber <[email protected]>
AuthorDate: Mon May 13 09:26:48 2024 +0200

    merge 'master'

 .../applib/job/RunBackgroundCommandsJob.java       | 142 ++++++++++++++-------
 .../CausewayExtSecmanAdminRoleAndPermissions.java  |   1 +
 ...ewayExtSecmanRegularUserRoleAndPermissions.java |  17 ---
 3 files changed, 97 insertions(+), 63 deletions(-)

diff --cc extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java
index e1ef0d175f,4dec5bfeb9..ab1ea19417
--- a/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java
+++ b/extensions/core/commandlog/applib/src/main/java/org/apache/causeway/extensions/commandlog/applib/job/RunBackgroundCommandsJob.java
@@@ -40,8 -44,7 +41,9 @@@ import org.apache.causeway.applib.servi
  import org.apache.causeway.applib.services.user.UserMemento;
  import org.apache.causeway.applib.services.xactn.TransactionService;
  import org.apache.causeway.applib.util.schema.CommandDtoUtils;
++import org.apache.causeway.commons.functional.Try;
  import org.apache.causeway.extensions.commandlog.applib.dom.CommandLogEntry;
 +import 
org.apache.causeway.extensions.commandlog.applib.dom.CommandLogEntryRepository;
  import org.apache.causeway.schema.cmd.v2.CommandDto;
  
  import lombok.val;
@@@ -104,40 -98,97 +97,97 @@@ public class RunBackgroundCommandsJob i
          // for each command, we execute within its own transaction.  Failure 
of one should not impact the next.
          commandDtosIfAny.ifPresent(commandDtos -> {
              for (val commandDto : commandDtos) {
-                 interactionService.runAndCatch(interactionContext, () -> {
-                     
transactionService.runTransactional(Propagation.REQUIRES_NEW, () -> {
-                         // look up the CommandLogEntry again because we are 
within a new transaction.
-                         val commandLogEntryIfAny = 
commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId()));
- 
-                         // finally, we execute
-                         commandLogEntryIfAny.ifPresent(commandLogEntry ->
-                         {
-                             commandExecutorService.executeCommand(
-                                     
CommandExecutorService.InteractionContextPolicy.NO_SWITCH, commandDto);
-                             
commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp());
-                         });
-                     })
-                     .ifFailureFail();
-                 })
-                 .ifFailure(throwable -> {
-                     log.error("Failed to execute command: " + 
CommandDtoUtils.dtoMapper().toString(commandDto), throwable);
-                     // update this command as having failed.
-                     interactionService.runAndCatch(interactionContext, () -> {
-                         
transactionService.runTransactional(Propagation.REQUIRES_NEW, () -> {
-                             // look up the CommandLogEntry again because we 
are within a new transaction.
-                             val commandLogEntryIfAny = 
commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId()));
- 
-                             // capture the error
-                             commandLogEntryIfAny.ifPresent(commandLogEntry ->
-                             {
-                                 commandLogEntry.setException(throwable);
-                                 
commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp());
-                             });
-                         });
-                     });
+                 executeCommandWithinOwnTransaction(commandDto, 
interactionContext);
+             }
+         });
+     }
+ 
+     private Optional<List<CommandDto>> pendingCommandDtos(final 
InteractionContext interactionContext) {
+         return interactionService.callAndCatch(interactionContext, () ->
+             transactionService.callTransactional(Propagation.REQUIRES_NEW, () 
->
+                 commandLogEntryRepository.findBackgroundAndNotYetStarted()
+                         .stream()
+                         .map(CommandLogEntry::getCommandDto)
+                         .collect(Collectors.toList())
+                 )
+                 .ifFailureFail()
+                 .valueAsNonNullElseFail()
+             )
+             .ifFailureFail()    // we give up if unable to find these
+             .getValue();
+     }
+ 
+     private void executeCommandWithinOwnTransaction(
+             final CommandDto commandDto,
+             final InteractionContext interactionContext
+     ) {
+         int retryCount = RETRY_COUNT;
+         while(retryCount > 0) {
+             Try<?> result = 
interactionService.runAndCatch(interactionContext, () -> {
+                 executeCommandWithinOwnTransactionElseFail(commandDto);
+             });
+             if (isEncounteredDeadlock(result)) {
+                 retryCount--;
+                 log.debug("Deadlock occurred, retrying command: " + 
CommandDtoUtils.dtoMapper().toString(commandDto));
+                 sleep(RETRY_INTERVAL_MILLIS);
+             } else {
+                 retryCount=0;   // ie break
+                 result.ifFailure(throwable -> {
+                     logAndCaptureFailure(throwable, commandDto, 
interactionContext);
                  });
              }
+         }
+     }
+ 
 -    private void executeCommandWithinOwnTransactionElseFail(CommandDto 
commandDto) {
++    private void executeCommandWithinOwnTransactionElseFail(final CommandDto 
commandDto) {
+         transactionService.runTransactional(Propagation.REQUIRES_NEW, () -> {
+                 // look up the CommandLogEntry again because we are within a 
new transaction.
+                 val commandLogEntryIfAny = 
commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId()));
+ 
+                 // finally, we execute
+                 commandLogEntryIfAny.ifPresent(commandLogEntry ->
+                 {
+                     commandExecutorService.executeCommand(
+                             
CommandExecutorService.InteractionContextPolicy.NO_SWITCH, commandDto);
+                     
commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp());
+                 });
+             })
+             .ifFailureFail();
+     }
+ 
 -    private void logAndCaptureFailure(Throwable throwable, CommandDto 
commandDto, InteractionContext interactionContext) {
++    private void logAndCaptureFailure(final Throwable throwable, final 
CommandDto commandDto, final InteractionContext interactionContext) {
+         log.error("Failed to execute command: " + 
CommandDtoUtils.dtoMapper().toString(commandDto), throwable);
+         // update this command as having failed.
+         interactionService.runAndCatch(interactionContext, () -> {
+             transactionService.runTransactional(Propagation.REQUIRES_NEW, () 
-> {
+                 // look up the CommandLogEntry again because we are within a 
new transaction.
+                 val commandLogEntryIfAny = 
commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId()));
+ 
+                 // capture the error
+                 commandLogEntryIfAny.ifPresent(commandLogEntry ->
+                 {
+                     commandLogEntry.setException(throwable);
+                     
commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp());
+                 });
+             });
          });
      }
  
 -    private static boolean isEncounteredDeadlock(Try<?> result) {
++    private static boolean isEncounteredDeadlock(final Try<?> result) {
+         if (!result.isFailure()) {
+             return false;
+         }
+         return result.getFailure()
+                 .map(throwable -> throwable instanceof 
DeadlockLoserDataAccessException)
+                 .orElse(false);
+     }
+ 
 -    private static void sleep(long retryIntervalMs) {
++    private static void sleep(final long retryIntervalMs) {
+         try {
+             Thread.sleep(retryIntervalMs);
+         } catch (InterruptedException e) {
+             // do nothing - continue
+         }
+     }
+ 
  }

Reply via email to