This is an automated email from the ASF dual-hosted git repository.
arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new e811491da7 FINERACT-2304: Handle Stuck CommandProcessing Retries
e811491da7 is described below
commit e811491da71553513d88129535e131c9486a5d91
Author: Soma Sörös <[email protected]>
AuthorDate: Wed Jun 11 17:03:38 2025 +0200
FINERACT-2304: Handle Stuck CommandProcessing Retries
---
.../configuration/RetryConfigurationAssembler.java | 18 +-
.../SynchronousCommandProcessingService.java | 8 +-
.../batch/service/BatchApiServiceImplTest.java | 5 +
.../SynchronousCommandProcessingServiceTest.java | 212 ++++++++++++++++++++-
4 files changed, 238 insertions(+), 5 deletions(-)
diff --git
a/fineract-core/src/main/java/org/apache/fineract/commands/configuration/RetryConfigurationAssembler.java
b/fineract-core/src/main/java/org/apache/fineract/commands/configuration/RetryConfigurationAssembler.java
index 91ccfff738..fb0fc05bab 100644
---
a/fineract-core/src/main/java/org/apache/fineract/commands/configuration/RetryConfigurationAssembler.java
+++
b/fineract-core/src/main/java/org/apache/fineract/commands/configuration/RetryConfigurationAssembler.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
import lombok.AllArgsConstructor;
import org.apache.fineract.batch.service.BatchExecutionException;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import
org.apache.fineract.infrastructure.core.domain.FineractRequestContextHolder;
import org.springframework.stereotype.Service;
@AllArgsConstructor
@@ -34,19 +35,32 @@ public class RetryConfigurationAssembler {
public static final String EXECUTE_COMMAND = "executeCommand";
public static final String BATCH_RETRY = "batchRetry";
+ private static final String LAST_EXECUTION_EXCEPTION_KEY =
"LAST_EXECUTION_EXCEPTION";
private final RetryRegistry registry;
private final FineractProperties fineractProperties;
+ private final FineractRequestContextHolder fineractRequestContextHolder;
private boolean isAssignableFrom(Object e, Class<? extends Throwable>[]
exceptionList) {
return Arrays.stream(exceptionList).anyMatch(re ->
re.isAssignableFrom(e.getClass()));
}
+ private void setLastException(Object ex) {
+
fineractRequestContextHolder.setAttribute(LAST_EXECUTION_EXCEPTION_KEY,
ex.getClass());
+ }
+
+ public Class<?> getLastException() {
+ return (Class<?>)
fineractRequestContextHolder.getAttribute(RetryConfigurationAssembler.LAST_EXECUTION_EXCEPTION_KEY);
+ }
+
public Retry getRetryConfigurationForExecuteCommand() {
Class<? extends Throwable>[] exceptionList =
fineractProperties.getRetry().getInstances().getExecuteCommand().getRetryExceptions();
RetryConfig.Builder configBuilder =
buildCommonExecuteCommandConfiguration();
if (exceptionList != null) {
- configBuilder.retryOnException(e -> isAssignableFrom(e,
exceptionList));
+ configBuilder.retryOnException(ex -> {
+ setLastException(ex);
+ return isAssignableFrom(ex, exceptionList);
+ });
}
RetryConfig config = configBuilder.build();
@@ -60,8 +74,10 @@ public class RetryConfigurationAssembler {
if (exceptionList != null) {
configBuilder.retryOnException(ex -> {
if (ex instanceof BatchExecutionException e) {
+ setLastException(e.getCause()); // pass the Throwable itself: setLastException stores ex.getClass(), a Class argument would record java.lang.Class
return isAssignableFrom(e.getCause(), exceptionList);
} else {
+ setLastException(ex);
return isAssignableFrom(ex, exceptionList);
}
});
diff --git
a/fineract-core/src/main/java/org/apache/fineract/commands/service/SynchronousCommandProcessingService.java
b/fineract-core/src/main/java/org/apache/fineract/commands/service/SynchronousCommandProcessingService.java
index 37250c475b..656dfaa05f 100644
---
a/fineract-core/src/main/java/org/apache/fineract/commands/service/SynchronousCommandProcessingService.java
+++
b/fineract-core/src/main/java/org/apache/fineract/commands/service/SynchronousCommandProcessingService.java
@@ -188,7 +188,13 @@ public class SynchronousCommandProcessingService
implements CommandProcessingSer
}
CommandProcessingResultType status =
CommandProcessingResultType.fromInt(command.getStatus());
switch (status) {
- case UNDER_PROCESSING -> throw new
IdempotentCommandProcessUnderProcessingException(wrapper, idempotencyKey);
+ case UNDER_PROCESSING -> {
+ Class<?> lastExecutionExceptionClass =
retryConfigurationAssembler.getLastException();
+ if (lastExecutionExceptionClass == null
+ ||
IdempotentCommandProcessUnderProcessingException.class.isAssignableFrom(lastExecutionExceptionClass))
{
+ throw new
IdempotentCommandProcessUnderProcessingException(wrapper, idempotencyKey);
+ }
+ }
case PROCESSED -> throw new
IdempotentCommandProcessSucceedException(wrapper, idempotencyKey, command);
case ERROR -> {
if (!retry) {
diff --git
a/fineract-core/src/test/java/org/apache/fineract/batch/service/BatchApiServiceImplTest.java
b/fineract-core/src/test/java/org/apache/fineract/batch/service/BatchApiServiceImplTest.java
index 91c98c350f..573aed1a91 100644
---
a/fineract-core/src/test/java/org/apache/fineract/batch/service/BatchApiServiceImplTest.java
+++
b/fineract-core/src/test/java/org/apache/fineract/batch/service/BatchApiServiceImplTest.java
@@ -42,6 +42,7 @@ import org.apache.fineract.batch.domain.BatchResponse;
import org.apache.fineract.batch.exception.ErrorInfo;
import org.apache.fineract.commands.configuration.RetryConfigurationAssembler;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import
org.apache.fineract.infrastructure.core.domain.FineractRequestContextHolder;
import org.apache.fineract.infrastructure.core.exception.ErrorHandler;
import
org.apache.fineract.infrastructure.core.filters.BatchRequestPreprocessor;
import
org.apache.fineract.infrastructure.core.persistence.ExtendedJpaTransactionManager;
@@ -53,6 +54,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.transaction.support.DefaultTransactionStatus;
@@ -78,6 +80,9 @@ class BatchApiServiceImplTest {
@Mock
private FineractProperties fineractProperties;
+ @Spy
+ private FineractRequestContextHolder fineractRequestContextHolder;
+
@InjectMocks
private RetryConfigurationAssembler retryConfigurationAssembler;
diff --git
a/fineract-provider/src/test/java/org/apache/fineract/commands/service/SynchronousCommandProcessingServiceTest.java
b/fineract-provider/src/test/java/org/apache/fineract/commands/service/SynchronousCommandProcessingServiceTest.java
index f1df45a6b0..af59e49daf 100644
---
a/fineract-provider/src/test/java/org/apache/fineract/commands/service/SynchronousCommandProcessingServiceTest.java
+++
b/fineract-provider/src/test/java/org/apache/fineract/commands/service/SynchronousCommandProcessingServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.fineract.commands.service;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
@@ -30,6 +31,7 @@ import static org.mockito.Mockito.when;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
+import io.github.resilience4j.retry.RetryRegistry;
import jakarta.servlet.http.HttpServletRequest;
import java.time.Duration;
import java.util.Map;
@@ -42,8 +44,10 @@ import
org.apache.fineract.commands.handler.NewCommandSourceHandler;
import org.apache.fineract.commands.provider.CommandHandlerProvider;
import
org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService;
import org.apache.fineract.infrastructure.core.api.JsonCommand;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.apache.fineract.infrastructure.core.data.CommandProcessingResult;
import
org.apache.fineract.infrastructure.core.domain.FineractRequestContextHolder;
+import
org.apache.fineract.infrastructure.core.exception.IdempotentCommandProcessUnderProcessingException;
import
org.apache.fineract.infrastructure.core.exception.PlatformApiDataValidationException;
import
org.apache.fineract.infrastructure.core.serialization.ToApiJsonSerializer;
import
org.apache.fineract.infrastructure.security.service.PlatformSecurityContext;
@@ -80,6 +84,12 @@ public class SynchronousCommandProcessingServiceTest {
@Mock
private CommandSourceService commandSourceService;
+ @Mock
+ private RetryRegistry retryRegistry;
+
+ @Mock
+ private FineractProperties fineractProperties;
+
@Mock
private RetryConfigurationAssembler retryConfigurationAssembler;
@@ -95,14 +105,29 @@ public class SynchronousCommandProcessingServiceTest {
@BeforeEach
public void setup() {
MockitoAnnotations.openMocks(this);
+ RequestContextHolder.resetRequestAttributes();
RequestContextHolder.setRequestAttributes(new
ServletRequestAttributes(request));
-
when(retryConfigurationAssembler.getRetryConfigurationForExecuteCommand())
- .thenReturn(Retry.of("IDENTIFIER",
RetryConfig.custom().maxAttempts(3).failAfterMaxAttempts(true)
- .waitDuration(Duration.ofMillis(1)).retryOnException(e
-> e instanceof RetryException).build()));
+
ErrorInfo errorInfo = mock(ErrorInfo.class);
when(errorInfo.getMessage()).thenReturn("Failed");
when(errorInfo.getStatusCode()).thenReturn(500);
when(commandSourceService.generateErrorInfo(any())).thenReturn(errorInfo);
+
+ FineractProperties.RetryProperties settings = new
FineractProperties.RetryProperties();
+ settings.setInstances(new
FineractProperties.RetryProperties.InstancesProperties());
+ settings.getInstances().setExecuteCommand(new
FineractProperties.RetryProperties.InstancesProperties.ExecuteCommandProperties());
+ settings.getInstances().getExecuteCommand().setMaxAttempts(3);
+
settings.getInstances().getExecuteCommand().setWaitDuration(Duration.ofMillis(1));
+
settings.getInstances().getExecuteCommand().setEnableExponentialBackoff(false);
+ settings.getInstances().getExecuteCommand()
+ .setRetryExceptions(new Class[] { RetryException.class,
IdempotentCommandProcessUnderProcessingException.class });
+ when(fineractProperties.getRetry()).thenReturn(settings);
+ when(retryRegistry.retry(anyString(), any(RetryConfig.class)))
+ .thenAnswer(i -> Retry.of((String) i.getArgument(0),
(RetryConfig) i.getArgument(1)));
+
+ var impl = new RetryConfigurationAssembler(retryRegistry,
fineractProperties, fineractRequestContextHolder);
+ var retry = impl.getRetryConfigurationForExecuteCommand();
+
when(retryConfigurationAssembler.getRetryConfigurationForExecuteCommand()).thenReturn(retry);
}
@AfterEach
@@ -163,6 +188,187 @@ public class SynchronousCommandProcessingServiceTest {
verify(commandSourceService).saveResultSameTransaction(commandSource);
}
+ /**
+ * Test that an instance picked up an already under processing command. We
assume that during retry timeouts it
+ * stays in the same status, therefore it should fail after reaching max
retry count.
+ */
+ @Test
+ public void
executeCommandShouldFailAfterRetriesWithIdempotentCommandProcessUnderProcessingException()
{
+ CommandWrapper commandWrapper = Mockito.mock(CommandWrapper.class);
+ when(commandWrapper.isDatatableResource()).thenReturn(false);
+ when(commandWrapper.isNoteResource()).thenReturn(false);
+ when(commandWrapper.isSurveyResource()).thenReturn(false);
+ when(commandWrapper.isLoanDisburseDetailResource()).thenReturn(false);
+
+ long commandId = 1L;
+ JsonCommand jsonCommand = Mockito.mock(JsonCommand.class);
+ when(jsonCommand.commandId()).thenReturn(commandId);
+
+ NewCommandSourceHandler commandHandler =
Mockito.mock(NewCommandSourceHandler.class);
+ CommandProcessingResult commandProcessingResult =
Mockito.mock(CommandProcessingResult.class);
+
when(commandProcessingResult.isRollbackTransaction()).thenReturn(false);
+
when(commandHandler.processCommand(jsonCommand)).thenReturn(commandProcessingResult);
+
+ when(commandHandlerProvider.getHandler(Mockito.any(),
Mockito.any())).thenReturn(commandHandler);
+
+
when(configurationDomainService.isMakerCheckerEnabledForTask(Mockito.any())).thenReturn(false);
+ String idk = "idk";
+ when(idempotencyKeyResolver.resolve(commandWrapper)).thenReturn(idk);
+ CommandSource commandSource = Mockito.mock(CommandSource.class);
+ when(commandSource.getId()).thenReturn(commandId);
+
+ when(commandSourceService.findCommandSource(any(),
any())).thenReturn(commandSource);
+
+
when(commandSourceService.getCommandSource(commandId)).thenReturn(commandSource);
+
+ AppUser appUser = Mockito.mock(AppUser.class);
+ when(commandSourceService.saveInitialNewTransaction(commandWrapper,
jsonCommand, appUser, idk)).thenReturn(commandSource);
+
when(commandSourceService.saveResultSameTransaction(commandSource)).thenReturn(commandSource);
+
when(commandSource.getStatus()).thenReturn(CommandProcessingResultType.UNDER_PROCESSING.getValue());
+
when(context.authenticatedUser(Mockito.any(CommandWrapper.class))).thenReturn(appUser);
+
+ when(commandSourceService.processCommand(commandHandler, jsonCommand,
commandSource, appUser, false))
+ .thenThrow(new RetryException()).thenThrow(new
RetryException()).thenReturn(commandProcessingResult);
+
+ when(retryConfigurationAssembler.getLastException()).thenReturn(null)
+ .thenAnswer((i) ->
IdempotentCommandProcessUnderProcessingException.class)
+ .thenAnswer((i) ->
IdempotentCommandProcessUnderProcessingException.class);
+
+ assertThrows(IdempotentCommandProcessUnderProcessingException.class,
+ () -> underTest.executeCommand(commandWrapper, jsonCommand,
false));
+
+ verify(commandSource, times(3)).getStatus();
+ assertEquals(CommandProcessingResultType.UNDER_PROCESSING.getValue(),
commandSource.getStatus());
+ verify(commandSourceService, times(0)).generateErrorInfo(any());
+ verify(commandSourceService,
times(0)).saveResultSameTransaction(commandSource);
+ }
+
+ /**
+ * Test that an instance picked up an already under processing command. We
assume that during retry timeouts it is
+ * moved out from retry and the process can pick it up. We expect 2 fails
then the third time the command is
+ * processable.
+ */
+ @Test
+ public void
executeCommandShouldPassAfter1retryFailsByIdempotentCommandProcessUnderProcessingException()
{
+ CommandWrapper commandWrapper = Mockito.mock(CommandWrapper.class);
+ when(commandWrapper.isDatatableResource()).thenReturn(false);
+ when(commandWrapper.isNoteResource()).thenReturn(false);
+ when(commandWrapper.isSurveyResource()).thenReturn(false);
+ when(commandWrapper.isLoanDisburseDetailResource()).thenReturn(false);
+
+ long commandId = 1L;
+ JsonCommand jsonCommand = Mockito.mock(JsonCommand.class);
+ when(jsonCommand.commandId()).thenReturn(commandId);
+
+ NewCommandSourceHandler commandHandler =
Mockito.mock(NewCommandSourceHandler.class);
+ CommandProcessingResult commandProcessingResult =
Mockito.mock(CommandProcessingResult.class);
+
when(commandProcessingResult.isRollbackTransaction()).thenReturn(false);
+
when(commandHandler.processCommand(jsonCommand)).thenReturn(commandProcessingResult);
+
+ when(commandHandlerProvider.getHandler(Mockito.any(),
Mockito.any())).thenReturn(commandHandler);
+
+
when(configurationDomainService.isMakerCheckerEnabledForTask(Mockito.any())).thenReturn(false);
+ String idk = "idk";
+ when(idempotencyKeyResolver.resolve(commandWrapper)).thenReturn(idk);
+ CommandSource commandSource = Mockito.mock(CommandSource.class);
+ when(commandSource.getId()).thenReturn(commandId);
+
+ when(commandSourceService.findCommandSource(any(),
any())).thenReturn(commandSource);
+
+
when(commandSourceService.getCommandSource(commandId)).thenReturn(commandSource);
+
+ AppUser appUser = Mockito.mock(AppUser.class);
+ when(commandSourceService.saveInitialNewTransaction(commandWrapper,
jsonCommand, appUser, idk)).thenReturn(commandSource);
+
when(commandSourceService.saveResultSameTransaction(commandSource)).thenReturn(commandSource);
+
when(commandSource.getStatus()).thenReturn(CommandProcessingResultType.UNDER_PROCESSING.getValue())
//
+
.thenReturn(CommandProcessingResultType.UNDER_PROCESSING.getValue()) //
+ // Is it possible???
+
.thenReturn(CommandProcessingResultType.AWAITING_APPROVAL.getValue()) //
+ ;
+
when(context.authenticatedUser(Mockito.any(CommandWrapper.class))).thenReturn(appUser);
+
+ when(commandSourceService.processCommand(commandHandler, jsonCommand,
commandSource, appUser, false))
+ .thenReturn(commandProcessingResult);
+
+ when(retryConfigurationAssembler.getLastException()).thenReturn(null)
+ .thenAnswer((i) ->
IdempotentCommandProcessUnderProcessingException.class);
+
+ CommandProcessingResult actualCommandProcessingResult =
underTest.executeCommand(commandWrapper, jsonCommand, false);
+
+ verify(commandSource, times(3)).getStatus();
+ verify(commandSourceService, times(0)).generateErrorInfo(any());
+ verify(commandSourceService).saveResultSameTransaction(commandSource);
+ assertEquals(actualCommandProcessingResult, commandProcessingResult);
+ }
+
+ /**
+ * Test that an instance picked up a new command. During first processing,
we expect a retryable exception to
+ * happen, but commandSource should already have UNDER_PROCESSING status.
We should try to reprocess. After the 2nd
+ * failure, the status should still be the same. On the 3rd try it should result in no
error.
+ */
+ @Test
+ public void
executeCommandShouldPassAfter2RetriesOnRetryExceptionAndWithStuckStatus() {
+ CommandWrapper commandWrapper = Mockito.mock(CommandWrapper.class);
+ when(commandWrapper.isDatatableResource()).thenReturn(false);
+ when(commandWrapper.isNoteResource()).thenReturn(false);
+ when(commandWrapper.isSurveyResource()).thenReturn(false);
+ when(commandWrapper.isLoanDisburseDetailResource()).thenReturn(false);
+
+ long commandId = 1L;
+ JsonCommand jsonCommand = Mockito.mock(JsonCommand.class);
+ when(jsonCommand.commandId()).thenReturn(commandId);
+
+ NewCommandSourceHandler commandHandler =
Mockito.mock(NewCommandSourceHandler.class);
+ CommandProcessingResult commandProcessingResult =
Mockito.mock(CommandProcessingResult.class);
+
when(commandProcessingResult.isRollbackTransaction()).thenReturn(false);
+
when(commandHandler.processCommand(jsonCommand)).thenReturn(commandProcessingResult);
+
+ when(commandHandlerProvider.getHandler(Mockito.any(),
Mockito.any())).thenReturn(commandHandler);
+
+
when(configurationDomainService.isMakerCheckerEnabledForTask(Mockito.any())).thenReturn(false);
+ String idk = "idk";
+ when(idempotencyKeyResolver.resolve(commandWrapper)).thenReturn(idk);
+ CommandSource commandSource = Mockito.mock(CommandSource.class);
+ when(commandSource.getId()).thenReturn(commandId);
+
+
when(commandSourceService.getCommandSource(commandId)).thenReturn(commandSource);
+
+ AppUser appUser = Mockito.mock(AppUser.class);
+ when(commandSourceService.saveInitialNewTransaction(commandWrapper,
jsonCommand, appUser, idk)).thenReturn(commandSource);
+
when(commandSourceService.saveResultSameTransaction(commandSource)).thenReturn(commandSource);
+
+
when(context.authenticatedUser(Mockito.any(CommandWrapper.class))).thenReturn(appUser);
+
+ when(commandSourceService.findCommandSource(any(),
any())).thenReturn(null) // simulate new Command
+ .thenReturn(commandSource) // simulate stuck Command
+ .thenReturn(commandSource); // simulate stuck Command
+
+ when(commandSource.getStatus())
+ // on first hit we don't have a command source because it is
new.
+ // on 2nd hit we have a stuck one
+
.thenReturn(CommandProcessingResultType.UNDER_PROCESSING.getValue()) //
+
.thenReturn(CommandProcessingResultType.UNDER_PROCESSING.getValue()); //
+
+ when(retryConfigurationAssembler.getLastException()).thenAnswer((i) ->
RetryException.class)
+ .thenAnswer((i) -> RetryException.class);
+
+ when(commandSourceService.processCommand(commandHandler, jsonCommand,
commandSource, appUser, false))
+ // first time
+ .thenThrow(new RetryException())
+ // look like stuck and fails
+ .thenThrow(new RetryException())
+ // look like stuck and pass
+ .thenReturn(commandProcessingResult);
+
+ CommandProcessingResult actualCommandProcessingResult =
underTest.executeCommand(commandWrapper, jsonCommand, false);
+
+ verify(commandSource, times(2)).getStatus();
+ assertEquals(CommandProcessingResultType.UNDER_PROCESSING.getValue(),
commandSource.getStatus());
+ verify(commandSourceService, times(2)).generateErrorInfo(any());
+ verify(commandSourceService).saveResultSameTransaction(commandSource);
+ assertEquals(actualCommandProcessingResult, commandProcessingResult);
+ }
+
@Test
public void testExecuteCommandSuccess() {
CommandWrapper commandWrapper = Mockito.mock(CommandWrapper.class);