Also, sorry, to clarify the job context:

- This is a job running across 5 nodes on AWS Linux.
- It is under load with a large number of partitions: approximately 700-800
  topic-partition assignments in total for the entire job. The topics
  involved have a large number of partitions, 128 each.
- 32 stream threads per host.
- Peak TPS seems to be approximately 5k-10k tuples/sec per node. We're
  reprocessing historical data in Kafka.
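For concreteness, a Streams config matching that deployment shape would look
roughly like the sketch below. This is illustrative only: the application ID,
bootstrap servers, and state directory are placeholders, not values from the
actual job.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class EtlJobConfig {
        public static Properties buildStreamsConfig() {
            Properties props = new Properties();
            // Placeholder identifiers -- not the real job's values.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "etl-reprocessing-job");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092");
            // 32 stream threads per host; across 5 nodes that is 160 threads
            // sharing the ~700-800 topic-partition assignments noted above.
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 32);
            // The local RocksDB state stores live under this directory.
            props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
            return props;
        }
    }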
On Thu, Jul 6, 2017 at 10:45 AM, Greg Fodor <gfo...@gmail.com> wrote:
> That's great news, thanks!
>
> On Thu, Jul 6, 2017 at 6:18 AM, Damian Guy <damian....@gmail.com> wrote:
>> Hi Greg,
>> I've been able to reproduce it by running multiple instances with
>> standby tasks and many threads. If I force some rebalances, then I see
>> the failure. Now to see if I can repro it in a test.
>> I think it is probably the same issue as:
>> https://issues.apache.org/jira/browse/KAFKA-5070
>>
>> On Thu, 6 Jul 2017 at 12:43 Damian Guy <damian....@gmail.com> wrote:
>>> Greg, what OS are you running on?
>>> Are you able to reproduce this in a test at all?
>>> For instance, based on what you described, it would seem that I should
>>> be able to start a streams app, wait for it to be up and running, run
>>> the state dir cleanup, and see it fail. However, I can't reproduce it.
>>>
>>> On Wed, 5 Jul 2017 at 23:23 Damian Guy <damian....@gmail.com> wrote:
>>>> Thanks Greg. I'll look into it more tomorrow. Just finding it
>>>> difficult to reproduce in a test.
>>>> Thanks for providing the sequence; it gives me something to try and
>>>> repro. Appreciated.
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>> On Wed, 5 Jul 2017 at 19:57, Greg Fodor <gfo...@gmail.com> wrote:
>>>>> Also, the sequence of events is:
>>>>>
>>>>> - Job starts, a rebalance happens, things run along smoothly.
>>>>> - After 10 minutes (retrospectively) the cleanup task kicks in and
>>>>>   removes some directories.
>>>>> - Tasks immediately start failing when trying to flush their state
>>>>>   stores.
>>>>>
>>>>> On Wed, Jul 5, 2017 at 11:55 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>>>>> The issue I am hitting is not the directory locking issues we've
>>>>>> seen in the past. The issue seems to be, as you mentioned, that the
>>>>>> state dir is getting deleted by the store cleanup process while
>>>>>> there are still tasks running that are trying to flush the state
>>>>>> store. It seems more than a little scary, given that right now it
>>>>>> seems either a) there are tasks running that should have been
>>>>>> re-assigned, or b) the cleanup job is removing state directories
>>>>>> for currently running and assigned tasks (perhaps during a
>>>>>> rebalance there is a race condition?). I'm guessing there's
>>>>>> probably a more benign explanation, but that is what it looks like
>>>>>> right now.
>>>>>>
>>>>>> On Wed, Jul 5, 2017 at 7:00 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>> BTW - I'm trying to reproduce it, but not having much luck so far...
>>>>>>>
>>>>>>> On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian....@gmail.com> wrote:
>>>>>>>> Thanks for the updates, Greg. There were some minor changes
>>>>>>>> around this in 0.11.0 to make it less likely to happen, but we've
>>>>>>>> only ever seen the locking fail in the event of a rebalance. When
>>>>>>>> everything is running, state dirs shouldn't be deleted while they
>>>>>>>> are being used, as the lock will fail.
>>>>>>>>
>>>>>>>> On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
>>>>>>>>> I can report that setting state.cleanup.delay.ms to a very large
>>>>>>>>> value (effectively disabling it) works around the issue. It
>>>>>>>>> seems that the state store cleanup process can somehow get out
>>>>>>>>> ahead of another task that still thinks it should be writing to
>>>>>>>>> the state store/flushing it. In my test runs, this does not seem
>>>>>>>>> to be happening during a rebalancing event, but after the
>>>>>>>>> cluster is stable.
>>>>>>>>>
>>>>>>>>> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com> wrote:
>>>>>>>>>> Upon another run, I see the same error occur during a
>>>>>>>>>> rebalance, so either my log was showing a rebalance or there is
>>>>>>>>>> a shared underlying issue with state stores.
>>>>>>>>>>
>>>>>>>>>> On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>>>>>>>>>> Also, I am on 0.10.2.1, so the poll interval was already set
>>>>>>>>>>> to MAX_VALUE.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>>>>>>>>>>> I've nuked the nodes this happened on, but the job had been
>>>>>>>>>>>> running for about 5-10 minutes across 5 nodes before this
>>>>>>>>>>>> happened. Does the log show a rebalance was happening? It
>>>>>>>>>>>> looks to me like the standby task was just committing as part
>>>>>>>>>>>> of normal operations.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>> Hi Greg,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Obviously a bit difficult to read the RocksDBException, but
>>>>>>>>>>>>> my guess is it is because the state directory gets deleted
>>>>>>>>>>>>> right before the flush happens:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2017-07-04 10:54:46,829 [myid:] - INFO
>>>>>>>>>>>>> [StreamThread-21:StateDirectory@213] - Deleting obsolete
>>>>>>>>>>>>> state directory 0_10 for task 0_10
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, it looks like it is possibly the same bug as KAFKA-5070.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It looks like your application is constantly rebalancing
>>>>>>>>>>>>> during store initialization, which may be the reason this
>>>>>>>>>>>>> bug comes about (there is a chance that the state dir lock
>>>>>>>>>>>>> is released, so when the thread tries to remove the stale
>>>>>>>>>>>>> state directory it is able to get the lock). You probably
>>>>>>>>>>>>> want to configure `max.poll.interval.ms` to be a reasonably
>>>>>>>>>>>>> large value (I think we default to Integer.MAX_VALUE in
>>>>>>>>>>>>> 0.10.2.1). You can also try setting `state.cleanup.delay.ms`
>>>>>>>>>>>>> to a higher value (the default is 10 minutes) to try and
>>>>>>>>>>>>> avoid it happening during a rebalance (I know this isn't a
>>>>>>>>>>>>> fix, but it will make it less likely to happen).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Damian
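In configuration terms, the workaround Greg reports and the settings Damian
suggests above come down to two properties. A minimal sketch, assuming the
0.10.2.x config constants (the `CleanupWorkaround` class and `applyWorkaround`
helper are illustrative, not something from the thread):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class CleanupWorkaround {
        public static Properties applyWorkaround(Properties props) {
            // Greg's workaround: push state.cleanup.delay.ms (default 10
            // minutes, i.e. 600000 ms) so far out that the background
            // cleanup thread effectively never deletes state dirs, so it
            // cannot race with a task that is still flushing its store.
            props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.MAX_VALUE);

            // Damian's suggestion: a large max.poll.interval.ms so that
            // slow store initialization does not trigger the rebalances
            // that seem to expose the bug. 0.10.2.1 already defaults this
            // to Integer.MAX_VALUE; it is forwarded to the embedded consumer.
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
            return props;
        }
    }

Note that, as Damian says, raising these values only makes the race less
likely (or defers cleanup indefinitely); it is a mitigation, not a fix.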
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>>>>>>>>>>>>>> Hi all, we are working on upgrading our jobs from 0.10.0 to
>>>>>>>>>>>>>> use Kafka Streams 0.10.2.1 and are hitting a problem. We have
>>>>>>>>>>>>>> an ETL job that has 4 state stores and runs across a few
>>>>>>>>>>>>>> hundred partitions, and as part of load testing the job we
>>>>>>>>>>>>>> are trying to reload our data out of Kafka into a test db.
>>>>>>>>>>>>>> The result is that we are able to load about 4M tuples, and
>>>>>>>>>>>>>> then this error pops up on all of the stream nodes
>>>>>>>>>>>>>> simultaneously. There are 4 RocksDB stores in question, and
>>>>>>>>>>>>>> there are lots of these errors, which takes the job down.
>>>>>>>>>>>>>> This bug does *not* seem to occur on 0.10.1.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> A similar error was mentioned here:
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5070
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Full log attached.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.kafka.streams.errors.ProcessorStateException: task [0_10] Failed to flush state store session-id-start-events
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>>>>>>>>>>> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store session-id-start-events
>>>>>>>>>>>>>>   at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>>>>>>>>>>>>>   ... 6 more
>>>>>>>>>>>>>> Caused by: org.rocksdb.RocksDBException: v
>>>>>>>>>>>>>>   at org.rocksdb.RocksDB.flush(Native Method)
>>>>>>>>>>>>>>   at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
>>>>>>>>>>>>>>   ... 13 more