This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 617ee00  KAFKA-12509 Tighten up StateDirectory thread locking (#10418)
617ee00 is described below

commit 617ee003223873da2c1c7f383d3416684c2d84c8
Author: ketulgupta1995 <[email protected]>
AuthorDate: Wed Mar 31 00:43:53 2021 +0530

    KAFKA-12509 Tighten up StateDirectory thread locking (#10418)
    
    Modified LockAndOwner class to have Thread reference instead of just name
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../kafka/streams/processor/internals/StateDirectory.java     | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 01f62e7..1b745a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -96,9 +96,8 @@ public class StateDirectory {
 
     private static class LockAndOwner {
         final FileLock lock;
-        final String owningThread;
-
-        LockAndOwner(final String owningThread, final FileLock lock) {
+        final Thread owningThread;
+        LockAndOwner(final Thread owningThread, final FileLock lock) {
             this.owningThread = owningThread;
             this.lock = lock;
         }
@@ -298,7 +297,7 @@ public class StateDirectory {
         final File lockFile;
         // we already have the lock so bail out here
         final LockAndOwner lockAndOwner = locks.get(taskId);
-        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread())) {
             log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
             return true;
         } else if (lockAndOwner != null) {
@@ -327,7 +326,7 @@ public class StateDirectory {
 
         final FileLock lock = tryLock(channel);
         if (lock != null) {
-            locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock));
+            locks.put(taskId, new LockAndOwner(Thread.currentThread(), lock));
 
             log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId);
         }
@@ -384,7 +383,7 @@ public class StateDirectory {
      */
     synchronized void unlock(final TaskId taskId) throws IOException {
         final LockAndOwner lockAndOwner = locks.get(taskId);
-        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread())) {
             locks.remove(taskId);
             lockAndOwner.lock.release();
             log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);

Reply via email to