Upon another run, I see the same error occur during a rebalance, so either
my log was showing a rebalance or there is a shared underlying issue with
state stores.

On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> wrote:

> Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.
>
> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
>
>> I've nuked the nodes this happened on, but the job had been running for
>> about 5-10 minutes across 5 nodes before this happened. Does the log show a
>> rebalance was happening? It looks to me like the standby task was just
>> committing as part of normal operations.
>>
>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com> wrote:
>>
>>> Hi Greg,
>>>
>>> Obviously a bit difficult to read the RocksDBException, but my guess is
>>> it is because the state directory gets deleted right before the flush
>>> happens:
>>> 2017-07-04 10:54:46,829 [myid:] - INFO  [StreamThread-21:StateDirectory@213] - Deleting obsolete state directory 0_10 for task 0_10
>>>
>>> Yes it looks like it is possibly the same bug as KAFKA-5070.
>>>
>>> It looks like your application is constantly rebalancing during store
>>> initialization, which may be the reason this bug comes about (there is a
>>> chance that the state dir lock is released, so when the thread tries to
>>> remove the stale state directory it is able to get the lock). You
>>> probably want to configure `max.poll.interval.ms` to be a reasonably
>>> large value (I think we default to Integer.MAX_VALUE in 0.10.2.1). You
>>> can also try setting `state.cleanup.delay.ms` to a higher value (default
>>> is 10 minutes) to try and avoid it happening during a rebalance (I know
>>> this isn't a fix, but it will make it less likely to happen).
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>>>
>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to use Kafka
>>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job that has 4
>>> > state stores and runs across a few hundred partitions, and as part of load
>>> > testing the job we are trying to reload our data out of Kafka into a test
>>> > db. The result is we are able to load about 4M tuples and then this error
>>> > pops up on all of the stream nodes simultaneously. There are 4 RocksDB
>>> > stores in question, and there are lots of these errors, which takes the
>>> > job down. This bug does *not* seem to occur on 0.10.1.
>>> >
>>> > A similar error was mentioned here:
>>> > https://issues.apache.org/jira/browse/KAFKA-5070
>>> >
>>> > Full log attached.
>>> >
>>> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10] Failed to flush state store session-id-start-events
>>> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>> >     at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store session-id-start-events
>>> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>>> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>>> >     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>> >     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>>> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>>> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>> >     ... 6 more
>>> > Caused by: org.rocksdb.RocksDBException: v
>>> >     at org.rocksdb.RocksDB.flush(Native Method)
>>> >     at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>>> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
>>> >     ... 13 more
>>> >
>>> >
>>>
>>
>>
>
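
For reference, the two settings discussed in the quoted reply (`max.poll.interval.ms` and `state.cleanup.delay.ms`) could be collected roughly as below. This is only a sketch under assumptions: the class name `StreamsOverrides`, the helper `build()`, and the one-hour cleanup delay are hypothetical and not taken from the thread, and plain string keys are used so the snippet compiles without the Kafka jars on the classpath. As noted in the thread, this is a mitigation rather than a fix.

```java
import java.util.Properties;

public class StreamsOverrides {

    // Hypothetical helper: returns only the two overrides discussed in the thread.
    static Properties build() {
        Properties props = new Properties();
        // Large poll interval so long store initialization does not trigger a rebalance.
        props.setProperty("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        // Illustrative value (1 hour, an assumption): delay cleanup of obsolete state directories.
        props.setProperty("state.cleanup.delay.ms", String.valueOf(60L * 60L * 1000L));
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be checked before being merged into the job config.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would then be merged with the job's other settings (application id, bootstrap servers, and so on) before the streams instance is created.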
