John's last comment relates to the question he asked in his second-to-last 
comment above.

If the lock is removed from `locks` unconditionally, there will be no 
subsequent attempt to unlock the same lock.

The lock.release() call may throw ClosedChannelException (I checked the source 
code of FileLockImpl), which implies the lock can be considered released.
In that case, the ClosedChannelException doesn't need to be propagated.

How about the following change?
```
     synchronized void unlock(final TaskId taskId) throws IOException {
         final LockAndOwner lockAndOwner = locks.get(taskId);
         if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+            try {
+                lockAndOwner.lock.release();
+            } catch (ClosedChannelException cce) {
+                // no need to propagate
+            }
             locks.remove(taskId);
-            lockAndOwner.lock.release();
             log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);

             final FileChannel fileChannel = channels.remove(taskId);
```
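
To illustrate the reasoning, here is a minimal standalone sketch, not the actual StateDirectory code. The class name `UnlockSketch`, the `holds` helper and the use of plain String task ids are hypothetical, and the owning-thread check is left out for brevity. It closes the channel before unlocking, so release() is expected to throw ClosedChannelException, and the entry is still removed from the map:

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the lock bookkeeping discussed above.
class UnlockSketch {
    private final Map<String, FileLock> locks = new HashMap<>();

    synchronized void lock(final String taskId, final FileChannel channel) throws IOException {
        final FileLock lock = channel.tryLock();
        if (lock != null) {
            locks.put(taskId, lock);
        }
    }

    synchronized void unlock(final String taskId) throws IOException {
        final FileLock lock = locks.get(taskId);
        if (lock != null) {
            try {
                lock.release();
            } catch (final ClosedChannelException cce) {
                // The channel is already closed, so the lock is no longer held;
                // no need to propagate.
            }
            // Reached after a successful release or a ClosedChannelException.
            // Any other IOException still propagates and leaves the entry in place.
            locks.remove(taskId);
        }
    }

    synchronized boolean holds(final String taskId) {
        return locks.containsKey(taskId);
    }

    public static void main(final String[] args) throws IOException {
        final Path file = Files.createTempFile("lock-sketch", ".lock");
        final UnlockSketch sketch = new UnlockSketch();
        final FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE);
        sketch.lock("0_0", channel);
        channel.close();      // invalidates the lock before unlock() is called
        sketch.unlock("0_0"); // does not throw; the map entry is removed
        System.out.println("still tracked: " + sketch.holds("0_0"));
        Files.delete(file);
    }
}
```

The point of the ordering is that the map entry is dropped whenever the lock can be considered released, while other I/O failures still surface to the caller.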

[ Full content available at: https://github.com/apache/kafka/pull/5574 ]
This message was relayed via gitbox.apache.org for [email protected]
