Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/3477#discussion_r105700356
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -247,27 +275,148 @@ public int compare(final StateTransition<T> o1,
final StateTransition<T> o2) {
* @return Collection of computation states which result from the
current one
*/
private Collection<ComputationState<T>> computeNextStates(
- final ComputationState<T> computationState,
- final T event,
- final long timestamp) {
- Stack<State<T>> states = new Stack<>();
- ArrayList<ComputationState<T>> resultingComputationStates = new
ArrayList<>();
- State<T> state = computationState.getState();
+ final ComputationState<T> computationState,
+ final T event,
+ final long timestamp) {
+ final ArrayList<ComputationState<T>> resultingComputationStates
= new ArrayList<>();
+
+ final OutgoingEdges<T> outgoingEdges =
createDecisionGraph(computationState, event);
+
+ // Create the computing version based on the previously
computed edges
+ // We need to defer the creation of computation states until we
know how many edges start
+ // at this computation state so that we can assign proper
version
+ final List<StateTransition<T>> edges = outgoingEdges.getEdges();
+ Integer takeBranchesToVisit = Math.max(0,
outgoingEdges.getTotalTakeBranches() - 1);
+ Integer ignoreBranchesToVisit =
outgoingEdges.getTotalIgnoreBranches();
+ for (StateTransition<T> edge : edges) {
+ switch (edge.getAction()) {
+ case IGNORE: {
+ if (!computationState.isStartState()) {
+ final DeweyNumber version;
+ if
(!isEquivalentState(edge.getTargetState(), computationState.getState())) {
+ version =
computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+ ignoreBranchesToVisit--;
+ } else {
+ final int toIncrease =
calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(),
+
outgoingEdges.getTotalTakeBranches());
+ version =
computationState.getVersion().increase(toIncrease);
+ }
+
+ resultingComputationStates.add(
+
ComputationState.createState(
+
edge.getTargetState(),
+
computationState.getPreviousState(),
+
computationState.getEvent(),
+
computationState.getTimestamp(),
+ version,
+
computationState.getStartTimestamp()
+ )
+ );
+ sharedBuffer.lock(
+
edge.getTargetState().getName(),
+
computationState.getEvent(),
+
computationState.getTimestamp());
+ }
+ }
+ break;
+ case TAKE:
+ final State<T> newState =
edge.getTargetState();
+ final State<T> consumingState =
edge.getSourceState();
+ final State<T> previousEventState =
computationState.getPreviousState();
+
+ final T previousEvent =
computationState.getEvent();
+ final DeweyNumber currentVersion =
computationState.getVersion();
+
+ final DeweyNumber
newComputationStateVersion = new
DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+ takeBranchesToVisit--;
+
+ final long startTimestamp;
+ if (computationState.isStartState()) {
+ startTimestamp = timestamp;
+ sharedBuffer.put(
+
consumingState.getName(),
+ event,
+ timestamp,
+ currentVersion);
+ } else {
+ startTimestamp =
computationState.getStartTimestamp();
+ sharedBuffer.put(
+
consumingState.getName(),
+ event,
+ timestamp,
+
previousEventState.getName(),
+ previousEvent,
+
computationState.getTimestamp(),
+ currentVersion);
+ }
+
+ // a new computation state is referring
to the shared entry
+
sharedBuffer.lock(consumingState.getName(), event, timestamp);
+
+
resultingComputationStates.add(ComputationState.createState(
+ newState,
+ consumingState,
+ event,
+ timestamp,
+ newComputationStateVersion,
+ startTimestamp
+ ));
+ break;
+ }
+ }
- states.push(state);
+ if (computationState.isStartState()) {
+ final int totalBranches =
calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(),
outgoingEdges.getTotalTakeBranches());
+ final ComputationState<T> startState =
createStartState(computationState, totalBranches);
+ resultingComputationStates.add(startState);
+ }
+
+ if (computationState.getEvent() != null) {
+ // release the shared entry referenced by the current
computation state.
+ sharedBuffer.release(
+ computationState.getState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
+ // try to remove unnecessary shared buffer entries
+ sharedBuffer.remove(
+ computationState.getState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
+ }
+
+ return resultingComputationStates;
+ }
+
+ private int calculateIncreasingSelfState(int ignoreBranches, int
takeBranches) {
+ if (takeBranches == 0 && ignoreBranches == 0) {
--- End diff --
This can become: `return takeBranches == 0 && ignoreBranches == 0 ? 0 :
ignoreBranches + 1;`
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---