Narayanan Arunachalam created FLINK-9268:
--------------------------------------------

             Summary: RocksDB errors from WindowOperator
                 Key: FLINK-9268
                 URL: https://issues.apache.org/jira/browse/FLINK-9268
             Project: Flink
          Issue Type: Bug
          Components: DataStream API, State Backends, Checkpointing
            Reporter: Narayanan Arunachalam


The job has one Kafka source and no real sink (only a no-op sink), and it applies
session windows based on processing time. After running for a few hours, the job
fails with the error below. The only way to recover is to cancel the job and
start a new one.

Externalized checkpoints are stored in S3.

A representative job DAG:

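import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, PurgingTrigger}

// sEnv, config, makeKafkaSource, makeEvent, EVENT_GROUP_ID, makeEventsList and
// makeNoOpSink are job-specific definitions not shown in this report.
val streams = sEnv
  .addSource(makeKafkaSource(config))
  .map(makeEvent)
  .keyBy(_.get(EVENT_GROUP_ID))
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
  .apply(makeEventsList)
  .addSink(makeNoOpSink)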

A representative config:

state.backend=rocksDB

checkpoint.enabled=true
external.checkpoint.enabled=true
checkpoint.mode=AT_LEAST_ONCE
checkpoint.interval=900000
checkpoint.timeout=300000

Error:

TimerException{java.lang.NegativeArraySizeException}
 at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
 at org.rocksdb.RocksDB.get(Native Method)
 at org.rocksdb.RocksDB.get(RocksDB.java:810)
 at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
 at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
 at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
 at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
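The report does not establish a root cause. One hypothesis worth checking, stated as an assumption rather than a finding: java.lang.NegativeArraySizeException is what the JVM throws when an array is allocated with a negative length, which can happen if a value length larger than Int.MaxValue bytes (about 2 GiB) is narrowed to a 32-bit integer before the byte array is created. If the window's list state for one key grew that large, the native RocksDB.get call could fail this way while copying the value into a Java byte[]. The sketch below is plain Scala, not Flink or RocksDB code; NegativeSizeSketch and narrowedLength are hypothetical names, and the 3 GiB size is an arbitrary example.

```scala
// Hypothetical illustration only: plain Scala, not Flink or RocksDB code.
// Assumption: a value length held as a 64-bit Long is narrowed to a 32-bit
// Int before a byte array of that length is allocated.
object NegativeSizeSketch {
  // Keeps only the low 32 bits of the length, mimicking a narrowing cast
  // to a 32-bit array length in native code.
  def narrowedLength(bytes: Long): Int = bytes.toInt

  def main(args: Array[String]): Unit = {
    val threeGiB = 3L * 1024 * 1024 * 1024 // 3221225472 bytes, above Int.MaxValue
    val n = narrowedLength(threeGiB)
    println(s"narrowed length: $n") // narrowed length: -1073741824
    try {
      new Array[Byte](n) // the JVM rejects a negative array length
      println("allocated")
    } catch {
      case e: NegativeArraySizeException => println(s"caught $e")
    }
  }
}
```

If this hypothesis holds, the failure would depend on how much data accumulates in a single window's state before it fires, not on the checkpoint settings.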



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)