Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/2361#discussion_r97790271
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
---
@@ -130,26 +114,166 @@
}
// add the beginning state
- final State<T> beginningState;
+ State<T> beginningState =
states.get(BEGINNING_STATE_NAME);;
+ addTransitions(beginningState, -1, patterns, states);
+ return new NFAFactoryImpl<T>(inputTypeSerializer,
windowTime, new HashSet<>(states.values()), timeoutHandling);
+ }
+ }
- if (states.containsKey(BEGINNING_STATE_NAME)) {
- beginningState =
states.get(BEGINNING_STATE_NAME);
- } else {
- beginningState = new
State<>(BEGINNING_STATE_NAME, State.StateType.Start);
- states.put(BEGINNING_STATE_NAME,
beginningState);
- }
+ private static <T> void addTransitions(State<T> currentState, int
patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
+ Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+ State<T> succeedingState =
states.get(succeedingPattern.getName());
- beginningState.addStateTransition(new
StateTransition<T>(
+ if (shouldRepeatPattern(patternPos, patterns)) {
+ expandRepeatingPattern(currentState, patternPos,
patterns, states);
+ } else {
+ currentState.addStateTransition(new StateTransition<T>(
StateTransitionAction.TAKE,
- currentState,
- (FilterFunction<T>)
currentPattern.getFilterFunction()
+ succeedingState,
+ (FilterFunction<T>)
succeedingPattern.getFilterFunction()
));
- return new NFAFactoryImpl<T>(inputTypeSerializer,
windowTime, new HashSet<>(states.values()), timeoutHandling);
+ if (shouldAddSelfTransition(succeedingPattern)) {
+ addTransitionToSelf(succeedingPattern,
succeedingState);
+ }
+ if (isPatternOptional(succeedingPattern)) {
+ addOptionalTransitions(currentState,
patternPos, patterns, states);
+ }
+ }
+ }
+
+ private static <T> void addOptionalTransitions(State<T> currentState,
int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>>
states) {
+ int firstNonOptionalPattern =
findFirstNonOptionalPattern(patterns, patternPos + 1);
+
+ for (int optionalPatternPos = patternPos + 2;
+ optionalPatternPos <
Math.min(firstNonOptionalPattern + 1, patterns.size());
+ optionalPatternPos++) {
+
+ Pattern<T, ?> optionalPattern =
patterns.get(optionalPatternPos);
+ State<T> optionalState =
states.get(optionalPattern.getName());
+ currentState.addStateTransition(new StateTransition<>(
+ StateTransitionAction.TAKE,
+ optionalState,
+ (FilterFunction<T>)
optionalPattern.getFilterFunction()));
}
}
/**
+ * Expand a pattern number of times and connect expanded states. E.g.
count(3) wil result in:
+ *
+ * +-----+ +-------+ +-------+
+ * |State+->|State#1+->|State#2+
+ * +--+--+ +-------+ +--+----+
+ */
+ private static <T> void expandRepeatingPattern(State<T> currentState,
int patternPos,
+
ArrayList<Pattern<T, ?>> patterns, Map<String,
State<T>> states) {
+ Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
+ State<T> succeedingState =
states.get(succeedingPattern.getName());
+ Pattern<T, ?> currentPattern = patterns.get(patternPos);
+
+ State<T> currentRepeatingState = null;
+ State<T> nextRepeatingState = currentState;
+ for (int i = 1; i < currentPattern.getMaxCount(); i++) {
+ currentRepeatingState = nextRepeatingState;
+ nextRepeatingState = new State<>(
+ currentState.getName() + "#" + i,
+ State.StateType.Normal);
+ states.put(nextRepeatingState.getName(),
nextRepeatingState);
+ currentRepeatingState.addStateTransition(new
StateTransition<T>(
+ StateTransitionAction.TAKE,
+ nextRepeatingState,
+ (FilterFunction<T>)
currentPattern.getFilterFunction()));
+
+ // Add a transition around optional pattern.
+ // count(2,3) will result in:
+ // +-----+ +-------+ +-------+ +----+
+ // |State+->|State#1+-->|State#2+->|Next|
+ // +--+--+ +-------+ +--+----+ +-+--+
+ // | ^
+ // +--------------------+
+ if (i >= currentPattern.getMinCount()) {
+ currentRepeatingState.addStateTransition(new
StateTransition<T>(
+ StateTransitionAction.TAKE,
+ succeedingState,
+ (FilterFunction<T>)
succeedingPattern.getFilterFunction()));
+ }
+ }
+ nextRepeatingState.addStateTransition(new StateTransition<T>(
+ StateTransitionAction.TAKE,
+ succeedingState,
+ (FilterFunction<T>)
succeedingPattern.getFilterFunction()));
+ }
+
+ private static <T> boolean shouldRepeatPattern(int patternPos,
ArrayList<Pattern<T, ?>> patterns) {
+ if (patternPos == -1) {
+ return false;
+ }
+
+ Pattern<T, ?> pattern = patterns.get(patternPos);
+ return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
+ }
+
+ private static <T> void addTransitionToSelf(Pattern<T, ?>
succeedingPattern, State<T> succeedingState) {
+ succeedingState.addStateTransition(new StateTransition<T>(
+ StateTransitionAction.TAKE,
+ succeedingState,
+ (FilterFunction<T>)
succeedingPattern.getFilterFunction()));
+ }
+
+ private static <T> boolean shouldAddSelfTransition(Pattern<T, ?>
succeedingPattern) {
+ return succeedingPattern.getQuantifier() ==
Quantifier.ZERO_OR_MANY
+ || succeedingPattern.getQuantifier() ==
Quantifier.ONE_OR_MANY;
+ }
+
+ private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T,
?>> patterns, int startPos) {
+ int pos = startPos;
+ for (; pos < patterns.size(); pos++) {
+ Pattern<T, ?> pattern = patterns.get(pos);
+ if (!isPatternOptional(pattern)) {
+ return pos;
+ }
+ }
+
+ return pos;
+ }
+
+ private static <T> Map<String, State<T>>
createStatesFrom(ArrayList<Pattern<T, ?>> patterns) {
--- End diff --
The `patterns` can become a `List` instead of `ArrayList`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---