[
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462164#comment-16462164
]
Stefan Richter commented on FLINK-9268:
---------------------------------------
It is hard to make any assumptions about your job without seeing the code, but
I still doubt that this is the reason. You could partially test your assumption
by running with EXACTLY_ONCE. However, you are using processing-time triggers,
whose firing depends on wall-clock time and is not reproducible across runs.
So the absence of problems in a test run does not show that exactly-once's
deduplication is the reason your problem does not show up. This test is only
helpful to invalidate your duplication hypothesis, in case you still observe
the problem even under exactly-once.
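A minimal sketch of that experiment, assuming the Flink 1.4 Scala DataStream API and assuming the job's own config keys (checkpoint.interval, checkpoint.timeout, external.checkpoint.enabled, checkpoint.mode) map directly onto the calls below. Only the checkpointing mode changes from AT_LEAST_ONCE to EXACTLY_ONCE; the object name ExactlyOnceCheckpointing and the method configure are hypothetical, and the externalized-checkpoint directory (S3 in this job) is assumed to be configured in flink-conf.yaml as before.

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper: same checkpoint settings as the representative config,
// except that the mode is EXACTLY_ONCE instead of AT_LEAST_ONCE.
object ExactlyOnceCheckpointing {
  def configure(): StreamExecutionEnvironment = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // checkpoint.interval=900000 (15 minutes), now with exactly-once semantics
    sEnv.enableCheckpointing(900000L, CheckpointingMode.EXACTLY_ONCE)
    // checkpoint.timeout=300000 (5 minutes)
    sEnv.getCheckpointConfig.setCheckpointTimeout(300000L)
    // external.checkpoint.enabled=true: retain checkpoints when the job is cancelled
    sEnv.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    sEnv
  }
}
```

The same DAG would then be built on the returned environment. If the NegativeArraySizeException still appears under this setting, duplicated records from at-least-once replay are ruled out as the cause; if it does not appear, the result is inconclusive for the reason given above.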
> RockDB errors from WindowOperator
> ---------------------------------
>
> Key: FLINK-9268
> URL: https://issues.apache.org/jira/browse/FLINK-9268
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Narayanan Arunachalam
> Priority: Major
>
> The job has no real sinks (only a no-op sink), one Kafka source, does
> session-based windowing and uses processing time. The job fails with the error
> given below after running for a few hours. The only way to recover from this
> error is to cancel the job and start a new one.
> We use an S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
> .addSource(makeKafkaSource(config))
> .map(makeEvent)
> .keyBy(_.get(EVENT_GROUP_ID))
> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
> .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
> .apply(makeEventsList)
> .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=900000
> checkpoint.timeout=300000
> Error:
> TimerException{java.lang.NegativeArraySizeException}
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:810)
> at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
> at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
> at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)