Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/3477#discussion_r105699441
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -209,33 +204,66 @@ public boolean equals(Object obj) {
return
nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
sharedBuffer.equals(other.sharedBuffer) &&
states.equals(other.states) &&
- windowTime == other.windowTime &&
- startEventCounter == other.startEventCounter;
+ windowTime == other.windowTime;
} else {
return false;
}
}
@Override
public int hashCode() {
- return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer,
states, windowTime, startEventCounter);
+ return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer,
states, windowTime);
+ }
+
+ private static <T> boolean isEquivalentState(final State<T> s1, final
State<T> s2) {
+ return s1.getName().equals(s2.getName());
}
/**
- * Comparator used for imposing the assumption that IGNORE is always
the last StateTransition in a state.
- */
- private interface StateTransitionComparator<T> extends
Comparator<StateTransition<T>>, Serializable {}
- private final Comparator<StateTransition<T>> stateTransitionComparator
= new StateTransitionComparator<T>() {
- private static final long serialVersionUID =
-2775474935413622278L;
+ * Structures to keep decisions based on the transition actions, that
counts the number of taken actions.
+ */
+ private static class OutgoingEdges<T> {
+ private List<StateTransition<T>> edges = new ArrayList<>();
- @Override
- public int compare(final StateTransition<T> o1, final
StateTransition<T> o2) {
- if (o1.getAction() == o2.getAction()) {
- return 0;
+ private final State<T> currentState;
+
+ private int totalTakeBranches = 0;
+ private int totalIgnoreBranches = 0;
+
+ OutgoingEdges(final State<T> currentState) {
+ this.currentState = currentState;
+ }
+
+ void add(StateTransition<T> edge) {
+
+ if (!isSelfIgnore(edge)) {
+ if (edge.getAction() ==
StateTransitionAction.IGNORE) {
+ totalIgnoreBranches++;
+ } else if (edge.getAction() ==
StateTransitionAction.TAKE) {
+ totalTakeBranches++;
+ }
}
- return o1.getAction() == StateTransitionAction.IGNORE ?
1 : -1;
+
+ edges.add(edge);
+ }
+
+ int getTotalIgnoreBranches() {
+ return totalIgnoreBranches;
+ }
+ int getTotalTakeBranches() {
+ return totalTakeBranches;
}
- };
+
+ List<StateTransition<T>> getEdges() {
+ return edges;
+ }
+
--- End diff --
Why is this method needed? The two end-points of the IGNORE edge, won't
they be the same state?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---