George Bloggs commented on KAFKA-6647:

Yeah, I can see your thought path. The .lock file is created by the streams 
process. It seems impossible for the thread attempting to delete the .lock file 
to do so. On Windows 10, attempting to delete the .lock file throws an 

To repeat, run an app that streams from a topic to an inMemory KTable. Stop the 
app. The .lock file will remain even if you have cleanup in a shutdownHook. If 
you attempt to cleanup before a streams.start(), as per docs, it too fails 
ultimately as it tries to delete the parent directory. The failure is due to 
the fact the Utils.delete call failed to delete the .lock file so when it 
attempts to delete the parent directory it fails as the directory is not empty. 

Understand what you are saying about the thread owning the .lock. If no process 
owns any lock on the .lock then the cleanup process should be able to acquire 
the lock and delete the file. This isn’t happening. Give it a whirl. 

> KafkaStreams.cleanUp creates .lock file in directory its trying to clean
> ------------------------------------------------------------------------
>                 Key: KAFKA-6647
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6647
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.1
>         Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>            Reporter: George Bloggs
>            Priority: Minor
>              Labels: streams
> When calling kafkaStreams.cleanUp() before starting a stream the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
>                   if (lock(id, 0)) {
>                         long now = time.milliseconds();
>                         long lastModifiedMs = taskDir.lastModified();
>                         if (now > lastModifiedMs + cleanupDelayMs) {
>                             log.info("{} Deleting obsolete state directory {} 
> for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), 
> dirName, id, now - lastModifiedMs, cleanupDelayMs);
>                             Utils.delete(taskDir);
>                         }
>                     }
> {code}
> The check for lock(id,0) will create a .lock file in the directory that 
> subsequently is going to be deleted. If the .lock file already exists from a 
> previous run the attempt to delete the .lock file fails with 
> AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path calling Files.delete(path).
> The call to files.delete(path) in postVisitDirectory will then fail 
> java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the 
> .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory  
>      : stream-thread [restartedMain] Failed to lock the state directory due 
> to an unexpected exception)
> This seems to then cause issues using streams from a topic to an inMemory 
> store.

This message was sent by Atlassian JIRA

Reply via email to