This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 0827b19ca36 MINOR: Moving the rollback out of lock in share partition (#20153) (#20651)
0827b19ca36 is described below

commit 0827b19ca36a25f2fffdf81ffa0fdada26a4916f
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Oct 7 20:24:37 2025 +0100

    MINOR: Moving the rollback out of lock in share partition (#20153) (#20651)
    
    Move the rollback out of the lock. If the persister returns an already
    completed future for write state, the same data-plane-request-handler
    thread should not call the purgatory's safeTryAndComplete while holding
    the SharePartition's write lock.
    
    Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit
     <[email protected]>
---
 core/src/main/java/kafka/server/share/SharePartition.java | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index eb0fedbe8cd..6bad3594cdf 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -958,14 +958,12 @@ public class SharePartition {
                     break;
                 }
             }
-
-            // If the acknowledgement is successful then persist state, complete the state transition
-            // and update the cached state for start offset. Else rollback the state transition.
-            rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches);
         } finally {
             lock.writeLock().unlock();
         }
-
+        // If the acknowledgement is successful then persist state, complete the state transition
+        // and update the cached state for start offset. Else rollback the state transition.
+        rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches);
         return future;
     }
 
@@ -1014,13 +1012,12 @@ public class SharePartition {
                     break;
                 }
             }
-
-            // If the release acquired records is successful then persist state, complete the state transition
-            // and update the cached state for start offset. Else rollback the state transition.
-            rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches);
         } finally {
             lock.writeLock().unlock();
         }
+        // If the release acquired records is successful then persist state, complete the state transition
+        // and update the cached state for start offset. Else rollback the state transition.
+        rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches);
         return future;
     }
 

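The hazard described in the commit message can be illustrated with a small
standalone sketch. It does not reproduce the SharePartition code: the class
name, writeState() and both callback methods are hypothetical stand-ins, and
the sketch assumes only that follow-up work is attached as a callback to a
future that may already be complete. In that case the callback runs
synchronously on the calling thread, so attaching it inside the locked region
means it runs while the write lock is still held.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch; not the actual SharePartition implementation.
class RollbackOutsideLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Stand-in for a persister call that returns an already completed future.
    private CompletableFuture<Void> writeState() {
        return CompletableFuture.completedFuture(null);
    }

    // Shape before the change: the callback is attached inside the locked region.
    // Returns whether the callback observed the write lock held by its own thread.
    public boolean callbackInsideLock() {
        boolean[] heldLock = new boolean[1];
        lock.writeLock().lock();
        try {
            // The future is already complete, so the callback runs right here,
            // on this thread, before unlock() is reached.
            writeState().whenComplete((result, error) ->
                heldLock[0] = lock.isWriteLockedByCurrentThread());
        } finally {
            lock.writeLock().unlock();
        }
        return heldLock[0];
    }

    // Shape after the change: state is mutated under the lock, and the callback
    // is attached only once the lock has been released.
    public boolean callbackOutsideLock() {
        boolean[] heldLock = new boolean[1];
        lock.writeLock().lock();
        try {
            // In-memory state transitions would happen here.
        } finally {
            lock.writeLock().unlock();
        }
        writeState().whenComplete((result, error) ->
            heldLock[0] = lock.isWriteLockedByCurrentThread());
        return heldLock[0];
    }

    public static void main(String[] args) {
        RollbackOutsideLockSketch sketch = new RollbackOutsideLockSketch();
        System.out.println("inside lock:  " + sketch.callbackInsideLock());  // true
        System.out.println("outside lock: " + sketch.callbackOutsideLock()); // false
    }
}
```

In the first shape, any work the callback triggers (for example, completing
delayed operations) would execute under the write lock; the second shape
avoids that regardless of whether the future completes immediately or later.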