[
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837831#comment-15837831
]
ASF GitHub Bot commented on FLINK-3318:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/2361#discussion_r97790026
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
---
@@ -130,26 +114,166 @@
}
// add the beginning state
- final State<T> beginningState;
+ State<T> beginningState =
states.get(BEGINNING_STATE_NAME);;
+ addTransitions(beginningState, -1, patterns, states);
+ return new NFAFactoryImpl<T>(inputTypeSerializer,
windowTime, new HashSet<>(states.values()), timeoutHandling);
+ }
+ }
- if (states.containsKey(BEGINNING_STATE_NAME)) {
- beginningState =
states.get(BEGINNING_STATE_NAME);
- } else {
- beginningState = new
State<>(BEGINNING_STATE_NAME, State.StateType.Start);
- states.put(BEGINNING_STATE_NAME,
beginningState);
- }
+ private static <T> void addTransitions(State<T> currentState, int
patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+ Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+ State<T> succeedingState =
states.get(succeedingPattern.getName());
- beginningState.addStateTransition(new
StateTransition<T>(
+ if (shouldRepeatPattern(patternPos, patterns)) {
+ expandRepeatingPattern(currentState, patternPos,
patterns, states);
+ } else {
+ currentState.addStateTransition(new StateTransition<T>(
StateTransitionAction.TAKE,
- currentState,
- (FilterFunction<T>)
currentPattern.getFilterFunction()
+ succeedingState,
+ (FilterFunction<T>)
succeedingPattern.getFilterFunction()
));
- return new NFAFactoryImpl<T>(inputTypeSerializer,
windowTime, new HashSet<>(states.values()), timeoutHandling);
+ if (shouldAddSelfTransition(succeedingPattern)) {
+ addTransitionToSelf(succeedingPattern,
succeedingState);
+ }
+ if (isPatternOptional(succeedingPattern)) {
+ addOptionalTransitions(currentState,
patternPos, patterns, states);
+ }
+ }
+ }
+
+ private static <T> void addOptionalTransitions(State<T> currentState,
int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>>
states) {
+ int firstNonOptionalPattern =
findFirstNonOptionalPattern(patterns, patternPos + 1);
+
+ for (int optionalPatternPos = patternPos + 2;
+ optionalPatternPos <
Math.min(firstNonOptionalPattern + 1, patterns.size());
+ optionalPatternPos++) {
+
+ Pattern<T, ?> optionalPattern =
patterns.get(optionalPatternPos);
+ State<T> optionalState =
states.get(optionalPattern.getName());
+ currentState.addStateTransition(new StateTransition<>(
+ StateTransitionAction.TAKE,
+ optionalState,
+ (FilterFunction<T>)
optionalPattern.getFilterFunction()));
}
}
/**
+ * Expand a pattern number of times and connect expanded states. E.g.
count(3) will result in:
+ *
+ * +-----+ +-------+ +-------+
+ * |State+->|State#1+->|State#2+
+ * +--+--+ +-------+ +--+----+
+ */
+ private static <T> void expandRepeatingPattern(State<T> currentState,
int patternPos,
+
ArrayList<Pattern<T, ?>> patterns, Map<String,
State<T>> states) {
--- End diff --
The `patterns` parameter can be declared as a `List` instead of an `ArrayList`, since only the interface is needed here.
> Add support for quantifiers to CEP's pattern API
> ------------------------------------------------
>
> Key: FLINK-3318
> URL: https://issues.apache.org/jira/browse/FLINK-3318
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.0.0
> Reporter: Till Rohrmann
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> It would be a good addition to extend the pattern API to support quantifiers
> known from regular expressions (e.g. Kleene star, ?, +, or count bounds).
> This would considerably enrich the set of supported patterns.
> Implementing the count bounds could be done by unrolling the pattern state.
> In order to support the Kleene star operator, the {{NFACompiler}} has to be
> extended to insert an epsilon-transition between a Kleene star state and the
> succeeding pattern state. In order to support {{?}}, one could insert two
> paths from the preceding state, one which accepts the event and another which
> directly goes into the next pattern state.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)