Narayanan Arunachalam created FLINK-9268:
--------------------------------------------

             Summary: RockDB errors from WindowOperator
                 Key: FLINK-9268
                 URL: https://issues.apache.org/jira/browse/FLINK-9268
             Project: Flink
          Issue Type: Bug
          Components: DataStream API, State Backends, Checkpointing
            Reporter: Narayanan Arunachalam


The job has one Kafka source and no sinks (other than a no-op sink), applies session windowing, and uses processing time. It fails with the error given below after running for a few hours. The only way to recover from this error is to cancel the job and start a new one.

An S3 backend is used for externalized checkpoints.

A representative job DAG:

    val streams = sEnv
      .addSource(makeKafkaSource(config))
      .map(makeEvent)
      .keyBy(_.get(EVENT_GROUP_ID))
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
      .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
      .apply(makeEventsList)
      .addSink(makeNoOpSink)

A representative config:

    state.backend=rocksDB
    checkpoint.enabled=true
    external.checkpoint.enabled=true
    checkpoint.mode=AT_LEAST_ONCE
    checkpoint.interval=900000
    checkpoint.timeout=300000

Error:

    TimerException{java.lang.NegativeArraySizeException}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NegativeArraySizeException
        at org.rocksdb.RocksDB.get(Native Method)
        at org.rocksdb.RocksDB.get(RocksDB.java:810)
        at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
        at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
        at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
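
For readers less familiar with the session windowing used in the job above, the following is a rough, Flink-free sketch of how a 60-second inactivity gap groups the arrival times of a single key into sessions. It is not Flink's implementation: the `SessionGapSketch` object, the `sessionize` helper and the sample timestamps are hypothetical, and treating an arrival exactly one gap after the previous one as part of the same session is an assumption.

```scala
// Illustrative only: pure Scala, no Flink dependency.
// `sessionize` and the sample data are hypothetical names and values.
object SessionGapSketch {

  /** Groups the arrival times (in ms) of one key into sessions.
    * A new session starts when an arrival comes more than `gapMs`
    * after the previous arrival (boundary handling is an assumption).
    */
  def sessionize(arrivalsMs: Seq[Long], gapMs: Long): List[List[Long]] =
    arrivalsMs.sorted
      .foldLeft(List.empty[List[Long]]) {
        // The newest arrival of the open session is at the head of `current`.
        case (current :: done, t) if t - current.head <= gapMs =>
          (t :: current) :: done
        // Gap exceeded, or no session yet: open a new session.
        case (sessions, t) =>
          List(t) :: sessions
      }
      .map(_.reverse) // restore arrival order inside each session
      .reverse        // restore session order

  def main(args: Array[String]): Unit = {
    val arrivals = Seq(0L, 10000L, 65000L, 130000L)
    // 60000 ms mirrors Time.seconds(60) in the job above.
    sessionize(arrivals, 60000L).foreach(println)
  }
}
```

With these sample arrivals, the first three fall into one session and the last starts a new one, because it arrives 65 seconds after its predecessor.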