http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java deleted file mode 100644 index a66f74f..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.transforms.windowing.AfterAll; -import org.apache.beam.sdk.transforms.windowing.AfterFirst; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; - -/** - * Generalized tests for {@link FinishedTriggers} implementations. - */ -public class FinishedTriggersProperties { - /** - * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set - * finished, it is correctly reported as finished. - */ - public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) { - assertFalse(finishedSet.isFinished(trigger)); - finishedSet.setFinished(trigger, true); - assertTrue(finishedSet.isFinished(trigger)); - } - - /** - * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly - * reported as finished. - */ - public static void verifyGetAfterSet(FinishedTriggers finishedSet) { - ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( - AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()), - AfterAll.of( - AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane()))); - - verifyGetAfterSet(finishedSet, trigger); - verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1)); - verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0)); - verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1)); - verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1)); - verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0)); - } - - /** - * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no - * others. - */ - public static void verifyClearRecursively(FinishedTriggers finishedSet) { - ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( - AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()), - AfterAll.of( - AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane()))); - - // Set them all finished. This method is not on a trigger as it makes no sense outside tests. - setFinishedRecursively(finishedSet, trigger); - assertTrue(finishedSet.isFinished(trigger)); - assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0))); - assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0))); - assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1))); - - // Clear just the second AfterAll - finishedSet.clearRecursively(trigger.subTriggers().get(1)); - - // Check that the first and all that are still finished - assertTrue(finishedSet.isFinished(trigger)); - verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0)); - verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1)); - } - - private static void setFinishedRecursively( - FinishedTriggers finishedSet, ExecutableTrigger trigger) { - finishedSet.setFinished(trigger, true); - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { - setFinishedRecursively(finishedSet, subTrigger); - } - } - - private static void verifyFinishedRecursively( - FinishedTriggers finishedSet, ExecutableTrigger trigger) { - assertTrue(finishedSet.isFinished(trigger)); - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { - verifyFinishedRecursively(finishedSet, subTrigger); - } - } - - private static void verifyUnfinishedRecursively( - FinishedTriggers finishedSet, ExecutableTrigger trigger) { - assertFalse(finishedSet.isFinished(trigger)); - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { - verifyUnfinishedRecursively(finishedSet, subTrigger); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java deleted file mode 100644 index 072d264..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.theInstance; -import static org.junit.Assert.assertThat; - -import java.util.HashSet; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link FinishedTriggersSet}. - */ -@RunWith(JUnit4.class) -public class FinishedTriggersSetTest { - /** - * Tests that after a trigger is set to finished, it reads back as finished. - */ - @Test - public void testSetGet() { - FinishedTriggersProperties.verifyGetAfterSet( - FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>())); - } - - /** - * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no - * others. - */ - @Test - public void testClearRecursively() { - FinishedTriggersProperties.verifyClearRecursively( - FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>())); - } - - @Test - public void testCopy() throws Exception { - FinishedTriggersSet finishedSet = - FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()); - assertThat(finishedSet.copy().getFinishedTriggers(), - not(theInstance(finishedSet.getFinishedTriggers()))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java index 83077f4..63c71ed 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java @@ -18,15 +18,9 @@ package org.apache.beam.sdk.util; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.joda.time.Duration; -import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -43,23 +37,6 @@ public class ReshuffleTriggerTest { } @Test - public void testShouldFire() throws Exception { - TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger( - new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100))); - IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new Instant(400)); - assertTrue(tester.shouldFire(arbitraryWindow)); - } - - @Test - public void testOnTimer() throws Exception { - TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger( - new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100))); - IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new Instant(200)); - tester.fireIfShouldFire(arbitraryWindow); - assertFalse(tester.isMarkedFinished(arbitraryWindow)); - } - - @Test public void testToString() { Trigger trigger = new ReshuffleTrigger<>(); assertEquals("ReshuffleTrigger()", trigger.toString()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java deleted file mode 100644 index 5fe17ad..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ /dev/null @@ -1,410 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.junit.Assert.assertTrue; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.InMemoryTimerInternals; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; -import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; -import org.apache.beam.sdk.util.state.TimerCallback; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * Test utility that runs a {@link Trigger}, using in-memory stub implementation to provide - * the {@link StateInternals}. - * - * @param <W> The type of windows being used. - */ -public class TriggerTester<InputT, W extends BoundedWindow> { - - /** - * A {@link TriggerTester} specialized to {@link Integer} values, so elements and timestamps - * can be conflated. Today, triggers should not observed the element type, so this is the - * only trigger tester that needs to be used. - */ - public static class SimpleTriggerTester<W extends BoundedWindow> - extends TriggerTester<Integer, W> { - - private SimpleTriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception { - super(windowingStrategy); - } - - public void injectElements(int... values) throws Exception { - List<TimestampedValue<Integer>> timestampedValues = - Lists.newArrayListWithCapacity(values.length); - for (int value : values) { - timestampedValues.add(TimestampedValue.of(value, new Instant(value))); - } - injectElements(timestampedValues); - } - - public SimpleTriggerTester<W> withAllowedLateness(Duration allowedLateness) throws Exception { - return new SimpleTriggerTester<>( - windowingStrategy.withAllowedLateness(allowedLateness)); - } - } - - protected final WindowingStrategy<Object, W> windowingStrategy; - - private final TestInMemoryStateInternals<?> stateInternals = - new TestInMemoryStateInternals<Object>(null /* key */); - private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); - private final TriggerContextFactory<W> contextFactory; - private final WindowFn<Object, W> windowFn; - private final ActiveWindowSet<W> activeWindows; - private final Map<W, W> windowToMergeResult; - - /** - * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link Trigger} - * under test. - */ - private final ExecutableTrigger executableTrigger; - - /** - * A map from a window and trigger to whether that trigger is finished for the window. - */ - private final Map<W, FinishedTriggers> finishedSets; - - public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger( - Trigger trigger, WindowFn<Object, W> windowFn) - throws Exception { - WindowingStrategy<Object, W> windowingStrategy = - WindowingStrategy.of(windowFn).withTrigger(trigger) - // Merging requires accumulation mode or early firings can break up a session. - // Not currently an issue with the tester (because we never GC) but we don't want - // mystery failures due to violating this need. - .withMode(windowFn.isNonMerging() - ? AccumulationMode.DISCARDING_FIRED_PANES - : AccumulationMode.ACCUMULATING_FIRED_PANES); - - return new SimpleTriggerTester<>(windowingStrategy); - } - - public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> forAdvancedTrigger( - Trigger trigger, WindowFn<Object, W> windowFn) throws Exception { - WindowingStrategy<Object, W> strategy = - WindowingStrategy.of(windowFn).withTrigger(trigger) - // Merging requires accumulation mode or early firings can break up a session. - // Not currently an issue with the tester (because we never GC) but we don't want - // mystery failures due to violating this need. - .withMode(windowFn.isNonMerging() - ? AccumulationMode.DISCARDING_FIRED_PANES - : AccumulationMode.ACCUMULATING_FIRED_PANES); - - return new TriggerTester<>(strategy); - } - - protected TriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception { - this.windowingStrategy = windowingStrategy; - this.windowFn = windowingStrategy.getWindowFn(); - this.executableTrigger = windowingStrategy.getTrigger(); - this.finishedSets = new HashMap<>(); - - this.activeWindows = - windowFn.isNonMerging() - ? new NonMergingActiveWindowSet<W>() - : new MergingActiveWindowSet<W>(windowFn, stateInternals); - this.windowToMergeResult = new HashMap<>(); - - this.contextFactory = - new TriggerContextFactory<>(windowingStrategy.getWindowFn(), stateInternals, activeWindows); - } - - /** - * Instructs the trigger to clear its state for the given window. - */ - public void clearState(W window) throws Exception { - executableTrigger.invokeClear(contextFactory.base(window, - new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window))); - } - - /** - * Asserts that the trigger has actually cleared all of its state for the given window. Since - * the trigger under test is the root, this makes the assert for all triggers regardless - * of their position in the trigger tree. - */ - public void assertCleared(W window) { - for (StateNamespace untypedNamespace : stateInternals.getNamespacesInUse()) { - if (untypedNamespace instanceof WindowAndTriggerNamespace) { - @SuppressWarnings("unchecked") - WindowAndTriggerNamespace<W> namespace = (WindowAndTriggerNamespace<W>) untypedNamespace; - if (namespace.getWindow().equals(window)) { - Set<?> tagsInUse = stateInternals.getTagsInUse(namespace); - assertTrue("Trigger has not cleared tags: " + tagsInUse, tagsInUse.isEmpty()); - } - } - } - } - - /** - * Returns {@code true} if the {@link Trigger} under test is finished for the given window. - */ - public boolean isMarkedFinished(W window) { - FinishedTriggers finishedSet = finishedSets.get(window); - if (finishedSet == null) { - return false; - } - - return finishedSet.isFinished(executableTrigger); - } - - private StateNamespace windowNamespace(W window) { - return StateNamespaces.window(windowFn.windowCoder(), checkNotNull(window)); - } - - /** - * Advance the input watermark to the specified time, then advance the output watermark as far as - * possible. - */ - public void advanceInputWatermark(Instant newInputWatermark) throws Exception { - // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 - timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark); - } - - /** Advance the processing time to the specified time. */ - public void advanceProcessingTime(Instant newProcessingTime) throws Exception { - // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 - timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime); - } - - /** - * Inject all the timestamped values (after passing through the window function) as if they - * arrived in a single chunk of a bundle (or work-unit). - */ - @SafeVarargs - public final void injectElements(TimestampedValue<InputT>... values) throws Exception { - injectElements(Arrays.asList(values)); - } - - public final void injectElements(Collection<TimestampedValue<InputT>> values) throws Exception { - for (TimestampedValue<InputT> value : values) { - WindowTracing.trace("TriggerTester.injectElements: {}", value); - } - - List<WindowedValue<InputT>> windowedValues = Lists.newArrayListWithCapacity(values.size()); - - for (TimestampedValue<InputT> input : values) { - try { - InputT value = input.getValue(); - Instant timestamp = input.getTimestamp(); - Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>( - windowFn, value, timestamp, GlobalWindow.INSTANCE)); - - for (W window : assignedWindows) { - activeWindows.addActiveForTesting(window); - - // Today, triggers assume onTimer firing at the watermark time, whether or not they - // explicitly set the timer themselves. So this tester must set it. - timerInternals.setTimer( - TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME)); - } - - windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - for (WindowedValue<InputT> windowedValue : windowedValues) { - for (BoundedWindow untypedWindow : windowedValue.getWindows()) { - // SDK is responsible for type safety - @SuppressWarnings("unchecked") - W window = mergeResult((W) untypedWindow); - - Trigger.OnElementContext context = contextFactory.createOnElementContext(window, - new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(), - executableTrigger, getFinishedSet(window)); - - if (!context.trigger().isFinished()) { - executableTrigger.invokeOnElement(context); - } - } - } - } - - public boolean shouldFire(W window) throws Exception { - Trigger.TriggerContext context = contextFactory.base( - window, - new TestTimers(windowNamespace(window)), - executableTrigger, getFinishedSet(window)); - executableTrigger.getSpec().prefetchShouldFire(context.state()); - return executableTrigger.invokeShouldFire(context); - } - - public void fireIfShouldFire(W window) throws Exception { - Trigger.TriggerContext context = contextFactory.base( - window, - new TestTimers(windowNamespace(window)), - executableTrigger, getFinishedSet(window)); - - executableTrigger.getSpec().prefetchShouldFire(context.state()); - if (executableTrigger.invokeShouldFire(context)) { - executableTrigger.getSpec().prefetchOnFire(context.state()); - executableTrigger.invokeOnFire(context); - if (context.trigger().isFinished()) { - activeWindows.remove(window); - executableTrigger.invokeClear(context); - } - } - } - - public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, boolean value) { - getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex), value); - } - - /** - * Invokes merge from the {@link WindowFn} a single time and passes the resulting merge - * events on to the trigger under test. Does not persist the fact that merging happened, - * since it is just to test the trigger's {@code OnMerge} method. - */ - public final void mergeWindows() throws Exception { - windowToMergeResult.clear(); - activeWindows.merge(new MergeCallback<W>() { - @Override - public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {} - - @Override - public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception { - List<W> activeToBeMerged = new ArrayList<W>(); - for (W window : toBeMerged) { - windowToMergeResult.put(window, mergeResult); - if (activeWindows.isActive(window)) { - activeToBeMerged.add(window); - } - } - Map<W, FinishedTriggers> mergingFinishedSets = - Maps.newHashMapWithExpectedSize(activeToBeMerged.size()); - for (W oldWindow : activeToBeMerged) { - mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow)); - } - executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult, - new TestTimers(windowNamespace(mergeResult)), executableTrigger, - getFinishedSet(mergeResult), mergingFinishedSets)); - timerInternals.setTimer(TimerData.of( - windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME)); - } - }); - } - - public W mergeResult(W window) { - W result = windowToMergeResult.get(window); - return result == null ? window : result; - } - - private FinishedTriggers getFinishedSet(W window) { - FinishedTriggers finishedSet = finishedSets.get(window); - if (finishedSet == null) { - finishedSet = FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()); - finishedSets.put(window, finishedSet); - } - return finishedSet; - } - - private static class TestAssignContext<W extends BoundedWindow> - extends WindowFn<Object, W>.AssignContext { - private Object element; - private Instant timestamp; - private BoundedWindow window; - - public TestAssignContext( - WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) { - windowFn.super(); - this.element = element; - this.timestamp = timestamp; - this.window = window; - } - - @Override - public Object element() { - return element; - } - - @Override - public Instant timestamp() { - return timestamp; - } - - @Override - public BoundedWindow window() { - return window; - } - } - - private class TestTimers implements Timers { - private final StateNamespace namespace; - - public TestTimers(StateNamespace namespace) { - checkArgument(namespace instanceof WindowNamespace); - this.namespace = namespace; - } - - @Override - public void setTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain)); - } - - @Override - public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain)); - } - - @Override - public Instant currentProcessingTime() { - return timerInternals.currentProcessingTime(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return timerInternals.currentSynchronizedProcessingTime(); - } - - @Override - public Instant currentEventTime() { - return timerInternals.currentInputWatermarkTime(); - } - } -}