[ https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16002228#comment-16002228 ]

Matthias J. Sax commented on KAFKA-5070:
----------------------------------------

Digging a little further, it seems the first severe error is:
{noformat}
2017-04-05 12:51:18.481 ERROR StreamThread:783 StreamThread-127 - stream-thread [StreamThread-127] Failed to commit StreamTask 0_331 state: 
org.apache.kafka.streams.errors.ProcessorStateException: task [0_331] Failed to flush state store fmdbt
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
        at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:764)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store fmdbt
        at com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flushInternal(HarmanRocksDBStore.java:353)
        at com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flush(HarmanRocksDBStore.java:342)
        at com.harman.analytics.stream.base.stores.HarmanPersistentKVStore.flush(HarmanPersistentKVStore.java:72)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323)
        ... 8 more
Caused by: org.rocksdb.RocksDBException: N
        at org.rocksdb.RocksDB.flush(Native Method)
        at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
        at com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flushInternal(HarmanRocksDBStore.java:351)
        ... 11 more
{noformat}

Looking at the stack trace, it seems you are using a custom store implementation 
{{HarmanRocksDBStore}} -- what does this store do differently from the default 
RocksDB store shipped with Streams? Can you try using the default RocksDB store 
to see if the error is still there?
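
For the test, something like the following should be enough to get the default 
RocksDB store -- just a minimal sketch against the 0.10.2 DSL, not your actual 
topology; topics, serdes, and config values below are placeholders, only the 
store name and state directory are taken from this report:
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Properties;

public class DefaultStoreTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "default-store-test");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/opt/rocksdb/pulse10");     // state dir from the report

        KStreamBuilder builder = new KStreamBuilder();

        // Naming the store in the DSL (instead of passing a custom StateStoreSupplier)
        // makes Streams create its own RocksDB-backed store under the state directory.
        KTable<String, Long> counts = builder
                .stream(Serdes.String(), Serdes.String(), "input-topic")       // placeholder topic
                .groupByKey(Serdes.String(), Serdes.String())
                .count("fmdbt");                                               // store name from the stack trace

        counts.to(Serdes.String(), Serdes.Long(), "output-topic");             // placeholder topic

        new KafkaStreams(builder, props).start();
    }
}
{code}
If the flush error goes away with this setup, the problem is most likely inside 
{{HarmanRocksDBStore}} rather than in Streams itself.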

> org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the 
> state directory: /opt/rocksdb/pulse10/0_18
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5070
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5070
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>         Environment: Linux Version
>            Reporter: Dhana
>            Assignee: Matthias J. Sax
>         Attachments: RocksDB_LockStateDirec.7z
>
>
> Notes: we run two instances of the consumer on two different machines/nodes.
> We have 400 partitions and 200 stream threads per consumer, with 2 consumers.
> We perform an HA test (on rebalance - shutdown of one of the consumers/brokers), 
> and we see this happening:
> Error:
> 2017-04-05 11:36:09.352 WARN  StreamThread:1184 StreamThread-66 - Could not create task 0_115. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock the state directory: /opt/rocksdb/pulse10/0_115
>       at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>       at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>       at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>       at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>       at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>       at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>       at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>       at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>       at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>       at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>       at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)



