That's great news, thanks!

On Thu, Jul 6, 2017 at 6:18 AM, Damian Guy <damian....@gmail.com> wrote:
> Hi Greg,
> I've been able to reproduce it by running multiple instances with standby
> tasks and many threads. If I force some rebalances, then I see the
> failure. Now to see if I can repro it in a test.
> I think it is probably the same issue as:
> https://issues.apache.org/jira/browse/KAFKA-5070
>
> On Thu, 6 Jul 2017 at 12:43 Damian Guy <damian....@gmail.com> wrote:
>
>> Greg, what OS are you running on?
>> Are you able to reproduce this in a test at all?
>> For instance, based on what you described, it would seem that I should
>> be able to start a streams app, wait for it to be up and running, run
>> the state dir cleanup, and see it fail. However, I can't reproduce it.
>>
>> On Wed, 5 Jul 2017 at 23:23 Damian Guy <damian....@gmail.com> wrote:
>>
>>> Thanks Greg. I'll look into it more tomorrow. I'm just finding it
>>> difficult to reproduce in a test.
>>> Thanks for providing the sequence; it gives me something to try and
>>> repro. Appreciated.
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Wed, 5 Jul 2017 at 19:57, Greg Fodor <gfo...@gmail.com> wrote:
>>>
>>>> Also, the sequence of events is:
>>>>
>>>> - Job starts, rebalance happens, things run along smoothly.
>>>> - After 10 minutes (retrospectively), the cleanup task kicks in and
>>>>   removes some directories.
>>>> - Tasks immediately start failing when trying to flush their state
>>>>   stores.
>>>>
>>>> On Wed, Jul 5, 2017 at 11:55 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>>>
>>>>> The issue I am hitting is not the directory locking issue we've seen
>>>>> in the past. The issue seems to be, as you mentioned, that the state
>>>>> dir is getting deleted by the store cleanup process, but there are
>>>>> still tasks running that are trying to flush the state store. It
>>>>> seems more than a little scary, given that right now it seems either
>>>>> a) there are tasks running that should have been re-assigned, or b)
>>>>> the cleanup job is removing state directories for currently running
>>>>> and assigned tasks (perhaps during a rebalance there is a race
>>>>> condition?). I'm guessing there's probably a more benign explanation,
>>>>> but that is what it looks like right now.
>>>>>
>>>>> On Wed, Jul 5, 2017 at 7:00 AM, Damian Guy <damian....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> BTW - I'm trying to reproduce it, but not having much luck so far...
>>>>>>
>>>>>> On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian....@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the updates, Greg. There were some minor changes around
>>>>>>> this in 0.11.0 to make it less likely to happen, but we've only
>>>>>>> ever seen the locking fail in the event of a rebalance. When
>>>>>>> everything is running, state dirs shouldn't be deleted if they are
>>>>>>> being used, as the lock will fail.
>>>>>>>
>>>>>>> On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I can report that setting state.cleanup.delay.ms to a very large
>>>>>>>> value (effectively disabling it) works around the issue. It seems
>>>>>>>> that the state store cleanup process can somehow get out ahead of
>>>>>>>> another task that still thinks it should be writing to the state
>>>>>>>> store and flushing it. In my test runs, this does not seem to be
>>>>>>>> happening during a rebalancing event, but after the cluster is
>>>>>>>> stable.
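
The workaround described above amounts to pushing the cleanup delay out so
far that the cleaner never fires. A minimal sketch of the relevant
configuration, assuming the 0.10.2.x API (the application id and broker
address are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class CleanupWorkaround {
        public static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-etl-job");      // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
            // state.cleanup.delay.ms defaults to 600000 (10 minutes); a huge
            // value effectively disables background state dir cleanup.
            props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.MAX_VALUE);
            return props;
        }
    }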
> >>> >> >> > >>> >> >> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com> > >>> wrote: > >>> >> >> > >>> >> >> > Upon another run, I see the same error occur during a > rebalance, > >>> so > >>> >> >> either > >>> >> >> > my log was showing a rebalance or there is a shared underlying > >>> issue > >>> >> >> with > >>> >> >> > state stores. > >>> >> >> > > >>> >> >> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> > >>> >> wrote: > >>> >> >> > > >>> >> >> >> Also, I am on 0.10.2.1, so poll interval was already set to > >>> >> MAX_VALUE. > >>> >> >> >> > >>> >> >> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com > > > >>> >> wrote: > >>> >> >> >> > >>> >> >> >>> I've nuked the nodes this happened on, but the job had been > >>> running > >>> >> >> for > >>> >> >> >>> about 5-10 minutes across 5 nodes before this happened. Does > >>> the > >>> >> log > >>> >> >> show a > >>> >> >> >>> rebalance was happening? It looks to me like the standby task > >>> was > >>> >> just > >>> >> >> >>> committing as part of normal operations. > >>> >> >> >>> > >>> >> >> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy < > >>> damian....@gmail.com> > >>> >> >> wrote: > >>> >> >> >>> > >>> >> >> >>>> Hi Greg, > >>> >> >> >>>> > >>> >> >> >>>> Obviously a bit difficult to read the RocksDBException, but > my > >>> >> guess > >>> >> >> is > >>> >> >> >>>> it > >>> >> >> >>>> is because the state directory gets deleted right before the > >>> flush > >>> >> >> >>>> happens: > >>> >> >> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO > >>> >> >> [StreamThread-21:StateDirector > >>> >> >> >>>> y@213] > >>> >> >> >>>> - Deleting obsolete state directory 0_10 for task 0_10 > >>> >> >> >>>> > >>> >> >> >>>> Yes it looks like it is possibly the same bug as KAFKA-5070. > >>> >> >> >>>> > >>> >> >> >>>> It looks like your application is constantly rebalancing > >>> during > >>> >> store > >>> >> >> >>>> intialization, which may be the reason this bug comes about > >>> (there > >>> >> >> is a > >>> >> >> >>>> chance that the state dir lock is released so when the > thread > >>> >> tries > >>> >> >> to > >>> >> >> >>>> removes the stale state directory it is able to get the > >>> lock). You > >>> >> >> >>>> probably > >>> >> >> >>>> want to configure `max.poll.interval.ms` to be a reasonably > >>> large > >>> >> >> >>>> value ( i > >>> >> >> >>>> think we default to Integer.MAX_VALUE in 0.10.2.1). You can > >>> also > >>> >> try > >>> >> >> >>>> setting `state.cleanup.delay.ms` to a higher value (default > >>> is 10 > >>> >> >> >>>> minutes), > >>> >> >> >>>> to try and avoid it happening during a rebalance (I know > this > >>> >> isn't a > >>> >> >> >>>> fix, > >>> >> >> >>>> but will make it less likely to happen). > >>> >> >> >>>> > >>> >> >> >>>> Thanks, > >>> >> >> >>>> Damian > >>> >> >> >>>> > >>> >> >> >>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> > >>> wrote: > >>> >> >> >>>> > >>> >> >> >>>> > Hi all, we are working on upgrading our jobs from 0.10.0 > to > >>> use > >>> >> >> Kafka > >>> >> >> >>>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL > >>> job > >>> >> that > >>> >> >> >>>> has 4 > >>> >> >> >>>> > state stores and runs across a few hundred partitions, and > >>> as > >>> >> part > >>> >> >> of > >>> >> >> >>>> load > >>> >> >> >>>> > testing the job we are trying to reload our data out of > >>> kafka > >>> >> into > >>> >> >> a > >>> >> >> >>>> test > >>> >> >> >>>> > db. 
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all, we are working on upgrading our jobs from 0.10.0 to
>>>>>>>>>>>>> use Kafka Streams 0.10.2.1 and are hitting a problem. We have
>>>>>>>>>>>>> an ETL job that has 4 state stores and runs across a few
>>>>>>>>>>>>> hundred partitions, and as part of load testing the job we
>>>>>>>>>>>>> are trying to reload our data out of Kafka into a test db.
>>>>>>>>>>>>> The result is that we are able to load about 4M tuples, and
>>>>>>>>>>>>> then this error pops up on all of the stream nodes
>>>>>>>>>>>>> simultaneously. There are 4 RocksDB stores in question, and
>>>>>>>>>>>>> there are lots of these errors, which take it down. This bug
>>>>>>>>>>>>> does *not* seem to occur on 0.10.1.
>>>>>>>>>>>>>
>>>>>>>>>>>>> A similar error was mentioned here:
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5070
>>>>>>>>>>>>>
>>>>>>>>>>>>> Full log attached.
>>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.kafka.streams.errors.ProcessorStateException: task
>>>>>>>>>>>>> [0_10] Failed to flush state store session-id-start-events
>>>>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>>>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>>>>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
>>>>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>>>>>>>>>> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store session-id-start-events
>>>>>>>>>>>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>>>>>>>>>>>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>>>>>>>>>>>>>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>>>>>>>>>>>>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>>>>>>>>>>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>>>>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>>>>>>>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>>>>>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>>>>>>>>>>>>     ... 6 more
>>>>>>>>>>>>> Caused by: org.rocksdb.RocksDBException: v
>>>>>>>>>>>>>     at org.rocksdb.RocksDB.flush(Native Method)
>>>>>>>>>>>>>     at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>>>>>>>>>>>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
>>>>>>>>>>>>>     ... 13 more
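
For completeness, the repro setup described at the top of the thread
(multiple instances, standby tasks, many threads, forced rebalances)
corresponds to configuration along these lines; this is a sketch with
illustrative values, not the exact repro:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ReproConfigSketch {
        public static Properties reproProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cleanup-race-repro"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");    // placeholder
            // Standby tasks plus many threads per instance; run several
            // instances and bounce them to force rebalances.
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 16); // illustrative
            return props;
        }
    }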