Based on what I've seen so far, I think the only "fix" we really need here is:
```java
    synchronized void unlock(final TaskId taskId) throws IOException {
        final LockAndOwner lockAndOwner = locks.get(taskId);
        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-           locks.remove(taskId);
            lockAndOwner.lock.release();
            log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);

            final FileChannel fileChannel = channels.remove(taskId);
            if (fileChannel != null) {
                fileChannel.close();
            }
+           locks.remove(taskId);
        }
    }
```

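I.e., don't stop tracking the lock until we know it's been released and its channel closed. That way, if `release()` or `close()` throws, the task is still recorded in `locks` instead of being dropped while the lock may still be held.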
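To illustrate the ordering in isolation, here is a minimal sketch. `LockRegistry` and `Cleanup` are hypothetical stand-ins, not Kafka's `StateDirectory`; `Cleanup` models "release the lock and close the channel" as one step that may throw `IOException`.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified lock registry (NOT Kafka's StateDirectory).
class LockRegistry {
    // Assumed abstraction: releasing a lock and closing its channel, which may fail.
    interface Cleanup {
        void run() throws IOException;
    }

    private final Map<String, Cleanup> locks = new HashMap<>();

    synchronized void lock(String taskId, Cleanup cleanup) {
        locks.put(taskId, cleanup);
    }

    synchronized boolean isTracked(String taskId) {
        return locks.containsKey(taskId);
    }

    // Clean up first; stop tracking only once cleanup has succeeded.
    // If cleanup throws, the entry stays in the map so a later unlock() can retry.
    synchronized void unlock(String taskId) throws IOException {
        Cleanup cleanup = locks.get(taskId);
        if (cleanup != null) {
            cleanup.run();        // may throw; the map is left untouched in that case
            locks.remove(taskId); // reached only after a successful cleanup
        }
    }
}
```

With this ordering a failed `unlock()` leaves the task tracked, and a second call can finish the job. For the real `FileLock`, a retry is harmless: per its Javadoc, calling `release()` on a lock that is already invalid has no effect.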

[ Full content available at: https://github.com/apache/kafka/pull/5574 ]