[
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494851#comment-16494851
]
Stefan Richter commented on FLINK-9268:
---------------------------------------
Please notice that this is our attempt to "fix" (= for now, a better error
message) the problem on the RocksDB side:
https://github.com/facebook/rocksdb/issues/3849.
> RockDB errors from WindowOperator
> ---------------------------------
>
> Key: FLINK-9268
> URL: https://issues.apache.org/jira/browse/FLINK-9268
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Narayanan Arunachalam
> Priority: Major
>
> The job has no sinks, one Kafka source, does a windowing based on session and
> uses processing time. The job fails with the error given below after running
> for few hours. The only way to recover from this error is to cancel the job
> and start a new one.
> Using S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
> .addSource(makeKafkaSource(config))
> .map(makeEvent)
> .keyBy(_.get(EVENT_GROUP_ID))
> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
> .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
> .apply(makeEventsList)
> .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=900000
> checkpoint.timeout=300000
> Error:
> TimerException\{java.lang.NegativeArraySizeException}
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:810)
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
> at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)