[https://issues.apache.org/jira/browse/FLINK-10960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718886#comment-16718886]
Dawid Wysakowicz edited comment on FLINK-10960 at 12/12/18 12:03 PM:
---------------------------------------------------------------------
Hi [~Jamalarm],
The exception suggests that the underlying state machine changed.
Actually, I think the reason for this exception is the change to the {{times}}
parameter, because that results in a different state machine. The {{times(n)}}
quantifier is translated into {{n}} separate states, so when you change the number,
the states of the old state machine cannot be mapped onto the new one (the
exception refers to state {{eventSequence:2}}, which the NFA built for {{times(2)}}
does not contain). I guess the reason you cannot reproduce this in your local
environment is that you do not take the savepoint while the state machine is in
one of the states that no longer exist in the updated one.
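To illustrate the mapping problem, here is a hypothetical, simplified model in plain Java. It is NOT Flink's NFACompiler: the naming scheme for the expanded states ({{name:0}}, {{name:1}}, ..., then the plain {{name}}) is an assumption chosen only to match the {{times(2)}} state list in the stack trace, {{TimesStateModel}}, {{statesFor}} and {{restore}} are invented names, and the "old" value of 4 is just an example of a larger {{times}} value.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model, not Flink code. Assumed naming: times(n) expands into
 * "name:0" .. "name:(n-2)", then "name", then a final "$endState$". This only
 * matches the times(2) state list from the stack trace; larger n is a guess.
 */
class TimesStateModel {

    /** Returns the (assumed) state names a times(n) quantifier expands into. */
    static List<String> statesFor(String patternName, int times) {
        if (times < 1) {
            throw new IllegalArgumentException("times must be >= 1");
        }
        List<String> states = new ArrayList<>();
        // n - 1 copies carry a numeric suffix; the last one keeps the plain name.
        for (int i = 0; i < times - 1; i++) {
            states.add(patternName + ":" + i);
        }
        states.add(patternName);
        states.add("$endState$"); // final state, named as in the stack trace
        return states;
    }

    /** Mimics looking up a snapshotted state name in a freshly compiled NFA. */
    static String restore(String snapshottedState, List<String> nfaStates) {
        Set<String> known = new LinkedHashSet<>(nfaStates);
        if (!known.contains(snapshottedState)) {
            throw new IllegalStateException("State " + snapshottedState
                + " does not exist in the NFA. NFA has states " + known);
        }
        return snapshottedState;
    }

    public static void main(String[] args) {
        List<String> oldNfa = statesFor("eventSequence", 4); // assumed old value
        List<String> newNfa = statesFor("eventSequence", 2);
        System.out.println(oldNfa); // [eventSequence:0, eventSequence:1, eventSequence:2, eventSequence, $endState$]
        System.out.println(newNfa); // [eventSequence:0, eventSequence, $endState$]

        // A savepoint taken while a partial match sits in "eventSequence:2" ...
        String snapshotted = restore("eventSequence:2", oldNfa);
        try {
            restore(snapshotted, newNfa); // ... cannot be mapped onto the times(2) NFA
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // A savepoint taken in a state that exists in both NFAs maps fine.
        System.out.println(restore("eventSequence:0", newNfa));
    }
}
{code}

In this model the failure depends on which state the snapshot happened to capture, which is consistent with the problem only showing up in some environments.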
> CEP: Job Failure when .times(2) is used
> ---------------------------------------
>
> Key: FLINK-10960
> URL: https://issues.apache.org/jira/browse/FLINK-10960
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.6.2
> Reporter: Thomas Wozniakowski
> Priority: Critical
>
> Hi Guys,
> Encountered a strange one today. We use the CEP library in a configurable way
> where we plug a config file into the Flink Job JAR and it programmatically
> sets up a bunch of CEP operators matching the config file.
> I encountered a strange bug when I was testing with some artificially low
> numbers in our testing environment today. The CEP code we're using (modified
> slightly) is:
> {code:java}
> Pattern.begin(EVENT_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
> .times(config.getNumberOfUniqueEvents())
> .where(uniquenessCheckOnAlreadyMatchedEvents())
> .within(seconds(config.getWithinSeconds()));
> {code}
> When using {{numberOfUniqueEvents: 2}}, I started seeing the following
> error killing the job whenever a match was detected:
> {quote}
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkRuntimeException: State eventSequence:2 does not exist in the NFA. NFA has states [Final State $endState$ [
> ]), Normal State eventSequence [
> StateTransition(TAKE, from eventSequenceto $endState$, with condition),
> StateTransition(IGNORE, from eventSequenceto eventSequence, with condition),
> ]), Start State eventSequence:0 [
> StateTransition(TAKE, from eventSequence:0to eventSequence, with condition),
> ])]
> at org.apache.flink.cep.nfa.NFA.isStartState(NFA.java:144)
> at org.apache.flink.cep.nfa.NFA.isStateTimedOut(NFA.java:270)
> at org.apache.flink.cep.nfa.NFA.advanceTime(NFA.java:244)
> at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.advanceTime(AbstractKeyedCEPPatternOperator.java:389)
> at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:293)
> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> {quote}
> Changing the config to {{numberOfUniqueEvents: 3}} fixed the problem.
> Changing it back to 2 brought the problem back. It seems to be specifically
> related to the value of 2.
> This is not a blocking issue for me because we typically use much higher
> numbers than this in production anyway, but I figured you guys might want to
> know about this issue.
> Let me know if you need any more information.
> Tom
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)