Greg, what OS are you running on? Are you able to reproduce this in a test at all? For instance, based on what you described, it would seem that I should be able to start a streams app, wait for it to be up and running, run the state dir cleanup, and see it fail. However, I can't reproduce it.
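A minimal sketch of that sequence (made-up app id, topic, and store names, and a short state.cleanup.delay.ms so the cleanup kicks in quickly; not the actual test code):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class StateCleanupRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-cleanup-repro"); // made-up id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // make the cleanup kick in quickly instead of after the 10 minute default
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 10000);

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic")   // made-up topic name
               .groupByKey()
               .count("repro-store");   // a RocksDB-backed state store

        KafkaStreams streams = new KafkaStreams(builder, props);
        // surface any flush failures from the stream threads
        streams.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
        streams.start();

        // run well past state.cleanup.delay.ms with a stable assignment and
        // watch for "Failed to flush state store" errors
        Thread.sleep(60000);
        streams.close();
    }
}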
On Wed, 5 Jul 2017 at 23:23 Damian Guy <damian....@gmail.com> wrote:

Thanks Greg. I'll look into it more tomorrow. Just finding it difficult to reproduce in a test. Thanks for providing the sequence; it gives me something to try and repro. Appreciated.

Thanks,
Damian

On Wed, 5 Jul 2017 at 19:57, Greg Fodor <gfo...@gmail.com> wrote:

Also, the sequence of events is:

- Job starts, rebalance happens, things run along smoothly.
- After 10 minutes (retrospectively) the cleanup task kicks in and removes some directories.
- Tasks immediately start failing when trying to flush their state stores.

On Wed, Jul 5, 2017 at 11:55 AM, Greg Fodor <gfo...@gmail.com> wrote:

The issue I am hitting is not one of the directory locking issues we've seen in the past. The issue seems to be, as you mentioned, that the state dir is getting deleted by the store cleanup process while there are still tasks running that are trying to flush the state store. It seems more than a little scary, given that right now it seems either a) there are tasks running that should have been re-assigned, or b) the cleanup job is removing state directories for currently running and assigned tasks (perhaps there is a race condition during a rebalance?). I'm guessing there's probably a more benign explanation, but that is what it looks like right now.

On Wed, Jul 5, 2017 at 7:00 AM, Damian Guy <damian....@gmail.com> wrote:

BTW - I'm trying to reproduce it, but not having much luck so far...

On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian....@gmail.com> wrote:

Thanks for the updates Greg. There were some minor changes around this in 0.11.0 to make it less likely to happen, but we've only ever seen the locking fail in the event of a rebalance. When everything is running, state dirs shouldn't be deleted if they are being used, as the lock will fail.
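Conceptually the check is something like this (a simplified sketch, not the actual StateDirectory code):

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;

public class CleanupLockSketch {
    // Returns true only if no other process holds the task's lock file,
    // i.e. the cleanup may delete this task directory.
    static boolean safeToDelete(File taskDir) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile(new File(taskDir, ".lock"), "rw");
             FileLock lock = raf.getChannel().tryLock()) {
            // tryLock() returns null when another process holds the lock.
            // NB: OS file locks are per-process, so the real code also has
            // to track locks held by threads within the same process.
            return lock != null;
        }
    }
}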
>> >> >> >>> >> >> >> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy < >> damian....@gmail.com> >> >> >> wrote: >> >> >> >>> >> >> >> >>>> Hi Greg, >> >> >> >>>> >> >> >> >>>> Obviously a bit difficult to read the RocksDBException, but my >> >> guess >> >> >> is >> >> >> >>>> it >> >> >> >>>> is because the state directory gets deleted right before the >> flush >> >> >> >>>> happens: >> >> >> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO >> >> >> [StreamThread-21:StateDirector >> >> >> >>>> y@213] >> >> >> >>>> - Deleting obsolete state directory 0_10 for task 0_10 >> >> >> >>>> >> >> >> >>>> Yes it looks like it is possibly the same bug as KAFKA-5070. >> >> >> >>>> >> >> >> >>>> It looks like your application is constantly rebalancing during >> >> store >> >> >> >>>> intialization, which may be the reason this bug comes about >> (there >> >> >> is a >> >> >> >>>> chance that the state dir lock is released so when the thread >> >> tries >> >> >> to >> >> >> >>>> removes the stale state directory it is able to get the lock). >> You >> >> >> >>>> probably >> >> >> >>>> want to configure `max.poll.interval.ms` to be a reasonably >> large >> >> >> >>>> value ( i >> >> >> >>>> think we default to Integer.MAX_VALUE in 0.10.2.1). You can >> also >> >> try >> >> >> >>>> setting `state.cleanup.delay.ms` to a higher value (default >> is 10 >> >> >> >>>> minutes), >> >> >> >>>> to try and avoid it happening during a rebalance (I know this >> >> isn't a >> >> >> >>>> fix, >> >> >> >>>> but will make it less likely to happen). >> >> >> >>>> >> >> >> >>>> Thanks, >> >> >> >>>> Damian >> >> >> >>>> >> >> >> >>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> >> wrote: >> >> >> >>>> >> >> >> >>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to >> use >> >> >> Kafka >> >> >> >>>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL >> job >> >> that >> >> >> >>>> has 4 >> >> >> >>>> > state stores and runs across a few hundred partitions, and as >> >> part >> >> >> of >> >> >> >>>> load >> >> >> >>>> > testing the job we are trying to reload our data out of kafka >> >> into >> >> >> a >> >> >> >>>> test >> >> >> >>>> > db. The result is we are able to load about 4M tuples and >> then >> >> this >> >> >> >>>> error >> >> >> >>>> > pops up on all of the stream nodes simultaneously. There are >> 4 >> >> >> rocksdb >> >> >> >>>> > stores in question and there are lots of these errors which >> >> takes >> >> >> it >> >> >> >>>> down. >> >> >> >>>> > This bug *does* not seem to occur on 0.10.1. >> >> >> >>>> > >> >> >> >>>> > A similar error was mentioned here: >> >> >> >>>> > https://issues.apache.org/jira/browse/KAFKA-5070 >> >> >> >>>> > >> >> >> >>>> > Full log attached. 
>> >> >> >>>> > >> >> >> >>>> > org.apache.kafka.streams.errors.ProcessorStateException: task >> >> >> [0_10] >> >> >> >>>> > Failed to flush state store session-id-start-events >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.processor.internals.ProcessorStateM >> >> >> >>>> anager.flush(ProcessorStateManager.java:337) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.processor.internals.StandbyTask.com >> >> >> >>>> mit(StandbyTask.java:94) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.co >> >> >> >>>> mmitOne(StreamThread.java:807) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.co >> >> >> >>>> mmitAll(StreamThread.java:797) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.ma >> >> >> >>>> ybeCommit(StreamThread.java:769) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.ru >> >> >> >>>> nLoop(StreamThread.java:647) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.ru >> >> >> >>>> n(StreamThread.java:361) >> >> >> >>>> > Caused by: org.apache.kafka.streams.error >> >> s.ProcessorStateException: >> >> >> >>>> Error >> >> >> >>>> > while executing flush from store session-id-start-events >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flushI >> >> >> >>>> nternal(RocksDBStore.java:354) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flush( >> >> >> >>>> RocksDBStore.java:345) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.state.internals.WrappedStateStore$A >> >> >> >>>> bstractWrappedStateStore.flush(WrappedStateStore.java:80) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.state.internals.WrappedStateStore$A >> >> >> >>>> bstractWrappedStateStore.flush(WrappedStateStore.java:80) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.state.internals.MeteredKeyValueStor >> >> >> >>>> e$6.run(MeteredKeyValueStore.java:92) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamsMetricsI >> >> >> >>>> mpl.measureLatencyNs(StreamsMetricsImpl.java:188) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.state.internals.MeteredKeyValueStor >> >> >> >>>> e.flush(MeteredKeyValueStore.java:186) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.processor.internals.ProcessorStateM >> >> >> >>>> anager.flush(ProcessorStateManager.java:335) >> >> >> >>>> > ... 6 more >> >> >> >>>> > Caused by: org.rocksdb.RocksDBException: v >> >> >> >>>> > at org.rocksdb.RocksDB.flush(Native Method) >> >> >> >>>> > at org.rocksdb.RocksDB.flush(RocksDB.java:1642) >> >> >> >>>> > at >> >> >> >>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flushI >> >> >> >>>> nternal(RocksDBStore.java:352) >> >> >> >>>> > ... 13 more >> >> >> >>>> > >> >> >> >>>> > >> >> >> >>>> >> >> >> >>> >> >> >> >>> >> >> >> >> >> >> >> > >> >> >> >> >> > >> >> >> > >> > >> >