[ https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035590#comment-16035590 ]
Kevin Chen commented on KAFKA-5070:
-----------------------------------

Hi Matthias,

My state store was not corrupted. I tried deleting my local state dir, but that did not work every time.

I think I found the root cause: it is related to timing. Even without deleting my local state dir, I only needed to increase my poll time, and then the application started fine.

During a fresh start, the application is assigned 12 tasks. Some of the tasks get initialized and start processing data while the others are still being initialized. Since I am using an in-memory state store, it pulls data from the broker, so it may starve the tasks that have already been initialized. If those tasks cannot finish their processing within the poll time, a rebalance is triggered, and it tries to start a new one while the old one is still running.

Also, I do not think the lock is released properly when this happens, because when I debugged into your library I saw it throw OverlappingFileLockException. According to Oracle, this means:

    Unchecked exception thrown when an attempt is made to acquire a lock on a region of a file that overlaps a region already locked by the same Java virtual machine, or when another thread is already waiting to lock an overlapping region of the same file.

Let me know if you need more information from me to fix the issue.

Thanks,
Kevin

> org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5070
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5070
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>         Environment: Linux Version
>            Reporter: Dhana
>            Assignee: Matthias J. Sax
>         Attachments: RocksDB_LockStateDirec.7z
>
>
> Notes: we run two instances of the consumer on two different machines/nodes.
> We have 400 partitions: 200 stream threads per consumer, with 2 consumers.
> We perform an HA test (on rebalance: shutdown of one of the consumers/brokers) and we see this happening.
> Error:
> 2017-04-05 11:36:09.352 WARN StreamThread:1184 StreamThread-66 - Could not create task 0_115. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock the state directory: /opt/rocksdb/pulse10/0_115
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
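
For reference, the OverlappingFileLockException behavior described in the comment can be reproduced outside Kafka Streams with plain java.nio. This is only a minimal sketch and not the Streams state-directory locking code: the class name OverlappingLockDemo, the method names, and the temporary lock file are all made up for illustration. It assumes the default JVM-wide overlap check is enabled.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; not part of Kafka Streams.
public class OverlappingLockDemo {

    private static FileChannel open(Path lockFile) throws IOException {
        return FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    // Returns true if a second lock attempt from the same JVM is rejected with
    // OverlappingFileLockException while the first lock is still held.
    static boolean secondLockOverlaps(Path lockFile) throws IOException {
        try (FileChannel first = open(lockFile); FileChannel second = open(lockFile)) {
            FileLock held = first.tryLock();
            if (held == null) {
                return false; // another process holds the lock; nothing to show
            }
            try {
                second.tryLock(); // same JVM, same file region
                return false;
            } catch (OverlappingFileLockException e) {
                return true;
            } finally {
                held.release();
            }
        }
    }

    // Returns true if the file can be locked again once the first lock
    // has been released explicitly.
    static boolean lockSucceedsAfterRelease(Path lockFile) throws IOException {
        try (FileChannel first = open(lockFile); FileChannel second = open(lockFile)) {
            FileLock held = first.tryLock();
            if (held == null) {
                return false;
            }
            held.release();
            FileLock again = second.tryLock();
            if (again == null) {
                return false;
            }
            again.release();
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        Path lockFile = Files.createTempFile("lock-demo", ".lock");
        System.out.println("overlap while held: " + secondLockOverlaps(lockFile));
        System.out.println("lock after release: " + lockSucceedsAfterRelease(lockFile));
        Files.deleteIfExists(lockFile);
    }
}
```

The point of the sketch is that the exception comes from a second lock attempt inside the same JVM, which is consistent with the suspicion above that a new task is started before the old one has released its lock. It does not prove that this is what happens in the Streams code.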