[
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15612222#comment-15612222
]
ASF GitHub Bot commented on FLINK-4552:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85361303
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness<T, W extends Window> {
+
+ private static final Integer KEY = 1;
+
+ private final Trigger<T, W> trigger;
+ private final TypeSerializer<W> windowSerializer;
+
+ private final HeapKeyedStateBackend<Integer> stateBackend;
+ private final TestInternalTimerService<Integer, W> internalTimerService;
+
+ public TriggerTestHarness(
+ Trigger<T, W> trigger,
+ TypeSerializer<W> windowSerializer) throws Exception {
+ this.trigger = trigger;
+ this.windowSerializer = windowSerializer;
+
+ // we only ever use one key, other tests make sure that windows
work across different
+ // keys
+ DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ @SuppressWarnings("unchecked")
+ HeapKeyedStateBackend<Integer> stateBackend =
(HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
+ new JobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ new KvStateRegistry().createTaskRegistry(new
JobID(), new JobVertexID()));
+ this.stateBackend = stateBackend;
+
+ this.stateBackend.setCurrentKey(0);
+
+ this.internalTimerService = new TestInternalTimerService<>(new
KeyContext() {
+ @Override
+ public void setCurrentKey(Object key) {
+ // ignore
+ }
+
+ @Override
+ public Object getCurrentKey() {
+ return KEY;
+ }
+ });
+ }
+
+ public int numProcessingTimeTimers() {
+ return internalTimerService.numProcessingTimeTimers();
+ }
+
+ public int numProcessingTimeTimers(W window) {
+ return internalTimerService.numProcessingTimeTimers(window);
+ }
+
+ public int numEventTimeTimers() {
+ return internalTimerService.numEventTimeTimers();
+ }
+
+ public int numEventTimeTimers(W window) {
+ return internalTimerService.numEventTimeTimers(window);
+ }
+
+ public int numStateEntries() {
+ return stateBackend.numStateEntries();
+ }
+
+ public int numStateEntries(W window) {
+ return stateBackend.numStateEntries(window);
+ }
+
+ /**
+ * Injects one element into the trigger for the given window and
returns the result of
+ * {@link Trigger#onElement(Object, long, Window,
Trigger.TriggerContext)}
+ */
+ public TriggerResult processElement(StreamRecord<T> element, W window)
throws Exception {
+ TestTriggerContext<Integer, W> triggerContext = new
TestTriggerContext<>(
+ KEY,
+ window,
+ internalTimerService,
+ stateBackend,
+ windowSerializer);
+ return trigger.onElement(element.getValue(),
element.getTimestamp(), window, triggerContext);
+ }
+
+ /**
+ * Advanced processing time and checks whether we have exactly one
firing for the given
+ * window. The result of {@link Trigger#onProcessingTime(long, Window,
Trigger.TriggerContext)}
+ * is returned for that firing.
+ */
+ public TriggerResult advanceProcessingTime(long time, W window) throws
Exception {
+ Collection<Tuple2<W, TriggerResult>> firings =
advanceProcessingTime(time);
+
+ if (firings.size() != 1) {
+ throw new IllegalStateException("Must have exactly one
timer firing. Fired timers: " + firings);
+ }
+
+ Tuple2<W, TriggerResult> firing = firings.iterator().next();
+
+ if (!firing.f0.equals(window)) {
+ throw new IllegalStateException("Trigger fired for
another window.");
+ }
+
+ return firing.f1;
+ }
+
+ /**
+ * Advanced the watermark and checks whether we have exactly one firing
for the given
+ * window. The result of {@link Trigger#onEventTime(long, Window,
Trigger.TriggerContext)}
+ * is returned for that firing.
+ */
+ public TriggerResult advanceWatermark(long time, W window) throws
Exception {
+ Collection<Tuple2<W, TriggerResult>> firings =
advanceWatermark(time);
+
+ if (firings.size() != 1) {
+ throw new IllegalStateException("Must have exactly one
timer firing. Fired timers: " + firings);
+ }
+
+ Tuple2<W, TriggerResult> firing = firings.iterator().next();
--- End diff --
Same here.
> Refactor WindowOperator/Trigger Tests
> -------------------------------------
>
> Key: FLINK-4552
> URL: https://issues.apache.org/jira/browse/FLINK-4552
> Project: Flink
> Issue Type: Improvement
> Components: Windowing Operators
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> Right now, tests for {{WindowOperator}}, {{WindowAssigner}}, {{Trigger}} and
> {{WindowFunction}} are all conflated in {{WindowOperatorTest}}. All of these
> test that a certain combination of a {{Trigger}}, {{WindowAssigner}} and
> {{WindowFunction}} produce the expected output.
> We should modularize these tests and spread them out across multiple files,
> possibly one per trigger, for the triggers. Also, we should extend/change the
> tests in some key ways:
> - {{WindowOperatorTest}} test should just verify that the interaction
> between {{WindowOperator}} and the various other parts works as expected,
> that the correct methods on {{Trigger}} and {{WindowFunction}} are called at
> the expected time and that snapshotting, timers, cleanup etc. work correctly.
> These tests should also verify that the different state types and
> {{WindowFunctions}} work correctly.
> - {{Trigger}} tests should present elements to triggers and verify that they
> fire at the correct times. The actual output of the {{WindowFunction}} is not
> important for these tests. We should also test that triggers correctly clean
> up state and timers.
> - {{WindowAssigner}} tests should test each window assigner and also verify
> that, for example, the offset parameter of time-based windows works correctly.
> There is already {{WindowingTestHarness}} but it is not used by tests, I
> think we can expand on that and provide more thorough test coverage while
> also making the tests more maintainable ({{WindowOperatorTest.java}} is
> nearing 3000 lines of code).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)