Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2199#discussion_r69552555
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
---
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A utility class that facilitates the testing of custom {@link Trigger
Triggers} and
+ * {@link WindowAssigner WindowAssigners}. For examples on how to use this
class, the user
+ * can read the {@link
org.apache.flink.streaming.runtime.operators.windowing.WindowingTestHarnessTest}.
+ */
+public class WindowingTestHarness<K, IN, W extends Window> {
+
+ private final TestTimeServiceProvider timeServiceProvider;
+
+ private final OneInputStreamOperatorTestHarness<IN, IN> testHarness;
+
+ private final ConcurrentLinkedQueue<Object> expectedOutputs = new
ConcurrentLinkedQueue<>();
+
+ private volatile boolean isOpen = false;
+
+ public WindowingTestHarness(ExecutionConfig executionConfig,
+
WindowAssigner<? super IN, W> windowAssigner,
+
TypeInformation<K> keyType,
+
TypeInformation<IN> inputType,
+
TypeSerializer<W> windowSerializer,
+ KeySelector<IN,
K> keySelector,
+ Trigger<? super
IN, ? super W> trigger,
+ long
allowedLateness) {
+
+ ListStateDescriptor<IN> windowStateDesc = new
ListStateDescriptor<>("window-contents",
+ inputType.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<K, IN, Iterable<IN>, IN, W> operator =
+ new WindowOperator<>(
+ windowAssigner,
+ windowSerializer,
+ keySelector,
+ keyType.createSerializer(executionConfig),
+ windowStateDesc,
+ new InternalIterableWindowFunction<>(new
PassThroughFunction()),
+ trigger,
+ allowedLateness);
+
+ operator.setInputType(inputType, executionConfig);
+
+ timeServiceProvider = new TestTimeServiceProvider();
+ testHarness = new OneInputStreamOperatorTestHarness<>(operator,
executionConfig, timeServiceProvider);
+ testHarness.configureForKeyedStream(keySelector, keyType);
+ }
+
+ /**
+ * Simulates the processing of a new incoming element.
+ */
+ public void processElement(IN element, long timestamp) throws Exception
{
+ openOperator();
+ testHarness.processElement(new StreamRecord<>(element,
timestamp));
+ }
+
+ /**
+ * Simulates the processing of a new incoming watermark.
+ */
+ public void processWatermark(long timestamp) throws Exception {
+ openOperator();
+ testHarness.processWatermark(new Watermark(timestamp));
+ }
+
+ /**
+ * Sets the current processing time to {@code timestamp}.
+ * This is useful when working on processing time.
+ */
+ public void setProcessingTime(long timestamp) throws Exception {
+ openOperator();
+ timeServiceProvider.setCurrentTime(timestamp);
+ }
+
+ /**
+ * Gets the current output of the windowing operator, as produced by the
+ * synergies between the window assigner and the trigger. This will also
+ * contain the received watermarks.
+ */
+ public ConcurrentLinkedQueue<Object> getOutput() throws Exception {
+ return testHarness.getOutput();
+ }
+
+ /**
+ * Closes the testing window operator.
+ */
+ public void close() throws Exception {
+ if (isOpen) {
+ testHarness.close();
+ isOpen = false;
+ }
+ }
+
+ /**
+ * Adds a watermark to the expected output.
+ * The expected output should contain the elements and watermarks that
we expect the output of the operator to
+ * contain, in the correct order. This will be used to check if the
produced output is the expected one, and
+ * thus determine the success or failure of the test.
+ */
+ public void addExpectedWatermark(long timestamp) {
+ expectedOutputs.add(new Watermark(timestamp));
+ }
+
+ /**
+ * Adds an element to the expected output.
+ * The expected output should contain the elements and watermarks that
we expect the output of the operator to
+ * contain, in the correct order. This will be used to check if the
produced output is the expected one, and
+ * thus determine the success or failure of the test.
+ */
+ public void addExpectedElement(IN element, long timestamp) {
+ expectedOutputs.add(new StreamRecord<>(element, timestamp));
+ }
+
+ /**
--- End diff --
We should maybe mention here that this internally uses an assert, so the
test fails when the expected output does not match the actual output.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---