[ https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-5167:
-----------------------------------
    Fix Version/s: 0.10.2.2

> streams task gets stuck after re-balance due to LockException
> -------------------------------------------------------------
>
>                 Key: KAFKA-5167
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5167
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
>            Reporter: Narendra Kumar
>            Assignee: Matthias J. Sax
>             Fix For: 0.10.2.2, 0.11.0.1, 0.11.1.0
>
>         Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> During a rebalance, a processor node's close() method is called twice: once
> from StreamThread.suspendTasksAndState() and once from
> StreamThread.closeNonAssignedSuspendedTasks(). My processor has an instance
> field that I close in the processor's close() method, and that instance's
> close() method throws an exception if it is called more than once. Because of
> this exception, Kafka Streams does not attempt to close the state manager,
> i.e. task.closeStateManager(true) is never called. When a task then moves from
> one thread to another within the same machine, the new task blocks trying to
> acquire the lock on the state directory, which is still held by the unclosed
> state manager, and keeps logging the warning below:
> 2017-04-30 12:34:17 WARN  StreamThread:1214 - Could not create task 0_1. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
>       at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>       at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>       at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>       at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>       at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>       at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>       at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>       at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>       at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>       at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>       at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
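
The double close() described in the report can be tolerated on the application side by making the processor's close() idempotent. The following is a minimal sketch of that idea only, not the fix made for KAFKA-5167. It deliberately avoids the Kafka Streams API so that it runs on its own; StrictResource and GuardedProcessor are hypothetical names standing in for the reporter's instance field and processor.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class IdempotentCloseSketch {

    /** Hypothetical resource that, like the one in the report, rejects a second close(). */
    static final class StrictResource implements AutoCloseable {
        private int closeCount = 0;

        @Override
        public void close() {
            if (closeCount > 0) {
                throw new IllegalStateException("already closed");
            }
            closeCount++;
        }

        int closeCount() {
            return closeCount;
        }
    }

    /** Hypothetical processor-like wrapper whose close() is safe to call repeatedly. */
    static final class GuardedProcessor implements AutoCloseable {
        private final StrictResource resource;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        GuardedProcessor(StrictResource resource) {
            this.resource = resource;
        }

        @Override
        public void close() {
            // compareAndSet lets only the first caller through; later calls are no-ops.
            if (closed.compareAndSet(false, true)) {
                resource.close();
            }
        }
    }

    public static void main(String[] args) {
        StrictResource resource = new StrictResource();
        GuardedProcessor processor = new GuardedProcessor(resource);
        processor.close(); // first call, e.g. when the task is suspended
        processor.close(); // second call, e.g. when the suspended task is closed
        System.out.println("resource closed " + resource.closeCount() + " time(s)");
    }
}
```

With a guard like this, the second close() returns without throwing, so an exception from user code would no longer be raised on the repeated call.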



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
