SteNicholas commented on a change in pull request #19259:
URL: https://github.com/apache/flink/pull/19259#discussion_r839347821
##########
File path:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java
##########
@@ -97,6 +100,18 @@ public void setNewPartialMatches(PriorityQueue<ComputationState> newPartialMatch
this.partialMatches = newPartialMatches;
}
+ public boolean isNewStartPartialMatch() {
+ return isNewStartPartialMatch;
+ }
+
+ public void resetNewStartPartialMatch() {
Review comment:
Could `resetNewStartPartialMatch` and `setNewStartPartiailMatch` be
combined into one method?
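For illustration, a minimal sketch of what a combined method could look like. The class name `NFAStateFlagSketch` and the boolean-parameter setter are assumptions made for this example, not code from the PR; the real `NFAState` has more fields.

```java
// Hypothetical sketch: only the flag handling is shown here.
public class NFAStateFlagSketch {
    private boolean isNewStartPartialMatch;

    public boolean isNewStartPartialMatch() {
        return isNewStartPartialMatch;
    }

    // One setter in place of the separate set/reset pair
    // (name and signature are assumptions for illustration).
    public void setNewStartPartialMatch(boolean newStartPartialMatch) {
        this.isNewStartPartialMatch = newStartPartialMatch;
    }
}
```

Callers would then pass `true` where the flag is currently set and `false` where it is currently reset.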
##########
File path:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
##########
@@ -417,6 +405,9 @@ private void processEvent(NFAState nfaState, IN event, long timestamp) throws Ex
timestamp,
afterMatchSkipStrategy,
cepTimerService);
+ if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {
+ registerTimer(timestamp + nfa.getWindowTime());
Review comment:
If the `windowTime` of the `NFA` equals 0, should this register a
timer for {@code current watermark + 1}?
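To make the question concrete, here is a rough sketch of the timestamp choice I have in mind. `TimerTimestampSketch`, `timerTimestamp`, and the `currentWatermark` parameter are hypothetical names for this example only; this is not the operator's actual code, and whether the zero-window branch is desirable is exactly what I am asking.

```java
// Hypothetical helper, not part of CepOperator: picks the timestamp a timer
// would be registered for under the behaviour suggested in this comment.
public class TimerTimestampSketch {
    public static long timerTimestamp(long eventTimestamp, long windowTime, long currentWatermark) {
        if (windowTime > 0) {
            // Behaviour in the diff: fire when the window of the new partial match ends.
            return eventTimestamp + windowTime;
        }
        // Suggested alternative when no window is configured (an assumption).
        return currentWatermark + 1;
    }
}
```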
##########
File path:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
##########
@@ -333,6 +337,10 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
boolean shouldDiscardPath = false;
for (final ComputationState newComputationState : newComputationStates) {
+ if (isStartState(computationState) && newComputationState.getStartTimestamp() > 0) {
+ nfaState.setNewStartPartiailMatch();
Review comment:
If the `computationState` is in the start state and the start timestamp
equals 0, what is the behavior in this situation?