Narendra Kumar created KAFKA-5167:
-------------------------------------
Summary: streams task gets stuck after re-balance due to LockException
Key: KAFKA-5167
URL: https://issues.apache.org/jira/browse/KAFKA-5167
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.2.0
Reporter: Narendra Kumar
During a rebalance, the processor node's close() method gets called twice. I
have an instance field that I close in the processor's close() method, and
that instance's close() method throws an exception if it is called more than
once. Because of this exception, Kafka Streams does not attempt to close the
state manager, i.e. task.closeStateManager(true) is never called. When a task
moves from one thread to another within the same machine, the task blocks
trying to acquire the lock on the state directory, which is still held by the
unclosed state manager, and keeps throwing the following exception:
2017-04-30 12:34:17 WARN StreamThread:1214 - Could not create task 0_1. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
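
Until the double close() is addressed in Streams itself, one possible application-side workaround is to make the processor's own cleanup idempotent, so that a second close() call cannot throw. The sketch below is only an illustration under assumptions: StrictResource and CloseOnce are hypothetical names (not Kafka classes), and StrictResource merely stands in for the kind of instance field described in this report.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a field whose close() fails when called twice. */
class StrictResource {
    private boolean closed = false;
    int closeCalls = 0; // counts how often the underlying close() really ran

    public void close() {
        closeCalls++;
        if (closed) {
            throw new IllegalStateException("already closed");
        }
        closed = true;
    }
}

/** Hypothetical guard: runs the wrapped close action at most once. */
final class CloseOnce implements AutoCloseable {
    private final Runnable closeAction;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    CloseOnce(Runnable closeAction) {
        this.closeAction = closeAction;
    }

    @Override
    public void close() {
        // compareAndSet succeeds only for the first caller; later calls are no-ops
        if (closed.compareAndSet(false, true)) {
            closeAction.run();
        }
    }
}
```

Under these assumptions, the processor would hold a CloseOnce built as new CloseOnce(resource::close) and call guard.close() from its own close() method, so a repeated call during rebalance returns quietly instead of throwing. This only avoids the exception on the application side; it does not change the fact that close() is invoked twice.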