Github user dianfu commented on a diff in the pull request:
https://github.com/apache/flink/pull/4153#discussion_r125173832
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
---
@@ -455,6 +548,76 @@ private void addStopStateToLooping(final State<T>
loopingState) {
}
/**
+ * Create all the states for the group pattern.
+ *
+ * @param groupPattern the group pattern to create the states
for
+ * @param sinkState the state that the group pattern being
converted should point to
+ * @param proceedState the state that the group pattern being
converted should proceed to
+ * @param isOptional whether the group pattern being converted
is optional
+ * @return the first state of the states of the group pattern
+ */
+ private State<T> createGroupPatternState(
+ final GroupPattern<T, ?> groupPattern,
+ final State<T> sinkState,
+ final State<T> proceedState,
+ final boolean isOptional) {
+ final IterativeCondition<T> trueFunction =
BooleanConditions.trueFunction();
+
+ Pattern<T, ?> oldCurrentPattern = currentPattern;
+ Pattern<T, ?> oldFollowingPattern = followingPattern;
+ GroupPattern<T, ?> oldGroupPattern =
currentGroupPattern;
+ try {
+ State<T> lastSink = sinkState;
+ currentGroupPattern = groupPattern;
+ currentPattern = groupPattern.getRawPattern();
+ lastSink = createMiddleStates(lastSink);
+ lastSink = convertPattern(lastSink);
+ if (isOptional) {
+ // for the first state of a group
pattern, its PROCEED edge should point to
+ // the following state of that group
pattern
+ lastSink.addProceed(proceedState,
trueFunction);
+ }
+ return lastSink;
+ } finally {
+ currentPattern = oldCurrentPattern;
+ followingPattern = oldFollowingPattern;
+ currentGroupPattern = oldGroupPattern;
+ }
+ }
+
+ /**
+ * Create the states for the group pattern as a looping one.
+ *
+ * @param groupPattern the group pattern to create the states
for
+ * @param sinkState the state that the group pattern being
converted should point to
+ * @return the first state of the states of the group pattern
+ */
+ private State<T> createLoopingGroupPatternState(
+ final GroupPattern<T, ?> groupPattern,
+ final State<T> sinkState) {
+ final IterativeCondition<T> trueFunction =
BooleanConditions.trueFunction();
+
+ Pattern<T, ?> oldCurrentPattern = currentPattern;
+ Pattern<T, ?> oldFollowingPattern = followingPattern;
+ GroupPattern<T, ?> oldGroupPattern =
currentGroupPattern;
+ try {
--- End diff --
updated.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---