[ https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16002228#comment-16002228 ]
Matthias J. Sax commented on KAFKA-5070:
----------------------------------------

Digging a little further, it seems the first severe error is:

{noformat}
2017-04-05 12:51:18.481 ERROR StreamThread:783 StreamThread-127 - stream-thread [StreamThread-127] Failed to commit StreamTask 0_331 state:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_331] Failed to flush state store fmdbt
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:764)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store fmdbt
    at com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flushInternal(HarmanRocksDBStore.java:353)
    at com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flush(HarmanRocksDBStore.java:342)
    at com.harman.analytics.stream.base.stores.HarmanPersistentKVStore.flush(HarmanPersistentKVStore.java:72)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323)
    ... 8 more
Caused by: org.rocksdb.RocksDBException: N
    at org.rocksdb.RocksDB.flush(Native Method)
    at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
    at com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flushInternal(HarmanRocksDBStore.java:351)
    ... 11 more
{noformat}

Looking at the stack trace, it seems you are using a custom store implementation {{HarmanRocksDBStore}} -- what does this store do differently from the default RocksDB store shipped with Streams? Can you try using the default RocksDB store to see if the error is still there?

> org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5070
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5070
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>         Environment: Linux Version
>            Reporter: Dhana
>            Assignee: Matthias J. Sax
>         Attachments: RocksDB_LockStateDirec.7z
>
>
> Notes: we run two instances of the consumer on two different machines/nodes.
> We have 400 partitions: 200 stream threads per consumer, with 2 consumers.
> We perform an HA test (on rebalance: shutdown of one of the consumers/brokers) and we see this happening.
> Error:
> 2017-04-05 11:36:09.352 WARN StreamThread:1184 StreamThread-66 - Could not create task 0_115. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock the state directory: /opt/rocksdb/pulse10/0_115
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
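
For readers following the quoted {{LockException}}: the sketch below is an illustration only, not the Streams implementation and not a diagnosis of this ticket. It assumes the per-task state directory lock is backed by a {{java.nio}} file lock on a lock file inside the task directory. The class name {{StateDirLockSketch}}, the {{.lock}} file name, and the temporary directory are hypothetical. It shows that a second lock attempt inside the same JVM fails while an earlier holder has not released the lock, and succeeds once it has.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class StateDirLockSketch {

    /** Returns the acquired lock, or null if the lock is currently held by someone else. */
    static FileLock tryAcquire(FileChannel channel) throws IOException {
        try {
            // tryLock() returns null when another process holds the lock
            return channel.tryLock();
        } catch (OverlappingFileLockException e) {
            // thrown when the lock is already held within this JVM
            return null;
        }
    }

    /**
     * Runs three attempts against one (hypothetical) task directory:
     * owner acquires, contender tries while owner holds, retry after owner releases.
     * Returns whether each attempt obtained the lock.
     */
    static boolean[] attemptSequence(Path taskDir) throws IOException {
        Files.createDirectories(taskDir);
        Path lockFile = taskDir.resolve(".lock"); // assumed lock file name
        try (FileChannel channel = FileChannel.open(
                lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock owner = tryAcquire(channel);
            FileLock contender = tryAcquire(channel);
            boolean first = owner != null;
            boolean second = contender != null;
            if (owner != null) {
                owner.release();
            }
            FileLock retry = tryAcquire(channel);
            boolean third = retry != null;
            if (retry != null) {
                retry.release();
            }
            return new boolean[] {first, second, third};
        }
    }

    public static void main(String[] args) throws IOException {
        // Temporary directory standing in for a task directory such as .../0_115
        Path taskDir = Files.createTempDirectory("state-dir-sketch").resolve("0_115");
        boolean[] result = attemptSequence(taskDir);
        System.out.println("owner acquired lock:        " + result[0]);
        System.out.println("contender acquired lock:    " + result[1]);
        System.out.println("retry after release worked: " + result[2]);
    }
}
```

The sketch reuses a single channel for all attempts on purpose: on some platforms, closing any channel opened on a file releases every lock the JVM holds on that file, so opening and closing a second channel for the failed attempt could silently drop the first lock.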