I can report that setting state.cleanup.delay.ms to a very large value
(effectively disabling it) works around the issue. It seems that the state
store cleanup process can somehow get out ahead of another task that still
thinks it should be writing to the state store/flushing it. In my test
runs, this does not seem to be happening during a rebalancing event, but
after the cluster is stable.

On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com> wrote:

> Upon another run, I see the same error occur during a rebalance, so either
> my log was showing a rebalance or there is a shared underlying issue with
> state stores.
>
> On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> wrote:
>
>> Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.
>>
>> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>
>>> I've nuked the nodes this happened on, but the job had been running for
>>> about 5-10 minutes across 5 nodes before this happened. Does the log show a
>>> rebalance was happening? It looks to me like the standby task was just
>>> committing as part of normal operations.
>>>
>>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com> wrote:
>>>
>>>> Hi Greg,
>>>>
>>>> Obviously a bit difficult to read the RocksDBException, but my guess is
>>>> it
>>>> is because the state directory gets deleted right before the flush
>>>> happens:
>>>> 2017-07-04 10:54:46,829 [myid:] - INFO  [StreamThread-21:StateDirector
>>>> y@213]
>>>> - Deleting obsolete state directory 0_10 for task 0_10
>>>>
>>>> Yes it looks like it is possibly the same bug as KAFKA-5070.
>>>>
>>>> It looks like your application is constantly rebalancing during store
>>>> intialization, which may be the reason this bug comes about (there is a
>>>> chance that the state dir lock is released so when the thread tries to
>>>> removes the stale state directory it is able to get the lock). You
>>>> probably
>>>> want to configure `max.poll.interval.ms` to be a reasonably large
>>>> value ( i
>>>> think we default to Integer.MAX_VALUE in 0.10.2.1). You can also try
>>>> setting `state.cleanup.delay.ms` to a higher value (default is 10
>>>> minutes),
>>>> to try and avoid it happening during a rebalance (I know this isn't a
>>>> fix,
>>>> but will make it less likely to happen).
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>>>>
>>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to use Kafka
>>>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job that
>>>> has 4
>>>> > state stores and runs across a few hundred partitions, and as part of
>>>> load
>>>> > testing the job we are trying to reload our data out of kafka into a
>>>> test
>>>> > db. The result is we are able to load about 4M tuples and then this
>>>> error
>>>> > pops up on all of the stream nodes simultaneously. There are 4 rocksdb
>>>> > stores in question and there are lots of these errors which takes it
>>>> down.
>>>> > This bug *does* not seem to occur on 0.10.1.
>>>> >
>>>> > A similar error was mentioned here:
>>>> > https://issues.apache.org/jira/browse/KAFKA-5070
>>>> >
>>>> > Full log attached.
>>>> >
>>>> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10]
>>>> > Failed to flush state store session-id-start-events
>>>> > at
>>>> > org.apache.kafka.streams.processor.internals.ProcessorStateM
>>>> anager.flush(ProcessorStateManager.java:337)
>>>> > at
>>>> > org.apache.kafka.streams.processor.internals.StandbyTask.com
>>>> mit(StandbyTask.java:94)
>>>> > at
>>>> > org.apache.kafka.streams.processor.internals.StreamThread.co
>>>> mmitOne(StreamThread.java:807)
>>>> > at
>>>> > org.apache.kafka.streams.processor.internals.StreamThread.co
>>>> mmitAll(StreamThread.java:797)
>>>> > at
>>>> > org.apache.kafka.streams.processor.internals.StreamThread.ma
>>>> ybeCommit(StreamThread.java:769)
>>>> > at
>>>> > org.apache.kafka.streams.processor.internals.StreamThread.ru
>>>> nLoop(StreamThread.java:647)
>>>> > at
>>>> > org.apache.kafka.streams.processor.internals.StreamThread.ru
>>>> n(StreamThread.java:361)
>>>> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException:
>>>> Error
>>>> > while executing flush from store session-id-start-events
>>>> > at
>>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flushI
>>>> nternal(RocksDBStore.java:354)
>>>> > at
>>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flush(
>>>> RocksDBStore.java:345)
>>>> > at
>>>> > org.apache.kafka.streams.state.internals.WrappedStateStore$A
>>>> bstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>>> > at
>>>> > org.apache.kafka.streams.state.internals.WrappedStateStore$A
>>>> bstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>>> > at
>>>> > org.apache.kafka.streams.state.internals.MeteredKeyValueStor
>>>> e$6.run(MeteredKeyValueStore.java:92)
>>>> > at
>>>> > org.apache.kafka.streams.processor.internals.StreamsMetricsI
>>>> mpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>> > at
>>>> > org.apache.kafka.streams.state.internals.MeteredKeyValueStor
>>>> e.flush(MeteredKeyValueStore.java:186)
>>>> > at
>>>> > org.apache.kafka.streams.processor.internals.ProcessorStateM
>>>> anager.flush(ProcessorStateManager.java:335)
>>>> > ... 6 more
>>>> > Caused by: org.rocksdb.RocksDBException: v
>>>> > at org.rocksdb.RocksDB.flush(Native Method)
>>>> > at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>>>> > at
>>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flushI
>>>> nternal(RocksDBStore.java:352)
>>>> > ... 13 more
>>>> >
>>>> >
>>>>
>>>
>>>
>>
>

Reply via email to