Based on what I've seen so far, I think the only "fix" we really need here is:
```diff
 synchronized void unlock(final TaskId taskId) throws IOException {
     final LockAndOwner lockAndOwner = locks.get(taskId);
     if (lockAndOwner != null &&
         lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-        locks.remove(taskId);
         lockAndOwner.lock.release();
         log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);
         final FileChannel fileChannel = channels.remove(taskId);
         if (fileChannel != null) {
             fileChannel.close();
         }
+        locks.remove(taskId);
     }
 }
```
I.e., don't stop tracking the lock in `locks` until we know it has been released and its file channel has been cleaned up.
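Here is a minimal, self-contained sketch of that ordering. It assumes simplified stand-ins: plain `String` task ids instead of `TaskId`, and a hypothetical `StateDirLocks` class with `lock`, `unlock`, and `isTracked` methods. This is not Kafka's actual class, just an illustration. If `release()` or `close()` throws, the entry stays in `locks`, so the map never forgets a lock that may still be held at the OS level.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-task state dir locking (not the Kafka API).
final class StateDirLocks {
    private static final class LockAndOwner {
        final String owningThread;
        final FileLock lock;

        LockAndOwner(final String owningThread, final FileLock lock) {
            this.owningThread = owningThread;
            this.lock = lock;
        }
    }

    private final Map<String, LockAndOwner> locks = new HashMap<>();
    private final Map<String, FileChannel> channels = new HashMap<>();
    private final Path baseDir;

    StateDirLocks(final Path baseDir) {
        this.baseDir = baseDir;
    }

    synchronized boolean lock(final String taskId) throws IOException {
        final LockAndOwner existing = locks.get(taskId);
        if (existing != null) {
            // Already tracked: only the owning thread "holds" it.
            return existing.owningThread.equals(Thread.currentThread().getName());
        }
        final FileChannel channel = FileChannel.open(baseDir.resolve(taskId + ".lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        final FileLock fileLock = channel.tryLock();
        if (fileLock == null) {
            channel.close(); // another process holds the lock
            return false;
        }
        channels.put(taskId, channel);
        locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), fileLock));
        return true;
    }

    synchronized void unlock(final String taskId) throws IOException {
        final LockAndOwner lockAndOwner = locks.get(taskId);
        if (lockAndOwner != null &&
                lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
            lockAndOwner.lock.release(); // if this throws, the entry is still tracked
            final FileChannel fileChannel = channels.remove(taskId);
            if (fileChannel != null) {
                fileChannel.close();
            }
            // Stop tracking only after release and cleanup have succeeded.
            locks.remove(taskId);
        }
    }

    synchronized boolean isTracked(final String taskId) {
        return locks.containsKey(taskId);
    }
}
```

A successful `unlock` leaves nothing tracked, so the same task can be locked again cleanly. A failed one leaves the entry in place, where it can be inspected or retried instead of being leaked.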
[ Full content available at: https://github.com/apache/kafka/pull/5574 ]