[
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837829#comment-15837829
]
ASF GitHub Bot commented on FLINK-3318:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/2361#discussion_r97790385
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
---
@@ -130,26 +114,166 @@
}
// add the beginning state
- final State<T> beginningState;
+ State<T> beginningState =
states.get(BEGINNING_STATE_NAME);;
+ addTransitions(beginningState, -1, patterns, states);
+ return new NFAFactoryImpl<T>(inputTypeSerializer,
windowTime, new HashSet<>(states.values()), timeoutHandling);
+ }
+ }
- if (states.containsKey(BEGINNING_STATE_NAME)) {
- beginningState =
states.get(BEGINNING_STATE_NAME);
- } else {
- beginningState = new
State<>(BEGINNING_STATE_NAME, State.StateType.Start);
- states.put(BEGINNING_STATE_NAME,
beginningState);
- }
+ private static <T> void addTransitions(State<T> currentState, int
patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+ Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+ State<T> succeedingState =
states.get(succeedingPattern.getName());
- beginningState.addStateTransition(new
StateTransition<T>(
+ if (shouldRepeatPattern(patternPos, patterns)) {
+ expandRepeatingPattern(currentState, patternPos,
patterns, states);
+ } else {
+ currentState.addStateTransition(new StateTransition<T>(
StateTransitionAction.TAKE,
- currentState,
- (FilterFunction<T>)
currentPattern.getFilterFunction()
+ succeedingState,
+ (FilterFunction<T>)
succeedingPattern.getFilterFunction()
));
- return new NFAFactoryImpl<T>(inputTypeSerializer,
windowTime, new HashSet<>(states.values()), timeoutHandling);
+ if (shouldAddSelfTransition(succeedingPattern)) {
+ addTransitionToSelf(succeedingPattern,
succeedingState);
+ }
+ if (isPatternOptional(succeedingPattern)) {
+ addOptionalTransitions(currentState,
patternPos, patterns, states);
+ }
+ }
+ }
+
+ private static <T> void addOptionalTransitions(State<T> currentState,
int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>>
states) {
+ int firstNonOptionalPattern =
findFirstNonOptionalPattern(patterns, patternPos + 1);
+
+ for (int optionalPatternPos = patternPos + 2;
+ optionalPatternPos <
Math.min(firstNonOptionalPattern + 1, patterns.size());
+ optionalPatternPos++) {
+
+ Pattern<T, ?> optionalPattern =
patterns.get(optionalPatternPos);
+ State<T> optionalState =
states.get(optionalPattern.getName());
+ currentState.addStateTransition(new StateTransition<>(
+ StateTransitionAction.TAKE,
+ optionalState,
+ (FilterFunction<T>)
optionalPattern.getFilterFunction()));
}
}
/**
+ * Expand a pattern number of times and connect expanded states. E.g.
count(3) wil result in:
+ *
+ * +-----+ +-------+ +-------+
+ * |State+->|State#1+->|State#2+
+ * +--+--+ +-------+ +--+----+
+ */
+ private static <T> void expandRepeatingPattern(State<T> currentState,
int patternPos,
+
ArrayList<Pattern<T, ?>> patterns, Map<String,
State<T>> states) {
+ Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+ State<T> succeedingState =
states.get(succeedingPattern.getName());
+ Pattern<T, ?> currentPattern = patterns.get(patternPos);
+
+ State<T> currentRepeatingState = null;
+ State<T> nextRepeatingState = currentState;
+ for (int i = 1; i < currentPattern.getMaxCount(); i++) {
+ currentRepeatingState = nextRepeatingState;
+ nextRepeatingState = new State<>(
+ currentState.getName() + "#" + i,
+ State.StateType.Normal);
+ states.put(nextRepeatingState.getName(),
nextRepeatingState);
+ currentRepeatingState.addStateTransition(new
StateTransition<T>(
+ StateTransitionAction.TAKE,
+ nextRepeatingState,
+ (FilterFunction<T>)
currentPattern.getFilterFunction()));
+
+ // Add a transition around optional pattern.
+ // count(2,3) will result in:
+ // +-----+ +-------+ +-------+ +----+
+ // |State+->|State#1+-->|State#2+->|Next|
+ // +--+--+ +-------+ +--+----+ +-+--+
+ // | ^
+ // +--------------------+
+ if (i >= currentPattern.getMinCount()) {
+ currentRepeatingState.addStateTransition(new
StateTransition<T>(
+ StateTransitionAction.TAKE,
+ succeedingState,
+ (FilterFunction<T>)
succeedingPattern.getFilterFunction()));
+ }
+ }
+ nextRepeatingState.addStateTransition(new StateTransition<T>(
+ StateTransitionAction.TAKE,
+ succeedingState,
+ (FilterFunction<T>)
succeedingPattern.getFilterFunction()));
+ }
+
+ private static <T> boolean shouldRepeatPattern(int patternPos,
ArrayList<Pattern<T, ?>> patterns) {
+ if (patternPos == -1) {
+ return false;
+ }
+
+ Pattern<T, ?> pattern = patterns.get(patternPos);
+ return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
+ }
+
+ private static <T> void addTransitionToSelf(Pattern<T, ?>
succeedingPattern, State<T> succeedingState) {
+ succeedingState.addStateTransition(new StateTransition<T>(
+ StateTransitionAction.TAKE,
+ succeedingState,
+ (FilterFunction<T>)
succeedingPattern.getFilterFunction()));
+ }
+
+ private static <T> boolean shouldAddSelfTransition(Pattern<T, ?>
succeedingPattern) {
+ return succeedingPattern.getQuantifier() ==
Quantifier.ZERO_OR_MANY
+ || succeedingPattern.getQuantifier() ==
Quantifier.ONE_OR_MANY;
+ }
+
+ private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T,
?>> patterns, int startPos) {
+ int pos = startPos;
+ for (; pos < patterns.size(); pos++) {
+ Pattern<T, ?> pattern = patterns.get(pos);
+ if (!isPatternOptional(pattern)) {
+ return pos;
+ }
+ }
+
+ return pos;
+ }
+
+ private static <T> Map<String, State<T>>
createStatesFrom(ArrayList<Pattern<T, ?>> patterns) {
+ Map<String, State<T>> states = new HashMap<>();
+
+ boolean foundNonOptionalPattern = false;
+ for (int i = patterns.size() - 1; i >= 0; i--) {
+ Pattern<T, ?> pattern = patterns.get(i);
+ State.StateType stateType = foundNonOptionalPattern ?
State.StateType.Normal
+
: State.StateType.Final;
+ State<T> newState = new State<>(pattern.getName(),
stateType);
+ foundNonOptionalPattern |= !isPatternOptional(pattern);
+ states.put(newState.getName(), newState);
+ }
+
+ State<T> beginningState = new State<>(BEGINNING_STATE_NAME,
State.StateType.Start);
+ states.put(BEGINNING_STATE_NAME, beginningState);
+ return states;
+ }
+
+ private static <T> boolean isPatternOptional(Pattern<T, ?> pattern) {
+ return pattern.getQuantifier() == Quantifier.ZERO_OR_MANY
+ || pattern.getQuantifier() == Quantifier.OPTIONAL
+ || pattern.getMinCount() == 0;
+ }
+
+ private static <T> ArrayList<Pattern<T, ?>>
createPatternsList(Pattern<T, ?> pattern) {
--- End diff --
The return type can become a `List` instead of `ArrayList`.
> Add support for quantifiers to CEP's pattern API
> ------------------------------------------------
>
> Key: FLINK-3318
> URL: https://issues.apache.org/jira/browse/FLINK-3318
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.0.0
> Reporter: Till Rohrmann
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> It would be a good addition to extend the pattern API to support quantifiers
> known from regular expressions (e.g. Kleene star, ?, +, or count bounds).
> This would considerably enrich the set of supported patterns.
> Implementing the count bounds could be done by unrolling the pattern state.
> In order to support the Kleene star operator, the {{NFACompiler}} has to be
> extended to insert an epsilon-transition between a Kleene star state and the
> succeeding pattern state. In order to support {{?}}, one could insert two
> paths from the preceding state, one which accepts the event and another which
> directly goes into the next pattern state.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)