[ https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035590#comment-16035590 ]

Kevin Chen commented on KAFKA-5070:
-----------------------------------

Hi, Matthias:

My state store was not corrupted. I had tried deleting my local state dir, 
but that did not work every time.

I think I found the root cause: it is related to timing. Even without 
deleting my local state dir, I only needed to increase my poll time and then 
the application started up fine.
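
For what it's worth, here is a rough sketch of what I mean by increasing the 
poll time (the application id, broker address, and the concrete values are 
placeholders I made up, not recommendations):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    // Sketch only: pass larger poll-related settings to the streams app.
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
    // Allow more time between poll() calls before the consumer is kicked
    // out of the group and a rebalance is triggered.
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
    // Process fewer records per loop so each iteration finishes sooner.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

These props then go to the KafkaStreams constructor as usual.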

During a fresh start, the application was assigned 12 tasks. Some of the 
tasks were initialized and started processing data while the others were 
still being initialized. Since I am using an in-memory state store, 
initialization pulls data from the broker, so it may starve the tasks that 
have already been initialized. If those tasks cannot finish their processing 
within the poll time, a rebalance is triggered, and the application tries to 
start a new task while the old one is still running. Also, I do not think 
the lock is released properly when this happens, because when I debugged 
into your library I saw it throw OverlappingFileLockException. According to 
the Oracle documentation, this means:

   Unchecked exception thrown when an attempt is made to acquire a lock on a 
region of a file that overlaps a region already locked by the same Java virtual 
machine, or when another thread is already waiting to lock an overlapping 
region of the same file.
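
Just to illustrate that behaviour in isolation (this is a standalone sketch 
with a made-up lock file, not code from your library): a second tryLock() 
from the same JVM fails immediately with OverlappingFileLockException 
instead of waiting, which looks like what happens when the new task tries to 
take the state-dir lock the old task still holds.

    import java.io.File;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    import java.nio.channels.OverlappingFileLockException;

    public class OverlapDemo {
        public static void main(String[] args) throws Exception {
            // Made-up lock file, standing in for a task's state-dir lock.
            File lockFile = new File("/tmp/0_18.lock");
            try (FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel()) {
                FileLock first = channel.tryLock();   // first task holds the lock
                try {
                    channel.tryLock();                // second attempt, same JVM
                } catch (OverlappingFileLockException e) {
                    // Thrown immediately because the overlapping region is
                    // already locked by this JVM.
                    System.out.println("second attempt failed: " + e);
                }
                first.release();
            }
        }
    }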

Let me know if you need more information from me to fix the issue.

thanks,
Kevin



> org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5070
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5070
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>         Environment: Linux Version
>            Reporter: Dhana
>            Assignee: Matthias J. Sax
>         Attachments: RocksDB_LockStateDirec.7z
>
>
> Notes: we run two instances of the consumer on two different machines/nodes.
> We have 400 partitions and 200 stream threads per consumer, with 2 consumers.
> We perform an HA test (on rebalance: shutdown of one of the consumers/brokers), 
> and we see this happening.
> Error:
> 2017-04-05 11:36:09.352 WARN  StreamThread:1184 StreamThread-66 - Could not create task 0_115. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock the state directory: /opt/rocksdb/pulse10/0_115
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)



