This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c4db4774 RATIS-2433. Cancel transaction in case of failure to append (#1382)
2c4db4774 is described below

commit 2c4db4774b1717eecebed0dc52f9a44b11c2dbd5
Author: Abhishek Pal <[email protected]>
AuthorDate: Thu Apr 9 23:27:44 2026 +0530

    RATIS-2433. Cancel transaction in case of failure to append (#1382)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 89 +++++++++++++++-------
 .../apache/ratis/server/impl/RetryCacheImpl.java   | 10 ---
 .../statemachine/impl/TransactionContextImpl.java  |  2 -
 .../impl/RaftStateMachineExceptionTests.java       | 35 +++++++++
 4 files changed, 98 insertions(+), 38 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d4c6f164e..a9c80d000 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -768,17 +768,18 @@ class RaftServerImpl implements RaftServer.Division,
     } catch (GroupMismatchException e) {
       return JavaUtils.completeExceptionally(e);
     }
-    return checkLeaderState(request, null);
+    return checkLeaderState(request, null, null);
   }
 
   /**
    * @return null if the server is in leader state.
    */
-  private CompletableFuture<RaftClientReply> 
checkLeaderState(RaftClientRequest request, CacheEntry entry) {
+  private CompletableFuture<RaftClientReply> checkLeaderState(
+      RaftClientRequest request, CacheEntry entry, TransactionContextImpl 
context) {
     if (!getInfo().isLeader()) {
       NotLeaderException exception = generateNotLeaderException();
       final RaftClientReply reply = newExceptionReply(request, exception);
-      return RetryCacheImpl.failWithReply(reply, entry);
+      return failWithReply(reply, entry, context);
     }
     if (!getInfo().isLeaderReady()) {
       final CacheEntry cacheEntry = 
retryCache.getIfPresent(ClientInvocationId.valueOf(request));
@@ -787,13 +788,13 @@ class RaftServerImpl implements RaftServer.Division,
       }
       final LeaderNotReadyException lnre = new 
LeaderNotReadyException(getMemberId());
       final RaftClientReply reply = newExceptionReply(request, lnre);
-      return RetryCacheImpl.failWithReply(reply, entry);
+      return failWithReply(reply, entry, context);
     }
 
     if (!request.isReadOnly() && isSteppingDown()) {
       final LeaderSteppingDownException lsde = new 
LeaderSteppingDownException(getMemberId() + " is stepping down");
       final RaftClientReply reply = newExceptionReply(request, lsde);
-      return RetryCacheImpl.failWithReply(reply, entry);
+      return failWithReply(reply, entry, context);
     }
 
     return null;
@@ -819,11 +820,44 @@ class RaftServerImpl implements RaftServer.Division,
         getMemberId() + " is not in " + expected + ": current state is " + c), 
expected);
   }
 
-  private CompletableFuture<RaftClientReply> 
getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) {
-    return entry.failWithException(new ResourceUnavailableException(
-        getMemberId() + ": Failed to acquire a pending write request for " + 
request));
+  private CompletableFuture<RaftClientReply> 
getResourceUnavailableReply(String op,
+      RaftClientRequest request, CacheEntry entry, TransactionContextImpl 
context) {
+    final ResourceUnavailableException e = new 
ResourceUnavailableException(getMemberId()
+        + ": Failed to " + op + " for " + request);
+    cancelTransaction(context, e);
+    return entry.failWithException(e);
+  }
+
+  private CompletableFuture<RaftClientReply> failWithReply(
+      RaftClientReply reply, CacheEntry entry, TransactionContextImpl context) 
{
+    if (context != null) {
+      cancelTransaction(context, reply.getException());
+    }
+
+    if (entry == null) {
+      return CompletableFuture.completedFuture(reply);
+    }
+    entry.failWithReply(reply);
+    return entry.getReplyFuture();
+  }
+
+  /** Cancel a transaction and notify the state machine. Set exception if 
provided to the transaction context. */
+  private void cancelTransaction(TransactionContextImpl context, Exception 
exception) {
+    if (context == null) {
+      return;
     }
 
+    if (exception != null) {
+      context.setException(exception);
+    }
+
+    try {
+      context.cancelTransaction();
+    } catch (IOException ioe) {
+      LOG.warn("{}: Failed to cancel transaction {}", getMemberId(), context, 
ioe);
+    }
+  }
+
   /**
    * Handle a normal update request from client.
    */
@@ -836,27 +870,28 @@ class RaftServerImpl implements RaftServer.Division,
 
     final LeaderStateImpl unsyncedLeaderState = 
role.getLeaderState().orElse(null);
     if (unsyncedLeaderState == null) {
-      final RaftClientReply reply = newExceptionReply(request, 
generateNotLeaderException());
-      return RetryCacheImpl.failWithReply(reply, cacheEntry);
+      final NotLeaderException nle = generateNotLeaderException();
+      final RaftClientReply reply = newExceptionReply(request, nle);
+      return failWithReply(reply, cacheEntry, context);
     }
     final PendingRequests.Permit unsyncedPermit = 
unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage());
     if (unsyncedPermit == null) {
-      return getResourceUnavailableReply(request, cacheEntry);
+      return getResourceUnavailableReply("acquire a pending write request", 
request, cacheEntry, context);
     }
 
     final LeaderStateImpl leaderState;
     final PendingRequest pending;
     synchronized (this) {
-      final CompletableFuture<RaftClientReply> reply = 
checkLeaderState(request, cacheEntry);
+      final CompletableFuture<RaftClientReply> reply = 
checkLeaderState(request, cacheEntry, context);
       if (reply != null) {
         return reply;
       }
 
       leaderState = role.getLeaderStateNonNull();
-      final PendingRequests.Permit permit = leaderState == 
unsyncedLeaderState? unsyncedPermit
+      final PendingRequests.Permit permit = leaderState == unsyncedLeaderState 
? unsyncedPermit
           : leaderState.tryAcquirePendingRequest(request.getMessage());
       if (permit == null) {
-        return getResourceUnavailableReply(request, cacheEntry);
+        return getResourceUnavailableReply("acquire a pending write request", 
request, cacheEntry, context);
       }
 
       // append the message to its local log
@@ -866,20 +901,18 @@ class RaftServerImpl implements RaftServer.Division,
       } catch (StateMachineException e) {
         // the StateMachineException is thrown by the SM in the preAppend 
stage.
         // Return the exception in a RaftClientReply.
-        RaftClientReply exceptionReply = newExceptionReply(request, e);
-        cacheEntry.failWithReply(exceptionReply);
+        final RaftClientReply exceptionReply = newExceptionReply(request, e);
         // leader will step down here
         if (e.leaderShouldStepDown() && getInfo().isLeader()) {
           
leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION);
         }
-        return CompletableFuture.completedFuture(exceptionReply);
+        return failWithReply(exceptionReply, cacheEntry, null);
       }
 
       // put the request into the pending queue
       pending = leaderState.addPendingRequest(permit, request, context);
       if (pending == null) {
-        return cacheEntry.failWithException(new ResourceUnavailableException(
-            getMemberId() + ": Failed to add a pending write request for " + 
request));
+        return getResourceUnavailableReply("add a pending write request", 
request, cacheEntry, context);
       }
     }
     leaderState.notifySenders();
@@ -1011,19 +1044,23 @@ class RaftServerImpl implements RaftServer.Division,
       // return the cached future.
       return cacheEntry.getReplyFuture();
     }
-    // TODO: this client request will not be added to pending requests until
-    // later which means that any failure in between will leave partial state 
in
-    // the state machine. We should call cancelTransaction() for failed 
requests
+    // This request will be added to pending requests later in 
appendTransaction.
+    // Any failure in between must invoke cancelTransaction.
     final TransactionContextImpl context = (TransactionContextImpl) 
stateMachine.startTransaction(
         filterDataStreamRaftClientRequest(request));
     if (context.getException() != null) {
-      final StateMachineException e = new StateMachineException(getMemberId(), 
context.getException());
+      final Exception exception = context.getException();
+      final StateMachineException e = new StateMachineException(getMemberId(), 
exception);
       final RaftClientReply exceptionReply = newExceptionReply(request, e);
-      cacheEntry.failWithReply(exceptionReply);
-      return CompletableFuture.completedFuture(exceptionReply);
+      return failWithReply(exceptionReply, cacheEntry, context);
     }
 
-    return appendTransaction(request, context, cacheEntry);
+    try {
+      return appendTransaction(request, context, cacheEntry);
+    } catch (Exception e) {
+      cancelTransaction(context, e);
+      throw e;
+    }
   }
 
   private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest 
request) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
index 4da459ae9..96ad62a53 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
@@ -257,14 +257,4 @@ class RetryCacheImpl implements RetryCache {
     cache.invalidateAll();
     statistics.set(null);
   }
-
-  static CompletableFuture<RaftClientReply> failWithReply(
-      RaftClientReply reply, CacheEntry entry) {
-    if (entry != null) {
-      entry.failWithReply(reply);
-      return entry.getReplyFuture();
-    } else {
-      return CompletableFuture.completedFuture(reply);
-    }
-  }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
index d92f3a1c8..8497b12f4 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
@@ -191,8 +191,6 @@ public class TransactionContextImpl implements 
TransactionContext {
 
   @Override
   public TransactionContext cancelTransaction() throws IOException {
-    // TODO: This is not called from Raft server / log yet. When an 
IOException happens, we should
-    // call this to let the SM know that Transaction cannot be synced
     return stateMachine.cancelTransaction(this);
   }
 }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index 3a58f4e7c..1e46907d1 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -40,6 +40,7 @@ import org.slf4j.event.Level;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -52,6 +53,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER 
extends MiniRaftClu
   }
 
   private static volatile boolean failPreAppend = false;
+  private static final AtomicInteger numCancelTransaction = new 
AtomicInteger();
 
   protected static class StateMachineWithException extends
       SimpleStateMachine4Testing {
@@ -72,6 +74,12 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER 
extends MiniRaftClu
         return trx;
       }
     }
+
+    @Override
+    public TransactionContext cancelTransaction(TransactionContext trx) throws 
IOException {
+      numCancelTransaction.incrementAndGet();
+      return super.cancelTransaction(trx);
+    }
   }
 
   {
@@ -179,4 +187,31 @@ public abstract class 
RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
       failPreAppend = false;
     }
   }
+
+  @Test
+  public void testNoCancelTransactionOnPreAppendFailure() throws Exception {
+    runWithNewCluster(3, this::runTestNoCancelTransactionOnPreAppendFailure);
+  }
+
+  private void runTestNoCancelTransactionOnPreAppendFailure(CLUSTER cluster) 
throws Exception {
+    final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+    failPreAppend = true;
+    numCancelTransaction.set(0);
+    try (final RaftClient client = cluster.createClient(leaderId)) {
+      try {
+        client.io().send(new SimpleMessage("cancel-transaction"));
+        fail("Exception expected");
+      } catch (StateMachineException e) {
+        Assertions.assertTrue(e.getCause().getMessage().contains("Fake 
Exception in preAppend"));
+      }
+
+      JavaUtils.attemptRepeatedly(() -> {
+        Assertions.assertEquals(0, numCancelTransaction.get(),
+            () -> "Expected cancelTransaction() not to be called but got " + 
numCancelTransaction.get());
+        return null;
+      }, 10, ONE_SECOND, "wait for cancelTransaction", LOG);
+    } finally {
+      failPreAppend = false;
+    }
+  }
 }

Reply via email to