Hi Greg,
I've been able to reproduce it by running multiple instances with standby
tasks and many threads. If I force some rebalances, then I see the failure.
Now to see if I can repro in a test.
I think it is probably the same issue as:
https://issues.apache.org/jira/browse/KAFKA-5070
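
In the meantime, here is a minimal sketch of the workaround config discussed further down the thread. Only the two config keys come from this thread; the class name and the one-year delay are placeholders picked for illustration, and the idea is to merge these entries into whatever Properties you already pass to KafkaStreams.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class CleanupWorkaroundConfig {
    // Hypothetical helper; only the two config keys come from this thread.
    public static Properties build() {
        Properties props = new Properties();
        // Push state dir cleanup far into the future. One year is an arbitrary
        // choice: large, but well below Long.MAX_VALUE so that adding it to a
        // timestamp cannot overflow.
        props.setProperty("state.cleanup.delay.ms",
                String.valueOf(TimeUnit.DAYS.toMillis(365)));
        // Keep the consumer poll interval large so that slow store
        // initialization is less likely to trigger extra rebalances.
        props.setProperty("max.poll.interval.ms",
                String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        // Print the entries; in a real app, merge them into your streams config.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

This only makes the failure less likely; it is not a fix for the underlying race.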

On Thu, 6 Jul 2017 at 12:43 Damian Guy <damian....@gmail.com> wrote:

> Greg, what OS are you running on?
> Are you able to reproduce this in a test at all?
> For instance, based on what you described it would seem that I should be
> able to start a streams app, wait for it to be up and running, run the
> state dir cleanup, and see it fail. However, I can't reproduce it.
>
> On Wed, 5 Jul 2017 at 23:23 Damian Guy <damian....@gmail.com> wrote:
>
>> Thanks Greg. I'll look into it more tomorrow. Just finding it difficult
>> to reproduce in a test.
>> Thanks for providing the sequence, that gives me something to try to repro.
>> Appreciated.
>>
>> Thanks,
>> Damian
>> On Wed, 5 Jul 2017 at 19:57, Greg Fodor <gfo...@gmail.com> wrote:
>>
>>> Also, the sequence of events is:
>>>
>>> - Job starts, rebalance happens, things run along smoothly.
>>> - After 10 minutes (in retrospect) the cleanup task kicks in and removes
>>> some directories
>>> - Tasks immediately start failing when trying to flush their state stores
>>>
>>>
>>>
>>> On Wed, Jul 5, 2017 at 11:55 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>>
>>> > The issue I am hitting is not the directory locking issues we've seen in
>>> > the past. The issue seems to be, as you mentioned, that the state dir is
>>> > getting deleted by the store cleanup process, but there are still tasks
>>> > running that are trying to flush the state store. It seems more than a
>>> > little scary given that right now it seems either a) there are tasks
>>> > running that should have been re-assigned or b) the cleanup job is removing
>>> > state directories for currently running + assigned tasks (perhaps during a
>>> > rebalance there is a race condition?) I'm guessing there's probably a more
>>> > benign explanation, but that is what it looks like right now.
>>> >
>>> > On Wed, Jul 5, 2017 at 7:00 AM, Damian Guy <damian....@gmail.com> wrote:
>>> >
>>> >> BTW - I'm trying to reproduce it, but not having much luck so far...
>>> >>
>>> >> On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian....@gmail.com> wrote:
>>> >>
>>> >> > Thanks for the updates Greg. There were some minor changes around this in
>>> >> > 0.11.0 to make it less likely to happen, but we've only ever seen the
>>> >> > locking fail in the event of a rebalance. When everything is running, state
>>> >> > dirs shouldn't be deleted if they are being used, as the lock will fail.
>>> >> >
>>> >> >
>>> >> > On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
>>> >> >
>>> >> >> I can report that setting state.cleanup.delay.ms to a very large value
>>> >> >> (effectively disabling it) works around the issue. It seems that the state
>>> >> >> store cleanup process can somehow get out ahead of another task that still
>>> >> >> thinks it should be writing to the state store/flushing it. In my test
>>> >> >> runs, this does not seem to be happening during a rebalancing event, but
>>> >> >> after the cluster is stable.
>>> >> >>
>>> >> >> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com> wrote:
>>> >> >>
>>> >> >> > Upon another run, I see the same error occur during a rebalance, so either
>>> >> >> > my log was showing a rebalance or there is a shared underlying issue with
>>> >> >> > state stores.
>>> >> >> >
>>> >> >> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>> >> >> >
>>> >> >> >> Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.
>>> >> >> >>
>>> >> >> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>> >> >> >>
>>> >> >> >>> I've nuked the nodes this happened on, but the job had been running for
>>> >> >> >>> about 5-10 minutes across 5 nodes before this happened. Does the log show a
>>> >> >> >>> rebalance was happening? It looks to me like the standby task was just
>>> >> >> >>> committing as part of normal operations.
>>> >> >> >>>
>>> >> >> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com> wrote:
>>> >> >> >>>
>>> >> >> >>>> Hi Greg,
>>> >> >> >>>>
>>> >> >> >>>> Obviously a bit difficult to read the RocksDBException, but my guess is it
>>> >> >> >>>> is because the state directory gets deleted right before the flush
>>> >> >> >>>> happens:
>>> >> >> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO [StreamThread-21:StateDirectory@213] - Deleting obsolete state directory 0_10 for task 0_10
>>> >> >> >>>>
>>> >> >> >>>> Yes it looks like it is possibly the same bug as KAFKA-5070.
>>> >> >> >>>>
>>> >> >> >>>> It looks like your application is constantly rebalancing during store
>>> >> >> >>>> initialization, which may be the reason this bug comes about (there is a
>>> >> >> >>>> chance that the state dir lock is released, so when the thread tries to
>>> >> >> >>>> remove the stale state directory it is able to get the lock). You probably
>>> >> >> >>>> want to configure `max.poll.interval.ms` to be a reasonably large value (I
>>> >> >> >>>> think we default to Integer.MAX_VALUE in 0.10.2.1). You can also try
>>> >> >> >>>> setting `state.cleanup.delay.ms` to a higher value (default is 10 minutes),
>>> >> >> >>>> to try and avoid it happening during a rebalance (I know this isn't a fix,
>>> >> >> >>>> but it will make it less likely to happen).
>>> >> >> >>>>
>>> >> >> >>>> Thanks,
>>> >> >> >>>> Damian
>>> >> >> >>>>
>>> >> >> >>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>>> >> >> >>>>
>>> >> >> >>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to use Kafka
>>> >> >> >>>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job that has 4
>>> >> >> >>>> > state stores and runs across a few hundred partitions, and as part of load
>>> >> >> >>>> > testing the job we are trying to reload our data out of kafka into a test
>>> >> >> >>>> > db. The result is we are able to load about 4M tuples and then this error
>>> >> >> >>>> > pops up on all of the stream nodes simultaneously. There are 4 rocksdb
>>> >> >> >>>> > stores in question and there are lots of these errors, which takes the
>>> >> >> >>>> > job down. This bug does *not* seem to occur on 0.10.1.
>>> >> >> >>>> >
>>> >> >> >>>> > A similar error was mentioned here:
>>> >> >> >>>> > https://issues.apache.org/jira/browse/KAFKA-5070
>>> >> >> >>>> >
>>> >> >> >>>> > Full log attached.
>>> >> >> >>>> >
>>> >> >> >>>> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10] Failed to flush state store session-id-start-events
>>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
>>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>> >> >> >>>> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store session-id-start-events
>>> >> >> >>>> > at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>>> >> >> >>>> > at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>>> >> >> >>>> > at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>> >> >> >>>> > at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>> >> >> >>>> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>> >> >> >>>> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>> >> >> >>>> > ... 6 more
>>> >> >> >>>> > Caused by: org.rocksdb.RocksDBException: v
>>> >> >> >>>> > at org.rocksdb.RocksDB.flush(Native Method)
>>> >> >> >>>> > at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>>> >> >> >>>> > at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
>>> >> >> >>>> > ... 13 more
>>> >> >> >>>> >
>>> >> >> >>>> >
>>> >> >> >>>>
>>> >> >> >>>
>>> >> >> >>>
>>> >> >> >>
>>> >> >> >
>>> >> >>
>>> >> >
>>> >>
>>> >
>>> >
>>>
>>
