kennknowles commented on a change in pull request #1083:
URL: https://github.com/apache/beam/pull/1083#discussion_r737907783
##########
File path:
runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
##########
@@ -81,66 +91,75 @@ public void injectElements(int... values) throws Exception {
injectElements(timestampedValues);
}
- public SimpleTriggerTester<W> withAllowedLateness(Duration
allowedLateness) throws Exception {
- return new SimpleTriggerTester<>(
- windowingStrategy.withAllowedLateness(allowedLateness));
+ public SimpleTriggerStateMachineTester<W> withAllowedLateness(Duration
allowedLateness)
+ throws Exception {
+ return new SimpleTriggerStateMachineTester<>(
+ executableTrigger,
+ windowFn,
+ allowedLateness);
}
}
- protected final WindowingStrategy<Object, W> windowingStrategy;
-
private final TestInMemoryStateInternals<?> stateInternals =
new TestInMemoryStateInternals<Object>(null /* key */);
private final InMemoryTimerInternals timerInternals = new
InMemoryTimerInternals();
- private final TriggerContextFactory<W> contextFactory;
- private final WindowFn<Object, W> windowFn;
+ private final TriggerStateMachineContextFactory<W> contextFactory;
+ protected final WindowFn<Object, W> windowFn;
private final ActiveWindowSet<W> activeWindows;
private final Map<W, W> windowToMergeResult;
/**
- * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link
Trigger}
- * under test.
+ * An {@link ExecutableTriggerStateMachine} under test.
*/
- private final ExecutableTrigger executableTrigger;
+ protected final ExecutableTriggerStateMachine executableTrigger;
/**
* A map from a window and trigger to whether that trigger is finished for the window.
*/
private final Map<W, FinishedTriggers> finishedSets;
- public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger(
- Trigger trigger, WindowFn<Object, W> windowFn)
+ public static <W extends BoundedWindow> SimpleTriggerStateMachineTester<W>
forTrigger(
+ TriggerStateMachine stateMachine, WindowFn<Object, W> windowFn)
throws Exception {
- WindowingStrategy<Object, W> windowingStrategy =
- WindowingStrategy.of(windowFn).withTrigger(trigger)
- // Merging requires accumulation mode or early firings can break up a session.
- // Not currently an issue with the tester (because we never GC) but we don't want
- // mystery failures due to violating this need.
- .withMode(windowFn.isNonMerging()
- ? AccumulationMode.DISCARDING_FIRED_PANES
- : AccumulationMode.ACCUMULATING_FIRED_PANES);
- return new SimpleTriggerTester<>(windowingStrategy);
- }
+ ExecutableTriggerStateMachine executableTriggerStateMachine =
+ ExecutableTriggerStateMachine.create(stateMachine);
- public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W>
forAdvancedTrigger(
- Trigger trigger, WindowFn<Object, W> windowFn) throws Exception {
- WindowingStrategy<Object, W> strategy =
- WindowingStrategy.of(windowFn).withTrigger(trigger)
- // Merging requires accumulation mode or early firings can break up a session.
- // Not currently an issue with the tester (because we never GC) but we don't want
- // mystery failures due to violating this need.
- .withMode(windowFn.isNonMerging()
+ // Merging requires accumulation mode or early firings can break up a session.
+ // Not currently an issue with the tester (because we never GC) but we don't want
+ // mystery failures due to violating this need.
+ AccumulationMode mode =
+ windowFn.isNonMerging()
? AccumulationMode.DISCARDING_FIRED_PANES
- : AccumulationMode.ACCUMULATING_FIRED_PANES);
+ : AccumulationMode.ACCUMULATING_FIRED_PANES;
+
+ return new SimpleTriggerStateMachineTester<>(
+ executableTriggerStateMachine, windowFn, Duration.ZERO);
+ }
- return new TriggerTester<>(strategy);
+ public static <InputT, W extends BoundedWindow>
+ TriggerStateMachineTester<InputT, W> forAdvancedTrigger(
+ TriggerStateMachine stateMachine, WindowFn<Object, W> windowFn)
throws Exception {
+ ExecutableTriggerStateMachine executableTriggerStateMachine =
+ ExecutableTriggerStateMachine.create(stateMachine);
+
+ // Merging requires accumulation mode or early firings can break up a session.
+ // Not currently an issue with the tester (because we never GC) but we don't want
+ // mystery failures due to violating this need.
+ AccumulationMode mode =
Review comment:
If it has been unused since 2016, go ahead and remove it :-)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org