Hai Zhou created FLINK-9233:
-------------------------------
Summary: Merging state may cause runtime exception when windows trigger onMerge
Key: FLINK-9233
URL: https://issues.apache.org/jira/browse/FLINK-9233
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.4.0
Reporter: Hai Zhou
The main logic of my Flink job is as follows:
{code:java}
clickStream.coGroup(exposureStream).where(...).equalTo(...)
    .window(EventTimeSessionWindows.withGap())
    .trigger(new SessionMatchTrigger())
    .evictor()
    .apply();
{code}
{code:java}
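SessionMatchTrigger {
    ReducingStateDescriptor stateDesc = new ReducingStateDescriptor(...);
    ...
    // Session windows merge, so the trigger must declare that it supports merging.
    public boolean canMerge() {
        return true;
    }
    // Called when several windows are merged into one.
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // The exception in the log below is thrown from this call.
        ctx.mergePartitionedState(this.stateDesc);
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }
    ...
}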
{code}
{panel:title=detailed error logs}
java.lang.RuntimeException: Error while merging state.
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.mergePartitionedState(WindowOperator.java:895)
at com.package.trigger.SessionMatchTrigger.onMerge(SessionMatchTrigger.java:56)
at com.package.trigger.SessionMatchTrigger.onMerge(SessionMatchTrigger.java:14)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onMerge(WindowOperator.java:939)
at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$1.merge(EvictingWindowOperator.java:141)
at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$1.merge(EvictingWindowOperator.java:120)
at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:119)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Error while merging state in RocksDB
at org.apache.flink.contrib.streaming.state.RocksDBReducingState.mergeNamespaces(RocksDBReducingState.java:186)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.mergePartitionedState(WindowOperator.java:887)
... 12 more
Caused by: java.lang.IllegalArgumentException: Illegal value provided for SubCode.
at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
at org.rocksdb.Status.<init>(Status.java:30)
at org.rocksdb.RocksDB.delete(Native Method)
at org.rocksdb.RocksDB.delete(RocksDB.java:1110)
at org.apache.flink.contrib.streaming.state.RocksDBReducingState.mergeNamespaces(RocksDBReducingState.java:143)
... 13 more
{panel}
I found the cause of this error.
Java's {RocksDB.Status.SubCode} was out of sync with the native {include/rocksdb/status.h:SubCode}. When the disk runs out of space, the native code reports a status sub-code that the Java enum does not know, so the Java side throws an {IllegalArgumentException} for the invalid value instead of simply returning the corresponding status.
More details: https://github.com/facebook/rocksdb/pull/3050
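To illustrate the failure mode, here is a minimal sketch of what can happen when a Java enum that mirrors a native enum falls behind. This is not the RocksDB source: the class name, the enum constants, and the numeric values are all hypothetical, and only the exception message is taken from the log above. It contrasts a strict lookup, which throws on an unknown value, with a tolerant one, which reports the value as unknown.

```java
import java.util.Optional;

// Hypothetical sketch, not the RocksDB implementation.
class SubCodeLookupSketch {

    // Hypothetical, deliberately incomplete Java mirror of a native enum.
    // Assume the native side later added a value 4 that was never added here.
    enum SubCode {
        NONE((byte) 0),
        MUTEX_TIMEOUT((byte) 1);

        private final byte value;

        SubCode(byte value) {
            this.value = value;
        }

        // Strict lookup: any value missing from the mirror becomes an exception.
        static SubCode strict(byte value) {
            for (SubCode s : values()) {
                if (s.value == value) {
                    return s;
                }
            }
            throw new IllegalArgumentException("Illegal value provided for SubCode.");
        }

        // Tolerant lookup: unknown values are reported as empty instead of throwing.
        static Optional<SubCode> tolerant(byte value) {
            for (SubCode s : values()) {
                if (s.value == value) {
                    return Optional.of(s);
                }
            }
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        byte fromNative = 4; // hypothetical value unknown to the Java mirror

        System.out.println("tolerant lookup known: "
                + SubCode.tolerant(fromNative).isPresent());

        try {
            SubCode.strict(fromNative);
        } catch (IllegalArgumentException e) {
            // The real error (for example, a full disk) is hidden behind this exception.
            System.out.println("strict lookup failed: " + e.getMessage());
        }
    }
}
```

With a strict lookup, the caller never sees the underlying status and only gets the lookup failure, which would match the stack trace above, where the {IllegalArgumentException} is raised while the {Status} object is being constructed.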