Repository: flink Updated Branches: refs/heads/master 6b55e2ca3 -> d1475ee86
http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 6238e6c..104bc7b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -66,7 +66,6 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; -import org.apache.flink.streaming.util.WindowingTestHarness; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -2460,174 +2459,4 @@ public class WindowOperatorTest extends TestLogger { return "EventTimeTrigger()"; } } - - @Test - public void testEventTimeTumblingWindowsWithOffset() throws Exception { - final int WINDOW_SIZE = 2000; - final int OFFSET = 100; - TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - - TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(OFFSET)); - - WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - windowAssigner, - BasicTypeInfo.STRING_TYPE_INFO, - inputType, - new TupleKeySelector(), - EventTimeTrigger.create(), - 0); - - // normal element - testHarness.processElement(new Tuple2<>("key2", 1), 1000); - testHarness.processWatermark(1985); - - testHarness.addExpectedWatermark(1985); - - testHarness.processElement(new Tuple2<>("key2", 2), 1980); - testHarness.processElement(new Tuple2<>("key2", 3), 1998); - testHarness.processElement(new Tuple2<>("key2", 4), 2001); - - // verify that this does not yet fire our windows, as it would without offsets - testHarness.processWatermark(2010); - testHarness.addExpectedWatermark(2010); - - testHarness.processWatermark(2999); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2", 2), 1999 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2", 3), 1999 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2", 4), 1999 + OFFSET); - - testHarness.addExpectedWatermark(2999); - - testHarness.processWatermark(3999); - testHarness.addExpectedWatermark(3999); - - testHarness.compareActualToExpectedOutput("Output is not correct"); - - testHarness.close(); - } - - @Test - public void testEventTimeSlidingWindowsWithOffset() throws Exception { - final int WINDOW_SIZE = 2000; - final int SLIDE = 500; - final int OFFSET = 10; - TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - - SlidingEventTimeWindows windowAssigner = SlidingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(SLIDE),Time.milliseconds(OFFSET)); - - WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - windowAssigner, - BasicTypeInfo.STRING_TYPE_INFO, - inputType, - new TupleKeySelector(), - EventTimeTrigger.create(), - 0); - - testHarness.processElement(new Tuple2<>("key2", 1), 333); - testHarness.processWatermark(6666); - - testHarness.addExpectedElement(new Tuple2<>("key2",1),499 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2",1),999 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2",1),1499 + OFFSET); - testHarness.addExpectedElement(new Tuple2<>("key2",1),1999 + OFFSET); - testHarness.addExpectedWatermark(6666); - testHarness.compareActualToExpectedOutput("Output is not correct"); - - testHarness.close(); - } - - @Test - public void testProcessingTimeTumblingWindowsWithOffset() throws Exception { - final int WINDOW_SIZE = 3000; - final int OFFSET = 1000; - - TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - - TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE), - Time.milliseconds(OFFSET)); - - WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - windowAssigner, - BasicTypeInfo.STRING_TYPE_INFO, - inputType, - new TupleKeySelector(), - ProcessingTimeTrigger.create(), - 0); - - testHarness.setProcessingTime(3); - - // timestamp is ignored in processing time - testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE); - testHarness.processElement(new Tuple2<>("key2", 1), 7000); - testHarness.processElement(new Tuple2<>("key2", 1), 7000); - - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - - testHarness.setProcessingTime(5000); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - - testHarness.setProcessingTime(7000); - - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - testHarness.close(); - } - - @Test - public void testProcessingTimeSlidingWindowsWithOffset() throws Exception { - final int WINDOW_SIZE = 3000; - final int SLIDING = 1000; - final int OFFSET = 10; - - TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - - SlidingProcessingTimeWindows windowAssigner = SlidingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE), - Time.milliseconds(SLIDING),Time.milliseconds(OFFSET)); - - WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - windowAssigner, - BasicTypeInfo.STRING_TYPE_INFO, - inputType, - new TupleKeySelector(), - ProcessingTimeTrigger.create(), - 0); - - testHarness.setProcessingTime(3); - - // timestamp is ignored in processing time - testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE); - - testHarness.setProcessingTime(1111); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET - 1); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 999); - - testHarness.processElement(new Tuple2<>("key2", 2),Long.MIN_VALUE); - testHarness.setProcessingTime(2222); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 1999); - testHarness.addExpectedElement(new Tuple2<>("key2", 2), OFFSET + 1999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - testHarness.close(); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java new file mode 100644 index 0000000..449d54b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.streaming.api.windowing.windows.Window; + +/** + * Helper class for emitting a value along with the window information from + * a {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction}. + */ +public class WindowedValue<T, W extends Window> { + private final T value; + private final W window; + + public WindowedValue(T value, W window) { + this.value = value; + this.window = window; + } + + public T value() { + return value; + } + + public W window() { + return window; + } + + @Override + public String toString() { + return "WindowedValue(" + value + ", " + window + ")"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java deleted file mode 100644 index 82c3d71..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java +++ /dev/null @@ -1,230 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.windowing; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; -import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.util.WindowingTestHarness; -import org.junit.Test; - -public class WindowingTestHarnessTest { - - @Test - public void testEventTimeTumblingWindows() throws Exception { - final int WINDOW_SIZE = 2000; - - TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - - TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)); - - WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - windowAssigner, - BasicTypeInfo.STRING_TYPE_INFO, - inputType, - new TupleKeySelector(), - EventTimeTrigger.create(), - 0); - - // normal element - testHarness.processElement(new Tuple2<>("key2", 1), 1000); - testHarness.processWatermark(1985); - - testHarness.addExpectedWatermark(1985); - - // this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark - testHarness.processElement(new Tuple2<>("key2", 1), 1980); - - // dropped as late - testHarness.processElement(new Tuple2<>("key2", 1), 1998); - - testHarness.processElement(new Tuple2<>("key2", 1), 2001); - testHarness.processWatermark(2999); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999); - testHarness.addExpectedWatermark(2999); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 3999); - - testHarness.processWatermark(3999); - testHarness.addExpectedWatermark(3999); - - testHarness.compareActualToExpectedOutput("Output is not correct"); - - testHarness.close(); - } - - @Test - public void testProcessingTimeTumblingWindows() throws Exception { - final int WINDOW_SIZE = 3000; - - TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - - TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE)); - - WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - windowAssigner, - BasicTypeInfo.STRING_TYPE_INFO, - inputType, - new TupleKeySelector(), - ProcessingTimeTrigger.create(), - 0); - - testHarness.setProcessingTime(3); - - // timestamp is ignored in processing time - testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE); - testHarness.processElement(new Tuple2<>("key2", 1), 7000); - testHarness.processElement(new Tuple2<>("key2", 1), 7000); - - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - - testHarness.setProcessingTime(5000); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - testHarness.processElement(new Tuple2<>("key1", 1), 7000); - - testHarness.setProcessingTime(7000); - - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - testHarness.close(); - } - - @Test - public void testSnapshotingAndRecovery() throws Exception { - - final int WINDOW_SIZE = 3000; - - TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - - TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE)); - - WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>( - windowAssigner, - BasicTypeInfo.STRING_TYPE_INFO, - inputType, - new TupleKeySelector(), - EventTimeTrigger.create(), - 0); - - // add elements out-of-order - testHarness.processElement(new Tuple2<>("key2", 1), 3999); - testHarness.processElement(new Tuple2<>("key2", 1), 3000); - - testHarness.processElement(new Tuple2<>("key1", 1), 20); - testHarness.processElement(new Tuple2<>("key1", 1), 0); - testHarness.processElement(new Tuple2<>("key1", 1), 999); - - testHarness.processElement(new Tuple2<>("key2", 1), 1998); - testHarness.processElement(new Tuple2<>("key2", 1), 1999); - testHarness.processElement(new Tuple2<>("key2", 1), 1000); - - testHarness.processWatermark(999); - testHarness.addExpectedWatermark(999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - testHarness.processWatermark(1999); - testHarness.addExpectedWatermark(1999); - testHarness.compareActualToExpectedOutput("Output was not correct."); - - // do a snapshot, close and restore again - OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L); - testHarness.close(); - testHarness.restore(snapshot); - - testHarness.processWatermark(2999); - - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999); - testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999); - - testHarness.addExpectedWatermark(2999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - testHarness.processWatermark(3999); - testHarness.addExpectedWatermark(3999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - testHarness.processWatermark(4999); - testHarness.addExpectedWatermark(4999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - testHarness.processWatermark(5999); - - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 5999); - testHarness.addExpectedElement(new Tuple2<>("key2", 1), 5999); - testHarness.addExpectedWatermark(5999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - - - // those don't have any effect... - testHarness.processWatermark(6999); - testHarness.processWatermark(7999); - - testHarness.addExpectedWatermark(6999); - testHarness.addExpectedWatermark(7999); - - testHarness.compareActualToExpectedOutput("Output was not correct."); - } - - private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> { - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple2<String, Integer> value) throws Exception { - return value.f0; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 7fe4ebc..568410a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.util; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest; import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.StreamOperator; @@ -554,6 +556,24 @@ public class AbstractStreamOperatorTestHarness<OUT> { return wasFailedExternally; } + @VisibleForTesting + public int numProcessingTimeTimers() { + if (operator instanceof AbstractStreamOperator) { + return ((AbstractStreamOperator) operator).numProcessingTimeTimers(); + } else { + throw new UnsupportedOperationException(); + } + } + + @VisibleForTesting + public int numEventTimeTimers() { + if (operator instanceof AbstractStreamOperator) { + return ((AbstractStreamOperator) operator).numEventTimeTimers(); + } else { + throw new UnsupportedOperationException(); + } + } + private class MockOutput implements Output<StreamRecord<OUT>> { private TypeSerializer<OUT> outputSerializer; http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 4abb6e2..cde5780 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -188,6 +189,22 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> return false; } + public int numKeyedStateEntries() { + if (keyedStateBackend instanceof HeapKeyedStateBackend) { + return ((HeapKeyedStateBackend) keyedStateBackend).numStateEntries(); + } else { + throw new UnsupportedOperationException(); + } + } + + public <N> int numKeyedStateEntries(N namespace) { + if (keyedStateBackend instanceof HeapKeyedStateBackend) { + return ((HeapKeyedStateBackend) keyedStateBackend).numStateEntries(namespace); + } else { + throw new UnsupportedOperationException(); + } + } + @Override public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { if (operatorStateHandles != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java index 58e8c6b..4b6925d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java @@ -36,26 +36,13 @@ import static org.junit.Assert.assertEquals; * Utils for working with the various test harnesses. */ public class TestHarnessUtil { - /** - * Extracts the StreamRecords from the given output list. - */ - @SuppressWarnings("unchecked") - public static <OUT> List<StreamRecord<OUT>> getStreamRecordsFromOutput(List<Object> output) { - List<StreamRecord<OUT>> resultElements = new LinkedList<StreamRecord<OUT>>(); - for (Object e: output) { - if (e instanceof StreamRecord) { - resultElements.add((StreamRecord<OUT>) e); - } - } - return resultElements; - } /** * Extracts the raw elements from the given output list. */ @SuppressWarnings("unchecked") public static <OUT> List<OUT> getRawElementsFromOutput(Queue<Object> output) { - List<OUT> resultElements = new LinkedList<OUT>(); + List<OUT> resultElements = new LinkedList<>(); for (Object e: output) { if (e instanceof StreamRecord) { resultElements.add(((StreamRecord<OUT>) e).getValue()); http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java deleted file mode 100644 index efb0d7e..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; -import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; -import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.util.Collector; -import org.apache.flink.util.Preconditions; - -import java.util.Comparator; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * A utility class that facilitates the testing of custom {@link Trigger Triggers} and - * {@link WindowAssigner WindowAssigners}. - * - * <p>For examples on how to use this class, see - * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowingTestHarnessTest}. - * - * <p>The input elements of type {@code IN} must implement the {@code equals()} method because - * it is used to compare the expected output to the actual output. - */ -public class WindowingTestHarness<K, IN, W extends Window> { - - private final OneInputStreamOperatorTestHarness<IN, IN> testHarness; - - private final ConcurrentLinkedQueue<Object> expectedOutputs = new ConcurrentLinkedQueue<>(); - - private volatile boolean isOpen = false; - - public WindowingTestHarness(WindowAssigner<? super IN, W> windowAssigner, - TypeInformation<K> keyType, - TypeInformation<IN> inputType, - KeySelector<IN, K> keySelector, - Trigger<? super IN, ? super W> trigger, - long allowedLateness) throws Exception { - - ListStateDescriptor<IN> windowStateDesc = - new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig())); - - WindowOperator<K, IN, Iterable<IN>, IN, W> operator = - new WindowOperator<>( - windowAssigner, - windowAssigner.getWindowSerializer(new ExecutionConfig()), - keySelector, - keyType.createSerializer(new ExecutionConfig()), - windowStateDesc, - new InternalIterableWindowFunction<>(new PassThroughFunction()), - trigger, - allowedLateness); - - testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType); - } - - /** - * Simulates the processing of a new incoming element. - */ - public void processElement(IN element, long timestamp) throws Exception { - openOperator(); - testHarness.processElement(new StreamRecord<>(element, timestamp)); - } - - /** - * Simulates the processing of a new incoming watermark. - */ - public void processWatermark(long timestamp) throws Exception { - openOperator(); - testHarness.processWatermark(new Watermark(timestamp)); - } - - /** - * Sets the current processing time to {@code timestamp}. - * This is useful when working on processing time. - */ - public void setProcessingTime(long timestamp) throws Exception { - openOperator(); - testHarness.setProcessingTime(timestamp); - } - - /** - * Gets the current output of the windowing operator, as produced by the - * synergies between the window assigner and the trigger. This will also - * contain the received watermarks. - */ - public ConcurrentLinkedQueue<Object> getOutput() throws Exception { - return testHarness.getOutput(); - } - - /** - * Closes the testing window operator. - */ - public void close() throws Exception { - if (isOpen) { - testHarness.close(); - isOpen = false; - } - } - - /** - * Adds a watermark to the expected output. - * - * <p>The expected output should contain the elements and watermarks that we expect the output of the operator to - * contain, in the correct order. This will be used to check if the produced output is the expected one, and - * thus determine the success or failure of the test. - */ - public void addExpectedWatermark(long timestamp) { - expectedOutputs.add(new Watermark(timestamp)); - } - - /** - * Adds an element to the expected output. - * - * <p>The expected output should contain the elements and watermarks that we expect the output of the operator to - * contain, in the correct order. This will be used to check if the produced output is the expected one, and - * thus determine the success or failure of the test. - */ - public void addExpectedElement(IN element, long timestamp) { - expectedOutputs.add(new StreamRecord<>(element, timestamp)); - } - - /** - * Compares the current produced output with the expected one. The latter contains elements and watermarks added - * using the {@link #addExpectedElement(Object, long)} and {@link #addExpectedWatermark(long)} methods. - * - * <p><b>NOTE:</b> This methods uses an {@code assert()} internally, thus failing the test if the {@code expected} output - * does not match the {@code actual} one. - */ - public void compareActualToExpectedOutput(String errorMessage) { - TestHarnessUtil.assertOutputEqualsSorted(errorMessage, expectedOutputs, testHarness.getOutput(), new StreamRecordComparator()); - } - - /** - * Takes a snapshot of the current state of the operator. This can be used to test fault-tolerance. - */ - public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception { - return testHarness.snapshot(checkpointId, timestamp); - } - - /** - * Resumes execution from the provided {@link OperatorStateHandles}. This is used to test recovery after a failure. - */ - public void restore(OperatorStateHandles stateHandles) throws Exception { - Preconditions.checkArgument(!isOpen, - "You are trying to restore() while the operator is still open. " + - "Please call close() first."); - - testHarness.setup(); - testHarness.initializeState(stateHandles); - openOperator(); - } - - private void openOperator() throws Exception { - if (!isOpen) { - testHarness.open(); - isOpen = true; - } - } - - private class PassThroughFunction implements WindowFunction<IN, IN, K, W> { - private static final long serialVersionUID = 1L; - - @Override - public void apply(K k, W window, Iterable<IN> input, Collector<IN> out) throws Exception { - for (IN in: input) { - out.collect(in); - } - } - } - - /** - * {@link Comparator} for sorting the expected and actual output by timestamp. - */ - @SuppressWarnings("unchecked") - private class StreamRecordComparator implements Comparator<Object> { - @Override - public int compare(Object o1, Object o2) { - if (o1 instanceof Watermark || o2 instanceof Watermark) { - return 0; - } else { - StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1; - StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2; - if (sr0.getTimestamp() != sr1.getTimestamp()) { - return (int) (sr0.getTimestamp() - sr1.getTimestamp()); - } - int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0); - if (comparison != 0) { - return comparison; - } else { - return sr0.getValue().f1 - sr1.getValue().f1; - } - } - } - } -}
