Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.

On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
> I've nuked the nodes this happened on, but the job had been running for
> about 5-10 minutes across 5 nodes before this happened. Does the log show
> a rebalance was happening? It looks to me like the standby task was just
> committing as part of normal operations.
>
> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi Greg,
>>
>> Obviously a bit difficult to read the RocksDBException, but my guess is
>> it is because the state directory gets deleted right before the flush
>> happens:
>> 2017-07-04 10:54:46,829 [myid:] - INFO [StreamThread-21:StateDirectory@213]
>> - Deleting obsolete state directory 0_10 for task 0_10
>>
>> Yes, it looks like it is possibly the same bug as KAFKA-5070.
>>
>> It looks like your application is constantly rebalancing during store
>> initialization, which may be the reason this bug comes about (there is a
>> chance that the state dir lock is released, so when the thread tries to
>> remove the stale state directory it is able to get the lock). You
>> probably want to configure `max.poll.interval.ms` to be a reasonably
>> large value (I think we default to Integer.MAX_VALUE in 0.10.2.1). You
>> can also try setting `state.cleanup.delay.ms` to a higher value (the
>> default is 10 minutes) to try to avoid it happening during a rebalance
>> (I know this isn't a fix, but it will make it less likely to happen).
>>
>> Thanks,
>> Damian
>>
>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>>
>>> Hi all, we are working on upgrading our jobs from 0.10.0 to use Kafka
>>> Streams 0.10.2.1 and are hitting a problem. We have an ETL job that has
>>> 4 state stores and runs across a few hundred partitions, and as part of
>>> load testing the job we are trying to reload our data out of Kafka into
>>> a test db. The result is we are able to load about 4M tuples, and then
>>> this error pops up on all of the stream nodes simultaneously.
>>> There are 4 RocksDB stores in question, and there are lots of these
>>> errors, which takes the job down. This bug does *not* seem to occur on
>>> 0.10.1.
>>>
>>> A similar error was mentioned here:
>>> https://issues.apache.org/jira/browse/KAFKA-5070
>>>
>>> Full log attached.
>>>
>>> org.apache.kafka.streams.errors.ProcessorStateException: task [0_10]
>>> Failed to flush state store session-id-start-events
>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>>     at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error
>>> while executing flush from store session-id-start-events
>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>>>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>>     ... 6 more
>>> Caused by: org.rocksdb.RocksDBException: v
>>>     at org.rocksdb.RocksDB.flush(Native Method)
>>>     at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
>>>     ... 13 more
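[Editor's note: for readers landing on this thread, the two settings Damian suggests could be collected roughly as below. This is a minimal sketch using plain java.util.Properties and the literal config-key strings from the thread, not the StreamsConfig constants, so it compiles without the kafka-streams jar; the one-hour cleanup delay is an illustrative value, not a recommendation from the thread.]

```java
import java.util.Properties;

public class StreamsTuning {
    /**
     * Builds the two workaround settings from the thread:
     * - max.poll.interval.ms raised to Integer.MAX_VALUE (the 0.10.2.1
     *   default, per Damian) so slow store initialization does not
     *   trigger a rebalance;
     * - state.cleanup.delay.ms raised well above the 10-minute default
     *   so obsolete-state-dir cleanup is less likely to race a rebalance.
     */
    public static Properties tunedProps() {
        Properties props = new Properties();
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        // One hour, chosen only for illustration.
        props.put("state.cleanup.delay.ms", Long.toString(60 * 60 * 1000L));
        return props;
    }

    public static void main(String[] args) {
        Properties p = tunedProps();
        System.out.println("max.poll.interval.ms = " + p.getProperty("max.poll.interval.ms"));
        System.out.println("state.cleanup.delay.ms = " + p.getProperty("state.cleanup.delay.ms"));
    }
}
```

These properties would be merged into the same Properties object passed to the KafkaStreams constructor alongside the application id and bootstrap servers. As Damian notes, this only makes the race less likely; it is not a fix for the underlying bug.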