[
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125189#comment-16125189
]
ASF GitHub Bot commented on FLINK-7169:
---------------------------------------
Github user dianfu commented on a diff in the pull request:
https://github.com/apache/flink/pull/4331#discussion_r132867483
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
+ private void
discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>>
computationStates,
+ Collection<Map<String, List<T>>> matchedResult,
AfterMatchSkipStrategy afterMatchSkipStrategy) {
+ Set<T> discardEvents = new HashSet<>();
+ switch(afterMatchSkipStrategy.getStrategy()) {
+ case SKIP_TO_LAST:
+ for (Map<String, List<T>> resultMap:
matchedResult) {
+ for (Map.Entry<String, List<T>>
keyMatches : resultMap.entrySet()) {
+ if
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+
discardEvents.addAll(keyMatches.getValue().subList(0,
keyMatches.getValue().size() - 1));
+ break;
+ } else {
+
discardEvents.addAll(keyMatches.getValue());
+ }
+ }
+ }
+ break;
+ case SKIP_TO_FIRST:
+ for (Map<String, List<T>> resultMap:
matchedResult) {
+ for (Map.Entry<String, List<T>>
keyMatches : resultMap.entrySet()) {
+ if
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+ break;
+ } else {
+
discardEvents.addAll(keyMatches.getValue());
+ }
+ }
+ }
+ break;
+ case SKIP_PAST_LAST_EVENT:
+ for (Map<String, List<T>> resultMap:
matchedResult) {
+ for (List<T> eventList:
resultMap.values()) {
+ discardEvents.addAll(eventList);
+ }
+ }
+ break;
+ }
+ if (!discardEvents.isEmpty()) {
+ List<ComputationState<T>> discardStates = new
ArrayList<>();
+ for (ComputationState<T> computationState :
computationStates) {
+ Map<String, List<T>> partialMatch =
extractCurrentMatches(computationState);
+ for (List<T> list: partialMatch.values()) {
+ for (T e: list) {
+ if (discardEvents.contains(e)) {
+ // discard the
computation state.
+
eventSharedBuffer.release(
+
NFAStateNameHandler.getOriginalNameFromInternal(
+
computationState.getState().getName()),
+
computationState.getEvent(),
+
computationState.getTimestamp(),
+
computationState.getCounter()
+ );
+
discardStates.add(computationState);
--- End diff --
Should add **break;** after **discardStates.add(computationState);**, right?
> Support AFTER MATCH SKIP function in CEP library API
> ----------------------------------------------------
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Reporter: Yueting Chen
> Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIST *RPV*: resume pattern matching at the first row
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to `CEP` class, which takes a new
> parameter as AfterMatchSKipStrategy.
> The new API may looks like this
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?>
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy)
> {code}
> We can also make `SKIP TO NEXT ROW` as the default option, because that's
> what CEP library behaves currently.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)