[
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524983#comment-16524983
]
ASF GitHub Bot commented on FLINK-9593:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/6171#discussion_r198473858
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -330,77 +328,85 @@ private boolean isStateTimedOut(final
ComputationState state, final long timesta
}
}
- discardComputationStatesAccordingToStrategy(
- sharedBuffer, computationStates, result,
afterMatchSkipStrategy);
+ if (!potentialMatches.isEmpty()) {
+ nfaState.setStateChanged();
+ }
+
+ List<Map<String, List<T>>> result = new ArrayList<>();
+ if (afterMatchSkipStrategy.isSkipStrategy()) {
+ processMatchesAccordingToSkipStrategy(sharedBuffer,
+ nfaState,
+ afterMatchSkipStrategy,
+ potentialMatches,
+ result);
+ } else {
+ for (ComputationState match : potentialMatches) {
+
result.add(sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(match.getPreviousBufferEntry(),
+ match.getVersion()).get(0)));
+
sharedBuffer.releaseNode(match.getPreviousBufferEntry());
+ }
+ }
return result;
}
- private void discardComputationStatesAccordingToStrategy(
- final SharedBuffer<T> sharedBuffer,
- final Queue<ComputationState> computationStates,
- final Collection<Map<String, List<T>>> matchedResult,
- final AfterMatchSkipStrategy afterMatchSkipStrategy)
throws Exception {
+ private void processMatchesAccordingToSkipStrategy(
+ SharedBuffer<T> sharedBuffer,
+ NFAState nfaState,
+ AfterMatchSkipStrategy afterMatchSkipStrategy,
+ PriorityQueue<ComputationState> potentialMatches,
+ List<Map<String, List<T>>> result) throws Exception {
- Set<T> discardEvents = new HashSet<>();
- switch(afterMatchSkipStrategy.getStrategy()) {
- case SKIP_TO_LAST:
- for (Map<String, List<T>> resultMap:
matchedResult) {
- for (Map.Entry<String, List<T>>
keyMatches : resultMap.entrySet()) {
- if
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
-
discardEvents.addAll(keyMatches.getValue().subList(0,
keyMatches.getValue().size() - 1));
- break;
- } else {
-
discardEvents.addAll(keyMatches.getValue());
- }
- }
- }
- break;
- case SKIP_TO_FIRST:
- for (Map<String, List<T>> resultMap:
matchedResult) {
- for (Map.Entry<String, List<T>>
keyMatches : resultMap.entrySet()) {
- if
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
- break;
- } else {
-
discardEvents.addAll(keyMatches.getValue());
- }
- }
- }
- break;
- case SKIP_PAST_LAST_EVENT:
- for (Map<String, List<T>> resultMap:
matchedResult) {
- for (List<T> eventList:
resultMap.values()) {
- discardEvents.addAll(eventList);
- }
- }
- break;
- }
- if (!discardEvents.isEmpty()) {
- List<ComputationState> discardStates = new
ArrayList<>();
- for (ComputationState computationState :
computationStates) {
- boolean discard = false;
- Map<String, List<T>> partialMatch =
extractCurrentMatches(sharedBuffer, computationState);
- for (List<T> list: partialMatch.values()) {
- for (T e: list) {
- if (discardEvents.contains(e)) {
- // discard the
computation state.
- discard = true;
- break;
- }
- }
- if (discard) {
- break;
- }
- }
- if (discard) {
-
sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
- discardStates.add(computationState);
- }
+ nfaState.getCompletedMatches().addAll(potentialMatches);
+
+ ComputationState earliestMatch =
nfaState.getCompletedMatches().peek();
+
+ if (earliestMatch != null) {
+ Queue<ComputationState> sortedPartialMatches =
sortByStartTime(nfaState.getPartialMatches());
--- End diff --
Instead of sorting every time, why not keeping the partial matches in a
priority queue?
> Unify AfterMatch semantics with SQL MATCH_RECOGNIZE
> ---------------------------------------------------
>
> Key: FLINK-9593
> URL: https://issues.apache.org/jira/browse/FLINK-9593
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)