Also sorry, to clarify the job context:

- This is a job running across 5 nodes on AWS Linux.
- It is under load with a large number of partitions: approximately 700-800
topic-partition assignments in total for the entire job. The topics involved
have a large number of partitions, 128 each.
- 32 stream threads per host.
- Peak TPS seems to be approximately 5k-10k tuples/sec per node. We're
reprocessing historical data in Kafka.
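
For reference, here is a rough sketch of the settings involved in the
workaround discussed further down the thread (raising state.cleanup.delay.ms
so the cleanup effectively never runs). The class name, the 30-day delay and
the standby replica count are my own placeholders, not values from our job;
only the config key names and the 32 threads per host are real. Plain string
keys are used so it compiles without the Kafka jars.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class CleanupDelayWorkaround {

    // Builds the Kafka Streams settings as plain string properties.
    static Properties workaroundProps() {
        Properties props = new Properties();
        // 32 stream threads per host, as described above.
        props.setProperty("num.stream.threads", "32");
        // Assumption: one standby replica. The thread only says standby
        // tasks are in use, not how many.
        props.setProperty("num.standby.replicas", "1");
        // Assumption: 30 days is an arbitrary "very large" delay, chosen so
        // the cleanup (default: 10 minutes) never runs during a test.
        props.setProperty("state.cleanup.delay.ms",
                String.valueOf(TimeUnit.DAYS.toMillis(30)));
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be checked against the job config.
        workaroundProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These would be merged into whatever Properties object is passed to the
streams application. This only avoids the symptom; it is not a fix for the
underlying race.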

On Thu, Jul 6, 2017 at 10:45 AM, Greg Fodor <gfo...@gmail.com> wrote:

> That's great news, thanks!
>
> On Thu, Jul 6, 2017 at 6:18 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi Greg,
>> I've been able to reproduce it by running multiple instances with standby
>> tasks and many threads. If i force some rebalances, then i see the
>> failure.
>> Now to see if i can repro in a test.
>> I think it is probably the same issue as:
>> https://issues.apache.org/jira/browse/KAFKA-5070
>>
>> On Thu, 6 Jul 2017 at 12:43 Damian Guy <damian....@gmail.com> wrote:
>>
>> > Greg, what OS are you running on?
>> > Are you able to reproduce this in a test at all?
>> > For instance, based on what you described it would seem that i should be
>> > able to start a streams app, wait for it to be up and running, run the
>> > state dir cleanup, see it fail. However, i can't reproduce it.
>> >
>> > On Wed, 5 Jul 2017 at 23:23 Damian Guy <damian....@gmail.com> wrote:
>> >
>> >> Thanks Greg. I'll look into it more tomorrow. Just finding it difficult
>> >> to reproduce in a test.
>> >> Thanks for providing the sequence, gives me something to try and repro.
>> >> Appreciated.
>> >>
>> >> Thanks,
>> >> Damian
>> >> On Wed, 5 Jul 2017 at 19:57, Greg Fodor <gfo...@gmail.com> wrote:
>> >>
>> >>> Also, the sequence of events is:
>> >>>
>> >>> - Job starts, rebalance happens, things run along smoothly.
>> >>> - After 10 minutes (retrospectively) the cleanup task kicks in and
>> >>> removes some directories
>> >>> - Tasks immediately start failing when trying to flush their state stores
>> >>>
>> >>>
>> >>>
>> >>> On Wed, Jul 5, 2017 at 11:55 AM, Greg Fodor <gfo...@gmail.com> wrote:
>> >>>
>> >>> > The issue I am hitting is not the directory locking issue we've seen
>> >>> > in the past. The issue seems to be, as you mentioned, that the state
>> >>> > dir is getting deleted by the store cleanup process, but there are
>> >>> > still tasks running that are trying to flush the state store. It seems
>> >>> > more than a little scary given that right now it seems either a) there
>> >>> > are tasks running that should have been re-assigned or b) the cleanup
>> >>> > job is removing state directories for currently running + assigned
>> >>> > tasks (perhaps during a rebalance there is a race condition?). I'm
>> >>> > guessing there's probably a more benign explanation, but that is what
>> >>> > it looks like right now.
>> >>> >
>> >>> > On Wed, Jul 5, 2017 at 7:00 AM, Damian Guy <damian....@gmail.com>
>> >>> wrote:
>> >>> >
>> >>> >> BTW - i'm trying to reproduce it, but not having much luck so far...
>> >>> >>
>> >>> >> On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian....@gmail.com>
>> wrote:
>> >>> >>
>> >>> >> > Thanks for the updates Greg. There were some minor changes around
>> >>> >> > this in 0.11.0 to make it less likely to happen, but we've only ever
>> >>> >> > seen the locking fail in the event of a rebalance. When everything
>> >>> >> > is running, state dirs shouldn't be deleted if they are being used,
>> >>> >> > as the lock will fail.
>> >>> >> >
>> >>> >> >
>> >>> >> > On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
>> >>> >> >
>> >>> >> >> I can report that setting state.cleanup.delay.ms to a very large
>> >>> >> >> value (effectively disabling it) works around the issue. It seems
>> >>> >> >> that the state store cleanup process can somehow get out ahead of
>> >>> >> >> another task that still thinks it should be writing to the state
>> >>> >> >> store/flushing it. In my test runs, this does not seem to be
>> >>> >> >> happening during a rebalancing event, but after the cluster is
>> >>> >> >> stable.
>> >>> >> >>
>> >>> >> >> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com>
>> >>> wrote:
>> >>> >> >>
>> >>> >> >> > Upon another run, I see the same error occur during a rebalance,
>> >>> >> >> > so either my log was showing a rebalance or there is a shared
>> >>> >> >> > underlying issue with state stores.
>> >>> >> >> >
>> >>> >> >> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> wrote:
>> >>> >> >> >
>> >>> >> >> >> Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.
>> >>> >> >> >>
>> >>> >> >> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
>> >>> >> >> >>
>> >>> >> >> >>> I've nuked the nodes this happened on, but the job had been
>> >>> >> >> >>> running for about 5-10 minutes across 5 nodes before this
>> >>> >> >> >>> happened. Does the log show a rebalance was happening? It looks
>> >>> >> >> >>> to me like the standby task was just committing as part of
>> >>> >> >> >>> normal operations.
>> >>> >> >> >>>
>> >>> >> >> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com> wrote:
>> >>> >> >> >>>
>> >>> >> >> >>>> Hi Greg,
>> >>> >> >> >>>>
>> >>> >> >> >>>> Obviously a bit difficult to read the RocksDBException, but my
>> >>> >> >> >>>> guess is it is because the state directory gets deleted right
>> >>> >> >> >>>> before the flush happens:
>> >>> >> >> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO [StreamThread-21:StateDirectory@213] - Deleting obsolete state directory 0_10 for task 0_10
>> >>> >> >> >>>>
>> >>> >> >> >>>> Yes it looks like it is possibly the same bug as KAFKA-5070.
>> >>> >> >> >>>>
>> >>> >> >> >>>> It looks like your application is constantly rebalancing during
>> >>> >> >> >>>> store initialization, which may be the reason this bug comes
>> >>> >> >> >>>> about (there is a chance that the state dir lock is released so
>> >>> >> >> >>>> when the thread tries to remove the stale state directory it is
>> >>> >> >> >>>> able to get the lock). You probably want to configure
>> >>> >> >> >>>> `max.poll.interval.ms` to be a reasonably large value (i think
>> >>> >> >> >>>> we default to Integer.MAX_VALUE in 0.10.2.1). You can also try
>> >>> >> >> >>>> setting `state.cleanup.delay.ms` to a higher value (default is
>> >>> >> >> >>>> 10 minutes), to try and avoid it happening during a rebalance
>> >>> >> >> >>>> (I know this isn't a fix, but will make it less likely to
>> >>> >> >> >>>> happen).
>> >>> >> >> >>>>
>> >>> >> >> >>>> Thanks,
>> >>> >> >> >>>> Damian
>> >>> >> >> >>>>
>> >>> >> >> >>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com>
>> >>> wrote:
>> >>> >> >> >>>>
>> >>> >> >> >>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to use
>> >>> >> >> >>>> > Kafka Streams 0.10.2.1 and are hitting a problem. We have an ETL
>> >>> >> >> >>>> > job that has 4 state stores and runs across a few hundred
>> >>> >> >> >>>> > partitions, and as part of load testing the job we are trying to
>> >>> >> >> >>>> > reload our data out of kafka into a test db. The result is we are
>> >>> >> >> >>>> > able to load about 4M tuples and then this error pops up on all
>> >>> >> >> >>>> > of the stream nodes simultaneously. There are 4 rocksdb stores in
>> >>> >> >> >>>> > question and there are lots of these errors, which take it down.
>> >>> >> >> >>>> > This bug *does not* seem to occur on 0.10.1.
>> >>> >> >> >>>> >
>> >>> >> >> >>>> > A similar error was mentioned here:
>> >>> >> >> >>>> > https://issues.apache.org/jira/browse/KAFKA-5070
>> >>> >> >> >>>> >
>> >>> >> >> >>>> > Full log attached.
>> >>> >> >> >>>> >
>> >>> >> >> >>>> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10] Failed to flush state store session-id-start-events
>> >>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>> >>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>> >>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>> >>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
>> >>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>> >>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>> >>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> >>> >> >> >>>> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store session-id-start-events
>> >>> >> >> >>>> > at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>> >>> >> >> >>>> > at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>> >>> >> >> >>>> > at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>> >>> >> >> >>>> > at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>> >>> >> >> >>>> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>> >>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> >>> >> >> >>>> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>> >>> >> >> >>>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>> >>> >> >> >>>> > ... 6 more
>> >>> >> >> >>>> > Caused by: org.rocksdb.RocksDBException: v
>> >>> >> >> >>>> > at org.rocksdb.RocksDB.flush(Native Method)
>> >>> >> >> >>>> > at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>> >>> >> >> >>>> > at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
>> >>> >> >> >>>> > ... 13 more
>> >>> >> >> >>>> >
>> >>> >> >> >>>> >
>> >>> >> >> >>>>
>> >>> >> >> >>>
>> >>> >> >> >>>
>> >>> >> >> >>
>> >>> >> >> >
>> >>> >> >>
>> >>> >> >
>> >>> >>
>> >>> >
>> >>> >
>> >>>
>> >>
>>
>
>
