[
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462068#comment-16462068
]
Stefan Richter commented on FLINK-9268:
---------------------------------------
The 2GB limit actually applies on a per-key-per-window basis, that is why it is
very surprising that your job hits it. But from the stack trace I am very
confident that this is the cause for the exception. I am not sure what you mean
in the last question. The window will collect data from all events with the
same key in that window as one list to which it appends and on trigger it
iterates the list. This list is what seems to grow out of bounds for your job
for some key-window-pair(s).
> RockDB errors from WindowOperator
> ---------------------------------
>
> Key: FLINK-9268
> URL: https://issues.apache.org/jira/browse/FLINK-9268
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Narayanan Arunachalam
> Priority: Major
>
> The job has no sinks, one Kafka source, does a windowing based on session and
> uses processing time. The job fails with the error given below after running
> for few hours. The only way to recover from this error is to cancel the job
> and start a new one.
> Using S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
> .addSource(makeKafkaSource(config))
> .map(makeEvent)
> .keyBy(_.get(EVENT_GROUP_ID))
> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
> .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
> .apply(makeEventsList)
> .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=900000
> checkpoint.timeout=300000
> Error:
> TimerException\{java.lang.NegativeArraySizeException}
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:810)
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
> at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)