[ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462051#comment-16462051
 ] 

Narayanan Arunachalam commented on FLINK-9268:
----------------------------------------------

Is the 2GB Rocksdb limit apply to the size of one window state or a group of 
windows? If it's per window, I don't see why it would hit that limit. Because 
in my case, at max there will be only few hundred events collected in one 
window and size of each event is only kilo bytes. Is it possible for a session 
window to collect duplicate data?

> RockDB errors from WindowOperator
> ---------------------------------
>
>                 Key: FLINK-9268
>                 URL: https://issues.apache.org/jira/browse/FLINK-9268
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.4.2
>            Reporter: Narayanan Arunachalam
>            Priority: Major
>
> The job has no sinks, one Kafka source, does a windowing based on session and 
> uses processing time. The job fails with the error given below after running 
> for few hours. The only way to recover from this error is to cancel the job 
> and start a new one.
> Using S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
>  .addSource(makeKafkaSource(config))
>  .map(makeEvent)
>  .keyBy(_.get(EVENT_GROUP_ID))
>  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
>  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
>  .apply(makeEventsList)
> .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=900000
> checkpoint.timeout=300000
> Error:
> TimerException\{java.lang.NegativeArraySizeException}
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
>  at org.rocksdb.RocksDB.get(Native Method)
>  at org.rocksdb.RocksDB.get(RocksDB.java:810)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
>  at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to