Hi Greg,

It's obviously a bit difficult to read the RocksDBException, but my guess is
that it happens because the state directory gets deleted right before the
flush:

2017-07-04 10:54:46,829 [myid:] - INFO  [StreamThread-21:StateDirectory@213] - Deleting obsolete state directory 0_10 for task 0_10

Yes, it looks like it is possibly the same bug as KAFKA-5070.

It looks like your application is constantly rebalancing during store
initialization, which may be why this bug comes about (there is a chance
that the state dir lock is released, so when the thread tries to remove the
stale state directory it is able to get the lock). You probably want to
configure `max.poll.interval.ms` to be a reasonably large value (I think we
default to Integer.MAX_VALUE in 0.10.2.1). You can also try setting
`state.cleanup.delay.ms` to a higher value (the default is 10 minutes) to
try to avoid it happening during a rebalance (I know this isn't a fix, but
it will make it less likely to happen).
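
Something along these lines in your streams config should do it. This is
just a rough sketch; the application id, bootstrap servers, and the one-hour
cleanup delay are placeholder values for illustration:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class TunedStreamsConfig {
    public static Properties props() {
        Properties props = new Properties();
        // Placeholder values, just for illustration.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-etl-job");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

        // Give long store initialization/restoration more headroom before
        // the consumer is considered failed and a rebalance is triggered.
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));

        // Delay removal of obsolete local state directories (default is 10
        // minutes) so cleanup is less likely to race with a flush during a
        // rebalance. One hour here is an arbitrary example.
        props.put("state.cleanup.delay.ms", Long.toString(60 * 60 * 1000L));

        return props;
    }
}

Then pass those properties to the KafkaStreams constructor as usual.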

Thanks,
Damian

On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:

> Hi all, we are working on upgrading our jobs from 0.10.0 to use Kafka
> Streams 0.10.2.1 and are hitting a problem. We have an ETL job that has 4
> state stores and runs across a few hundred partitions, and as part of load
> testing the job we are trying to reload our data out of kafka into a test
> db. The result is we are able to load about 4M tuples and then this error
> pops up on all of the stream nodes simultaneously. There are 4 RocksDB
> stores in question, and there are lots of these errors, which take it down.
> This bug does *not* seem to occur on 0.10.1.
>
> A similar error was mentioned here:
> https://issues.apache.org/jira/browse/KAFKA-5070
>
> Full log attached.
>
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_10] Failed to flush state store session-id-start-events
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>         at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store session-id-start-events
>         at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>         at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>         at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>         at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>         ... 6 more
> Caused by: org.rocksdb.RocksDBException: v
>         at org.rocksdb.RocksDB.flush(Native Method)
>         at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>         at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
>         ... 13 more
>
>
