Narayanan Arunachalam created FLINK-9268:
--------------------------------------------
Summary: RocksDB errors from WindowOperator
Key: FLINK-9268
URL: https://issues.apache.org/jira/browse/FLINK-9268
Project: Flink
Issue Type: Bug
Components: DataStream API, State Backends, Checkpointing
Reporter: Narayanan Arunachalam
The job has one Kafka source and no real sink (only a no-op sink). It applies
session windows based on processing time. After running for a few hours, the job
fails with the error given below. The only way to recover from this error is to
cancel the job and start a new one.
Externalized checkpoints are stored in S3.
A representative job DAG:
val streams = sEnv
  .addSource(makeKafkaSource(config))
  .map(makeEvent)
  .keyBy(_.get(EVENT_GROUP_ID))
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
  .apply(makeEventsList)
  .addSink(makeNoOpSink)
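To make the session behaviour in the DAG above concrete, here is a minimal plain-Scala sketch of how a 60-second gap groups timestamps for a single key. `SessionGapSketch` and `sessions` are hypothetical names that are not part of Flink, and the boundary handling (a new element exactly one gap after the previous one still joins the session) is an assumption about how the windows merge. The point it illustrates is that a key that never goes quiet for a full gap keeps extending one session, so that session's buffered elements keep growing.

```scala
// Hypothetical helper, not part of Flink: groups timestamps (ms) for one key
// into sessions, splitting whenever the gap to the previous element is exceeded.
object SessionGapSketch {
  val GapMs: Long = 60000L // mirrors Time.seconds(60) in the job above

  def sessions(timestamps: Seq[Long], gapMs: Long = GapMs): List[List[Long]] = {
    // Each session is built newest-first so the latest timestamp is at the head.
    val newestFirst = timestamps.sorted.foldLeft(List.empty[List[Long]]) {
      case (current :: done, ts) if ts - current.head <= gapMs =>
        (ts :: current) :: done // within the gap: extend the open session
      case (acc, ts) =>
        List(ts) :: acc         // first element, or gap exceeded: new session
    }
    // Restore chronological order inside each session and across sessions.
    newestFirst.map(_.reverse).reverse
  }
}
```

For example, `SessionGapSketch.sessions(Seq(0L, 30000L, 200000L))` yields two sessions, while a stream with an element every 30 seconds would stay in a single session indefinitely.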
A representative config:
state.backend=rocksDB
checkpoint.enabled=true
external.checkpoint.enabled=true
checkpoint.mode=AT_LEAST_ONCE
checkpoint.interval=900000
checkpoint.timeout=300000
Error:
TimerException{java.lang.NegativeArraySizeException}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:810)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
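The exception is raised inside the native RocksDB.get call made from RocksDBListState.get, that is, while the stored list value is being read back into a byte array. One possible explanation, which this report does not confirm, is that the list state for one key has grown beyond the largest size a Java byte array can hold (2^31 - 1 bytes), so that the length wraps to a negative number when narrowed to 32 bits. The plain-Scala sketch below only illustrates that arithmetic; `NegativeSizeSketch`, `narrowedLength` and `allocationFails` are hypothetical names and do not reflect the actual RocksDB JNI code.

```scala
// Hypothetical illustration, not RocksDB code: shows how a 64-bit value length
// can turn into a negative 32-bit array length.
object NegativeSizeSketch {
  // Stand-in for a native layer that narrows a 64-bit length to a Java int.
  def narrowedLength(valueBytes: Long): Int = valueBytes.toInt

  // True if allocating a byte array of the narrowed length throws
  // NegativeArraySizeException, as seen in the stack trace above.
  def allocationFails(valueBytes: Long): Boolean =
    try {
      new Array[Byte](narrowedLength(valueBytes))
      false
    } catch {
      case _: NegativeArraySizeException => true
    }
}
```

Under this assumption, a value of exactly 2^31 bytes narrows to `Int.MinValue` and the allocation fails, whereas any value that fits in an `Int` is unaffected. If this is what happens here, checking whether some EVENT_GROUP_ID values never see a 60-second pause would be a reasonable next diagnostic step.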