http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java deleted file mode 100644 index 0379fe0..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.extractor; - -import static org.junit.Assert.*; - -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple10; -import org.apache.flink.api.java.tuple.Tuple11; -import org.apache.flink.api.java.tuple.Tuple12; -import org.apache.flink.api.java.tuple.Tuple13; -import org.apache.flink.api.java.tuple.Tuple14; -import org.apache.flink.api.java.tuple.Tuple15; -import org.apache.flink.api.java.tuple.Tuple16; -import org.apache.flink.api.java.tuple.Tuple17; -import org.apache.flink.api.java.tuple.Tuple18; -import org.apache.flink.api.java.tuple.Tuple19; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple20; -import org.apache.flink.api.java.tuple.Tuple21; -import org.apache.flink.api.java.tuple.Tuple22; -import org.apache.flink.api.java.tuple.Tuple23; -import org.apache.flink.api.java.tuple.Tuple24; -import org.apache.flink.api.java.tuple.Tuple25; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.tuple.Tuple6; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.api.java.tuple.Tuple8; -import org.apache.flink.api.java.tuple.Tuple9; -import org.apache.flink.streaming.api.windowing.extractor.FieldsFromTuple; -import org.junit.Before; -import org.junit.Test; - -public class FieldsFromTupleTest { - - private double[] testDouble; - - @Before - public void init() { - testDouble = new double[Tuple.MAX_ARITY]; - for (int i = 0; i < Tuple.MAX_ARITY; i++) { - testDouble[i] = i; - } - } - - @Test - public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException { - Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance(); - for (int i = 0; i < Tuple.MAX_ARITY; i++) { - currentTuple.setField(testDouble[i], i); - } - - double[] expected = { testDouble[5], testDouble[3], testDouble[6], testDouble[7], - testDouble[0] }; - arrayEqualityCheck(expected, new FieldsFromTuple(5, 3, 6, 7, 0).extract(currentTuple)); - - double[] expected2 = { testDouble[0], testDouble[Tuple.MAX_ARITY - 1] }; - arrayEqualityCheck(expected2, - new FieldsFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple)); - - double[] expected3 = { testDouble[Tuple.MAX_ARITY - 1], testDouble[0] }; - arrayEqualityCheck(expected3, - new FieldsFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple)); - - double[] expected4 = { testDouble[13], testDouble[4], testDouble[5], testDouble[4], - testDouble[2], testDouble[8], testDouble[6], testDouble[2], testDouble[8], - testDouble[3], testDouble[5], testDouble[2], testDouble[16], testDouble[4], - testDouble[3], testDouble[2], testDouble[6], testDouble[4], testDouble[7], - testDouble[4], testDouble[2], testDouble[8], testDouble[7], testDouble[2] }; - arrayEqualityCheck(expected4, new FieldsFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16, - 4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple)); - } - - private void arrayEqualityCheck(double[] array1, double[] array2) { - assertEquals("The result arrays must have the same length", array1.length, array2.length); - for (int i = 0; i < array1.length; i++) { - assertEquals("Unequal fields at position " + i, array1[i], array2[i], 0d); - } - } - - private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class, - Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, - Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, - Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, - Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, - Tuple24.class, Tuple25.class }; - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java deleted file mode 100644 index 4773f4e..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.policy; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class CountEvictionPolicyTest { - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testCountEvictionPolicy() { - List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - int counter; - - // The count policy should not care about the triggered parameter - // Therefore its value switches after each use in this test. - boolean triggered = false; - // the size of the buffer should not matter as well! - - // Test count of different sizes (0..9) - for (int i = 0; i < 10; i++) { - EvictionPolicy evictionPolicy = new CountEvictionPolicy(i, i); - counter = 0; - - // Test first i steps (should not evict) - for (int j = 0; j < i; j++) { - counter++; - assertEquals("Evictionpolicy with count of " + i + " evicted tuples at add nr. " - + counter + ". It should not evict for the first " + i + " adds.", 0, - evictionPolicy.notifyEviction(tuples.get(j), (triggered = !triggered), - tuples.get(Math.abs((i - j)) % 10))); - } - - // Test the next three evictions - for (int j = 0; j < 3; j++) { - // The first add should evict now - counter++; - assertEquals("Evictionpolicy with count of " + i - + " did not evict correct number of tuples at the expected pos " + counter - + ".", i, evictionPolicy.notifyEviction(tuples.get(j), - (triggered = !triggered), tuples.get(Math.abs((i - j)) % 10))); - - // the next i-1 adds should not evict - for (int k = 0; k < i - 1; k++) { - counter++; - assertEquals("Evictionpolicy with count of " + i - + " evicted tuples at add nr. " + counter, 0, - evictionPolicy.notifyEviction(tuples.get(j), (triggered = !triggered), - tuples.get(Math.abs((i - j)) % 10))); - } - } - } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testCountEvictionPolicyStartValuesAndEvictionAmount() { - - // The count policy should not care about the triggered parameter - // Therefore its value switches after each use in this test. - boolean triggered = false; - // the size of the buffer should not matter as well! - - List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - - // Text different eviction amounts (0..3) - for (int x = 0; x < 4; x++) { - - // Test count of different sizes (0..9) - for (int i = 0; i < 10; i++) { - - int counter = 0; - - // Test different start values (-5..5) - for (int j = -5; i < 6; i++) { - EvictionPolicy evictionPolicy = new CountEvictionPolicy(i, x, j); - // Add tuples without eviction - for (int k = 0; k < ((i - j > 0) ? i - j : 0); k++) { - counter++; - assertEquals("Evictionpolicy with count of " + i - + " did not evict correct number of tuples at the expected pos " - + counter + ".", 0, evictionPolicy.notifyEviction( - tuples.get(Math.abs(j)), (triggered = !triggered), - tuples.get(Math.abs((i - j)) % 10))); - } - // Expect eviction - counter++; - assertEquals("Evictionpolicy with count of " + i - + " did not evict correct number of tuples at the expected pos " - + counter + ".", x, evictionPolicy.notifyEviction( - tuples.get(Math.abs(j)), (triggered = !triggered), - tuples.get(Math.abs((i - j)) % 10))); - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java deleted file mode 100644 index b7120e7..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.policy; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.streaming.api.windowing.helper.Count; -import org.junit.Test; - -public class CountTriggerPolicyTest { - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testCountTriggerPolicy() { - - List tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - int counter; - - // Test count of different sizes (0..9) - for (int i = 0; i < 10; i++) { - TriggerPolicy triggerPolicy = Count.of(i).toTrigger(); - counter=0; - - // Test first i steps (should not trigger) - for (int j = 0; j < i; j++) { - counter++; - assertFalse("Triggerpolicy with count of " + i + " triggered at add nr. " + counter - + ". It should not trigger for the first " + i + " adds.", - triggerPolicy.notifyTrigger(tuples.get(j))); - } - - // Test the next three triggers - for (int j = 0; j < 3; j++) { - // The first add should trigger now - counter++; - assertTrue("Triggerpolicy with count of " + i - + " did not trigger at the expected pos " + counter + ".", - triggerPolicy.notifyTrigger(tuples.get(j))); - - // the next i-1 adds should not trigger - for (int k = 0; k < i - 1; k++) { - counter++; - assertFalse("Triggerpolicy with count of " + i + " triggered at add nr. " - + counter, triggerPolicy.notifyTrigger(tuples.get(k))); - } - } - } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testCountTriggerPolicyStartValues() { - - List tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - - // Test count of different sizes (0..9) - for (int i = 0; i < 10; i++) { - - // Test different start values (-5..5) - for (int j = -5; i < 6; i++) { - TriggerPolicy triggerPolicy = new CountTriggerPolicy(i, j); - // Add tuples without trigger - for (int k = 0; k < ((i - j > 0) ? i - j : 0); k++) { - assertFalse("Triggerpolicy with count of " + i + " and start value of " + j - + " triggered at add nr. " + (k + 1), - triggerPolicy.notifyTrigger(tuples.get(k % 10))); - } - // Expect trigger - assertTrue("Triggerpolicy with count of " + i + "and start value of " + j - + " did not trigger at the expected position.", - triggerPolicy.notifyTrigger(tuples.get(0))); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java deleted file mode 100644 index 424eb57..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.policy; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; -import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy; -import org.junit.Test; - -import java.util.List; -import java.util.Arrays; - -import static org.junit.Assert.*; - -public class DeltaPolicyTest { - - @SuppressWarnings({ "serial", "unchecked", "rawtypes" }) - @Test - public void testDelta() { - DeltaPolicy deltaPolicy = new DeltaPolicy(new DeltaFunction<Tuple2<Integer, Integer>>() { - @Override - public double getDelta(Tuple2<Integer, Integer> oldDataPoint, Tuple2<Integer, Integer> newDataPoint) { - return (double) newDataPoint.f0 - oldDataPoint.f0; - } - }, new Tuple2(0, 0), 2); - - List<Tuple2> tuples = Arrays.asList( - new Tuple2(1, 0), - new Tuple2(2, 0), - new Tuple2(3, 0), - new Tuple2(6, 0)); - - assertFalse(deltaPolicy.notifyTrigger(tuples.get(0))); - assertEquals(0, deltaPolicy.notifyEviction(tuples.get(0), false, 0)); - - assertFalse(deltaPolicy.notifyTrigger(tuples.get(1))); - assertEquals(0, deltaPolicy.notifyEviction(tuples.get(1), false, 1)); - - assertTrue(deltaPolicy.notifyTrigger(tuples.get(2))); - assertEquals(1, deltaPolicy.notifyEviction(tuples.get(2), true, 2)); - - assertTrue(deltaPolicy.notifyTrigger(tuples.get(3))); - assertEquals(2, deltaPolicy.notifyEviction(tuples.get(3), true, 2)); - } - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java deleted file mode 100644 index 8224b12..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.policy; - -import static org.junit.Assert.*; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.windowing.extractor.FieldFromTuple; -import org.junit.Test; - -public class PunctuationPolicyTest { - - // This value should not effect the policy. It is changed at each call to - // verify this. - private boolean triggered = false; - - @Test - public void PunctuationTriggerTestWithoutExtraction() { - PunctuationPolicy<Object, Object> policy = new PunctuationPolicy<Object, Object>( - new TestObject(0)); - assertTrue("The present punctuation was not detected. (POS 1)", - policy.notifyTrigger(new TestObject(0))); - assertFalse("There was a punctuation detected which wasn't present. (POS 2)", - policy.notifyTrigger(new TestObject(1))); - } - - @Test - public void PunctuationTriggerTestWithExtraction() { - @SuppressWarnings({ "unchecked", "rawtypes" }) - PunctuationPolicy<Tuple2<Object, Object>, Object> policy = new PunctuationPolicy<Tuple2<Object, Object>, Object>( - new TestObject(0), new FieldFromTuple(0)); - assertTrue("The present punctuation was not detected. (POS 3)", - policy.notifyTrigger(new Tuple2<Object, Object>(new TestObject(0), - new TestObject(1)))); - assertFalse("There was a punctuation detected which wasn't present. (POS 4)", - policy.notifyTrigger(new Tuple2<Object, Object>(new TestObject(1), - new TestObject(0)))); - } - - @Test - public void PunctuationEvictionTestWithoutExtraction() { - // The current buffer size should not effect the test. It's therefore - // always 0 here. - - PunctuationPolicy<Object, Object> policy = new PunctuationPolicy<Object, Object>( - new TestObject(0)); - assertEquals( - "The present punctuation was not detected or the number of deleted tuples was wrong. (POS 5)", - 0, policy.notifyEviction(new TestObject(0), (triggered = !triggered), 0)); - for (int i = 0; i < 10; i++) { - for (int j = 0; j < i; j++) { - assertEquals("There was a punctuation detected which wasn't present. (POS 6)", 0, - policy.notifyEviction(new TestObject(1), (triggered = !triggered), 0)); - } - assertEquals( - "The present punctuation was not detected or the number of deleted tuples was wrong. (POS 7)", - i + 1, policy.notifyEviction(new TestObject(0), (triggered = !triggered), 0)); - } - } - - @Test - public void PunctuationEvictionTestWithExtraction() { - // The current buffer size should not effect the test. It's therefore - // always 0 here. - - @SuppressWarnings({ "unchecked", "rawtypes" }) - PunctuationPolicy<Tuple2<Object, Object>, Object> policy = new PunctuationPolicy<Tuple2<Object, Object>, Object>( - new TestObject(0), new FieldFromTuple(0)); - assertEquals( - "The present punctuation was not detected or the number of deleted tuples was wrong. (POS 10)", - 0, policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(0), - new TestObject(1)), (triggered = !triggered), 0)); - for (int i = 0; i < 10; i++) { - for (int j = 0; j < i; j++) { - assertEquals("There was a punctuation detected which wasn't present. (POS 9)", 0, - policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(1), - new TestObject(0)), (triggered = !triggered), 0)); - } - assertEquals( - "The present punctuation was not detected or the number of deleted tuples was wrong. (POS 10)", - i + 1, policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(0), - new TestObject(1)), (triggered = !triggered), 0)); - } - } - - private class TestObject { - - private int id; - - public TestObject(int id) { - this.id = id; - } - - @Override - public boolean equals(Object o) { - if (o instanceof TestObject && ((TestObject) o).getId() == this.id) { - return true; - } else { - return false; - } - } - - public int getId() { - return id; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java deleted file mode 100644 index b5d502b..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.policy; - -import java.util.LinkedList; - -import org.apache.flink.streaming.api.windowing.helper.Timestamp; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TimeEvictionPolicyTest { - - @Test - public void timeEvictionTest() { - // create some test data - Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30, 31, 33, 36, 40, 41, 42, 43, 44, - 45, 47, 55 }; - Integer[] numToDelete = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 0, 0, 0, 3 }; - - // create a timestamp - @SuppressWarnings("serial") - Timestamp<Integer> timeStamp = new Timestamp<Integer>() { - - @Override - public long getTimestamp(Integer value) { - return value; - } - - }; - - // test different granularity - for (long granularity = 0; granularity < 40; granularity++) { - // create policy - TimeEvictionPolicy<Integer> policy = new TimeEvictionPolicy<Integer>(granularity, - new TimestampWrapper<Integer>(timeStamp, 0)); - - // The trigger status should not effect the policy. Therefore, it's - // value is changed after each usage. - boolean triggered = false; - - // The eviction should work similar with both, fake and real - // elements. Which kind is used is changed on every 3rd element in - // this test. - int fakeAndRealCounter = 0; - boolean fake = false; - - // test by adding values - LinkedList<Integer> buffer = new LinkedList<Integer>(); - for (int i = 0; i < times.length; i++) { - - // check if the current element should be a fake - fakeAndRealCounter++; - if (fakeAndRealCounter > 2) { - fake = !fake; - fakeAndRealCounter = 0; - } - - int result; - - if (fake) { - // Notify eviction with fake element - result = policy.notifyEvictionWithFakeElement(times[i], buffer.size()); - } else { - // Notify eviction with real element - result = policy.notifyEviction(times[i], (triggered = !triggered), - buffer.size()); - } - - // handle correctness of eviction - for (; result > 0 && !buffer.isEmpty(); result--) { - if (buffer.getFirst() <= times[i] - granularity) { - buffer.removeFirst(); - } else { - fail("The policy wanted to evict time " + buffer.getFirst() - + " while the current time was " + times[i] - + "and the granularity was " + granularity); - } - } - - // test that all required evictions have been done - if (!buffer.isEmpty()) { - assertTrue("The policy did not evict " + buffer.getFirst() - + " while the current time was " + times[i] - + " and the granularity was " + granularity, - (buffer.getFirst() >= times[i] - granularity)); - } - - // test influence of other evictions - for (int j = numToDelete[i % numToDelete.length]; j > 0; j--) { - if (!buffer.isEmpty()) { - buffer.removeFirst(); - } - } - - // add current element to buffer if it is no fake - if (!fake) { - buffer.add(times[i]); - } - - } - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java deleted file mode 100644 index 2bdbd96..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.policy; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.flink.streaming.api.windowing.helper.Timestamp; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; -import org.junit.Test; - -public class TimeTriggerPolicyTest { - - @Test - public void timeTriggerRegularNotifyTest() { - // create some test data - Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30 }; - - // create a timestamp - @SuppressWarnings("serial") - Timestamp<Integer> timeStamp = new Timestamp<Integer>() { - - @Override - public long getTimestamp(Integer value) { - return value; - } - - }; - - // test different granularity - for (long granularity = 0; granularity < 31; granularity++) { - // create policy - - TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, - new TimestampWrapper<Integer>(timeStamp, 0)); - - // remember window border - long currentTime = 0; - - // test by adding values - for (int i = 0; i < times.length; i++) { - boolean result = policy.notifyTrigger(times[i]); - // start time is included, but end time is excluded: >= - if (times[i] >= currentTime + granularity) { - if (granularity != 0) { - currentTime = times[i] - ((times[i] - currentTime) % granularity); - } - assertTrue("The policy did not trigger at pos " + i + " (current time border: " - + currentTime + "; current granularity: " + granularity - + "; data point time: " + times[i] + ")", result); - } else { - assertFalse("The policy triggered wrong at pos " + i - + " (current time border: " + currentTime + "; current granularity: " - + granularity + "; data point time: " + times[i] + ")", result); - } - } - } - - } - - @Test - public void timeTriggerPreNotifyTest() { - // create some test data - Integer[] times = { 1, 3, 20, 26 }; - - // create a timestamp - @SuppressWarnings("serial") - Timestamp<Integer> timeStamp = new Timestamp<Integer>() { - - @Override - public long getTimestamp(Integer value) { - return value; - } - - }; - - // create policy - TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, - new TimestampWrapper<Integer>(timeStamp, 0)); - - // expected result - Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } }; - - // call policy - for (int i = 0; i < times.length; i++) { - arrayEqualityCheck(result[i], policy.preNotifyTrigger(times[i])); - policy.notifyTrigger(times[i]); - } - } - - private void arrayEqualityCheck(Object[] array1, Object[] array2) { - assertEquals("The result arrays must have the same length", array1.length, array2.length); - for (int i = 0; i < array1.length; i++) { - assertEquals("Unequal fields at position " + i, array1[i], array2[i]); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java deleted file mode 100644 index 8cb03a7..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.policy; - -import static org.junit.Assert.*; - -import org.junit.Test; - -public class TumblingEvictionPolicyTest { - - @Test - public void testTumblingEviction() { - EvictionPolicy<Integer> policy = new TumblingEvictionPolicy<Integer>(); - - int counter = 0; - - for (int i = 0; i < 10; i++) { - for (int j = 0; j < i; j++) { - assertEquals(0, policy.notifyEviction(0, false, counter++)); - } - assertEquals(counter, policy.notifyEviction(0, true, counter)); - counter = 1; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java deleted file mode 100644 index de6c200..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.partitioner; - -import static org.junit.Assert.assertArrayEquals; - -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.junit.Before; -import org.junit.Test; - -public class BroadcastPartitionerTest { - - private BroadcastPartitioner<Tuple> broadcastPartitioner1; - private BroadcastPartitioner<Tuple> broadcastPartitioner2; - private BroadcastPartitioner<Tuple> broadcastPartitioner3; - - private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(); - private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null); - - @Before - public void setPartitioner() { - broadcastPartitioner1 = new BroadcastPartitioner<Tuple>(); - broadcastPartitioner2 = new BroadcastPartitioner<Tuple>(); - broadcastPartitioner3 = new BroadcastPartitioner<Tuple>(); - - } - - @Test - public void testSelectChannels() { - int[] first = new int[] { 0 }; - int[] second = new int[] { 0, 1 }; - int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 }; - sd.setInstance(streamRecord); - assertArrayEquals(first, broadcastPartitioner1.selectChannels(sd, 1)); - assertArrayEquals(second, broadcastPartitioner2.selectChannels(sd, 2)); - assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(sd, 6)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java deleted file mode 100644 index 0675242..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.partitioner; - -import static org.junit.Assert.*; - -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.junit.Before; -import org.junit.Test; - -public class DistributePartitionerTest { - - private DistributePartitioner<Tuple> distributePartitioner; - private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(); - private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>( - null); - - @Before - public void setPartitioner() { - distributePartitioner = new DistributePartitioner<Tuple>(false); - } - - @Test - public void testSelectChannelsLength() { - sd.setInstance(streamRecord); - assertEquals(1, distributePartitioner.selectChannels(sd, 1).length); - assertEquals(1, distributePartitioner.selectChannels(sd, 2).length); - assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length); - } - - @Test - public void testSelectChannelsInterval() { - sd.setInstance(streamRecord); - assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]); - assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]); - assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]); - assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java deleted file mode 100644 index b56649b..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.partitioner; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.junit.Before; -import org.junit.Test; - -public class FieldsPartitionerTest { - - private FieldsPartitioner<Tuple> fieldsPartitioner; - private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>() - .setObject(new Tuple2<String, Integer>("test", 0)); - private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>() - .setObject(new Tuple2<String, Integer>("test", 42)); - private SerializationDelegate<StreamRecord<Tuple>> sd1 = new SerializationDelegate<StreamRecord<Tuple>>( - null); - private SerializationDelegate<StreamRecord<Tuple>> sd2 = new SerializationDelegate<StreamRecord<Tuple>>( - null); - - @Before - public void setPartitioner() { - fieldsPartitioner = new FieldsPartitioner<Tuple>(new KeySelector<Tuple, String>() { - - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple value) throws Exception { - return value.getField(0); - } - }); - } - - @Test - public void testSelectChannelsLength() { - sd1.setInstance(streamRecord1); - assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1).length); - assertEquals(1, fieldsPartitioner.selectChannels(sd1, 2).length); - assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1024).length); - } - - @Test - public void testSelectChannelsGrouping() { - sd1.setInstance(streamRecord1); - sd2.setInstance(streamRecord2); - - assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1), - fieldsPartitioner.selectChannels(sd2, 1)); - assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 2), - fieldsPartitioner.selectChannels(sd2, 2)); - assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1024), - fieldsPartitioner.selectChannels(sd2, 1024)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java deleted file mode 100644 index b381d85..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.partitioner; - -import static org.junit.Assert.*; - -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.junit.Before; -import org.junit.Test; - -public class ForwardPartitionerTest { - - private DistributePartitioner<Tuple> forwardPartitioner; - private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(); - private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>( - null); - - @Before - public void setPartitioner() { - forwardPartitioner = new DistributePartitioner<Tuple>(true); - } - - @Test - public void testSelectChannelsLength() { - sd.setInstance(streamRecord); - assertEquals(1, forwardPartitioner.selectChannels(sd, 1).length); - assertEquals(1, forwardPartitioner.selectChannels(sd, 2).length); - assertEquals(1, forwardPartitioner.selectChannels(sd, 1024).length); - } - - @Test - public void testSelectChannelsInterval() { - sd.setInstance(streamRecord); - assertEquals(0, forwardPartitioner.selectChannels(sd, 1)[0]); - assertEquals(1, forwardPartitioner.selectChannels(sd, 2)[0]); - assertEquals(2, forwardPartitioner.selectChannels(sd, 1024)[0]); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java deleted file mode 100644 index eebda64..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.partitioner; - -import static org.junit.Assert.assertArrayEquals; - -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.junit.Before; -import org.junit.Test; - -public class GlobalPartitionerTest { - - private GlobalPartitioner<Tuple> globalPartitioner; - private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(); - private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>( - null); - - @Before - public void setPartitioner() { - globalPartitioner = new GlobalPartitioner<Tuple>(); - } - - @Test - public void testSelectChannels() { - int[] result = new int[] { 0 }; - - sd.setInstance(streamRecord); - - assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1)); - assertArrayEquals(result, globalPartitioner.selectChannels(sd, 2)); - assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1024)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java deleted file mode 100644 index 3c03d07..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.partitioner; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.junit.Before; -import org.junit.Test; - -public class ShufflePartitionerTest { - - private ShufflePartitioner<Tuple> shufflePartitioner; - private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(); - private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>( - null); - - @Before - public void setPartitioner() { - shufflePartitioner = new ShufflePartitioner<Tuple>(); - } - - @Test - public void testSelectChannelsLength() { - sd.setInstance(streamRecord); - assertEquals(1, shufflePartitioner.selectChannels(sd, 1).length); - assertEquals(1, shufflePartitioner.selectChannels(sd, 2).length); - assertEquals(1, shufflePartitioner.selectChannels(sd, 1024).length); - } - - @Test - public void testSelectChannelsInterval() { - sd.setInstance(streamRecord); - assertEquals(0, shufflePartitioner.selectChannels(sd, 1)[0]); - - assertTrue(0 <= shufflePartitioner.selectChannels(sd, 2)[0]); - assertTrue(2 > shufflePartitioner.selectChannels(sd, 2)[0]); - - assertTrue(0 <= shufflePartitioner.selectChannels(sd, 1024)[0]); - assertTrue(1024 > shufflePartitioner.selectChannels(sd, 1024)[0]); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java deleted file mode 100644 index 194403c..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.state; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.flink.streaming.state.checkpoint.MapCheckpoint; -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; -import org.junit.Test; - -public class MapStateTest { - - @Test - public void testMapState() { - - Map<String, Integer> map = new HashMap<String, Integer>(); - map.put("a", 1); - map.put("b", 2); - map.put("c", 3); - map.remove("a"); - - MapState<String, Integer> mapState = new MapState<String, Integer>(); - mapState.put("a", 1); - mapState.put("b", 2); - mapState.put("c", 3); - - assertEquals(1, (int) mapState.remove("a")); - assertEquals(null, mapState.remove("a")); - assertEquals(2, mapState.size()); - assertEquals(map, mapState.state); - assertEquals(map.entrySet(), mapState.entrySet()); - assertTrue(mapState.containsKey("b")); - assertFalse(mapState.containsKey("a")); - - assertEquals(2, mapState.updatedItems.size()); - assertEquals(1, mapState.removedItems.size()); - - Map<String, Integer> map2 = new HashMap<String, Integer>(); - map2.put("a", 0); - map2.put("e", -1); - - mapState.putAll(map2); - - assertEquals(4, mapState.updatedItems.size()); - assertEquals(0, mapState.removedItems.size()); - - mapState.clear(); - assertEquals(new HashMap<String, Integer>(), mapState.state); - assertTrue(mapState.clear); - assertEquals(0, mapState.updatedItems.size()); - assertEquals(0, mapState.removedItems.size()); - - mapState.putAll(map); - assertEquals(map.keySet(), mapState.updatedItems); - - } - - @SuppressWarnings("unchecked") - @Test - public void testMapStateCheckpointing() { - - Map<String, Integer> map = new HashMap<String, Integer>(); - map.put("a", 1); - map.put("b", 2); - map.put("c", 3); - - MapState<String, Integer> mapState = new MapState<String, Integer>(); - mapState.putAll(map); - - StateCheckpoint<Map<String, Integer>> mcp = mapState.checkpoint(); - assertEquals(map, mcp.getCheckpointedState()); - - Map<String, Integer> map2 = new HashMap<String, Integer>(); - map2.put("a", 0); - map2.put("e", -1); - - mapState.put("a", 0); - mapState.put("e", -1); - mapState.remove("b"); - StateCheckpoint<Map<String, Integer>> mcp2 = new MapCheckpoint<String, Integer>(mapState); - assertEquals(map2, mcp2.getCheckpointedState()); - mcp.update(mcp2); - assertEquals(mapState.state, mcp.getCheckpointedState()); - - mapState.clear(); - mapState.put("a", 1); - mapState.put("a", 2); - mapState.put("b", -3); - mapState.put("c", 0); - mapState.remove("b"); - mcp.update(mapState.checkpoint()); - assertEquals(mapState.state, mcp.getCheckpointedState()); - - MapState<String, Integer> mapState2 = (MapState<String, Integer>) new MapState<String, Integer>() - .restore(mcp); - - assertTrue(mapState2.stateEquals(mapState)); - mapState2.reBuild(mapState2.repartition(10)); - assertTrue(mapState2.stateEquals(mapState)); - - MapState<Integer, Integer> mapState3 = new MapState<Integer, Integer>(); - mapState3.put(1, 1); - mapState3.put(2, 1); - - mapState3.reBuild(mapState3.repartition(2)[0]); - assertTrue(mapState3.containsKey(2)); - assertFalse(mapState3.containsKey(1)); - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java deleted file mode 100644 index 6cb8f51..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.state; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; -import org.junit.Test; - -public class OperatorStateTest { - - @Test - public void testOperatorState() { - OperatorState<Integer> os = new SimpleState<Integer>(5); - - StateCheckpoint<Integer> scp = os.checkpoint(); - - assertTrue(os.stateEquals(new SimpleState<Integer>().restore(scp))); - - assertEquals((Integer) 5, os.getState()); - - os.setState(10); - - assertEquals((Integer) 10, os.getState()); - - scp.update(os.checkpoint()); - - assertEquals((Integer)10,scp.getCheckpointedState()); - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java deleted file mode 100644 index ea94f98..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.StreamConfig; -import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; -import org.apache.flink.streaming.api.streamvertex.StreamTaskContext; -import org.apache.flink.streaming.io.CoReaderIterator; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; - -public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> { - // private Collection<IN1> input1; - // private Collection<IN2> input2; - private Iterator<IN1> inputIterator1; - private Iterator<IN2> inputIterator2; - private List<OUT> outputs; - - private Collector<OUT> collector; - private StreamRecordSerializer<IN1> inDeserializer1; - private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator; - private StreamRecordSerializer<IN2> inDeserializer2; - - public MockCoContext(Collection<IN1> input1, Collection<IN2> input2) { - - if (input1.isEmpty() || input2.isEmpty()) { - throw new RuntimeException("Inputs must not be empty"); - } - - this.inputIterator1 = input1.iterator(); - this.inputIterator2 = input2.iterator(); - - TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next()); - inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1); - TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next()); - inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2); - - mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2); - - outputs = new ArrayList<OUT>(); - collector = new MockCollector<OUT>(outputs); - } - - private int currentInput = 1; - private StreamRecord<IN1> reuse1; - private StreamRecord<IN2> reuse2; - - private class MockCoReaderIterator extends - CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> { - - public MockCoReaderIterator(TypeSerializer<StreamRecord<IN1>> serializer1, - TypeSerializer<StreamRecord<IN2>> serializer2) { - super(null, serializer1, serializer2); - reuse1 = inDeserializer1.createInstance(); - reuse2 = inDeserializer2.createInstance(); - } - - @Override - public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException { - this.delegate1.setInstance(target1); - this.delegate2.setInstance(target2); - - int inputNumber = nextRecord(); - target1.setObject(reuse1.getObject()); - target2.setObject(reuse2.getObject()); - - return inputNumber; - } - } - - private Integer nextRecord() { - if (inputIterator1.hasNext() && inputIterator2.hasNext()) { - switch (currentInput) { - case 1: - return next1(); - case 2: - return next2(); - default: - return 0; - } - } - - if (inputIterator1.hasNext()) { - return next1(); - } - - if (inputIterator2.hasNext()) { - return next2(); - } - - return 0; - } - - private int next1() { - reuse1 = inDeserializer1.createInstance(); - reuse1.setObject(inputIterator1.next()); - currentInput = 2; - return 1; - } - - private int next2() { - reuse2 = inDeserializer2.createInstance(); - reuse2.setObject(inputIterator2.next()); - currentInput = 1; - return 2; - } - - public List<OUT> getOutputs() { - return outputs; - } - - public Collector<OUT> getCollector() { - return collector; - } - - public StreamRecordSerializer<IN1> getInDeserializer1() { - return inDeserializer1; - } - - public StreamRecordSerializer<IN2> getInDeserializer2() { - return inDeserializer2; - } - - public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() { - return mockIterator; - } - - public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable, - List<IN1> input1, List<IN2> input2) { - MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2); - invokable.setup(mockContext); - - try { - invokable.open(null); - invokable.invoke(); - invokable.close(); - } catch (Exception e) { - throw new RuntimeException("Cannot invoke invokable.", e); - } - - return mockContext.getOutputs(); - } - - @Override - public StreamConfig getConfig() { - return null; - } - - @Override - public ClassLoader getUserCodeClassLoader() { - return null; - } - - @SuppressWarnings("unchecked") - @Override - public <X> MutableObjectIterator<X> getInput(int index) { - switch (index) { - case 0: - return (MutableObjectIterator<X>) inputIterator1; - case 1: - return (MutableObjectIterator<X>) inputIterator2; - default: - throw new IllegalArgumentException("CoStreamVertex has only 2 inputs"); - } - } - - @SuppressWarnings("unchecked") - @Override - public <X> StreamRecordSerializer<X> getInputSerializer(int index) { - switch (index) { - case 0: - return (StreamRecordSerializer<X>) inDeserializer1; - case 1: - return (StreamRecordSerializer<X>) inDeserializer2; - default: - throw new IllegalArgumentException("CoStreamVertex has only 2 inputs"); - } - } - - @SuppressWarnings("unchecked") - @Override - public <X, Y> CoReaderIterator<X, Y> getCoReader() { - return (CoReaderIterator<X, Y>) mockIterator; - } - - @Override - public Collector<OUT> getOutputCollector() { - return collector; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java deleted file mode 100644 index e8b96c5..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import java.io.Serializable; -import java.util.Collection; - -import org.apache.commons.lang3.SerializationUtils; -import org.apache.flink.util.Collector; - -public class MockCollector<T> implements Collector<T> { - private Collection<T> outputs; - - public MockCollector(Collection<T> outputs) { - this.outputs = outputs; - } - - @Override - public void collect(T record) { - T copied = SerializationUtils.deserialize(SerializationUtils - .serialize((Serializable) record)); - outputs.add(copied); - } - - @Override - public void close() { - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java deleted file mode 100644 index 5537052..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.StreamConfig; -import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; -import org.apache.flink.streaming.api.streamvertex.StreamTaskContext; -import org.apache.flink.streaming.io.CoReaderIterator; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; - -public class MockContext<IN, OUT> implements StreamTaskContext<OUT> { - private Collection<IN> inputs; - private List<OUT> outputs; - - private Collector<OUT> collector; - private StreamRecordSerializer<IN> inDeserializer; - private MutableObjectIterator<StreamRecord<IN>> iterator; - - public MockContext(Collection<IN> inputs) { - this.inputs = inputs; - if (inputs.isEmpty()) { - throw new RuntimeException("Inputs must not be empty"); - } - - TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next()); - inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo); - - iterator = new MockInputIterator(); - outputs = new ArrayList<OUT>(); - collector = new MockCollector<OUT>(outputs); - } - - private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> { - Iterator<IN> listIterator; - - public MockInputIterator() { - listIterator = inputs.iterator(); - } - - @Override - public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException { - if (listIterator.hasNext()) { - reuse.setObject(listIterator.next()); - } else { - reuse = null; - } - return reuse; - } - - @Override - public StreamRecord<IN> next() throws IOException { - if (listIterator.hasNext()) { - StreamRecord<IN> result = new StreamRecord<IN>(); - result.setObject(listIterator.next()); - return result; - } else { - return null; - } - } - } - - public List<OUT> getOutputs() { - return outputs; - } - - public Collector<OUT> getCollector() { - return collector; - } - - public StreamRecordSerializer<IN> getInDeserializer() { - return inDeserializer; - } - - public MutableObjectIterator<StreamRecord<IN>> getIterator() { - return iterator; - } - - public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable, - List<IN> inputs) { - MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs); - invokable.setup(mockContext); - try { - invokable.open(null); - invokable.invoke(); - invokable.close(); - } catch (Exception e) { - throw new RuntimeException("Cannot invoke invokable.", e); - } - - return mockContext.getOutputs(); - } - - @Override - public StreamConfig getConfig() { - return null; - } - - @Override - public ClassLoader getUserCodeClassLoader() { - return null; - } - - @SuppressWarnings("unchecked") - @Override - public <X> MutableObjectIterator<X> getInput(int index) { - if (index == 0) { - return (MutableObjectIterator<X>) iterator; - } else { - throw new IllegalArgumentException("There is only 1 input"); - } - } - - @SuppressWarnings("unchecked") - @Override - public <X> StreamRecordSerializer<X> getInputSerializer(int index) { - if (index == 0) { - return (StreamRecordSerializer<X>) inDeserializer; - } else { - throw new IllegalArgumentException("There is only 1 input"); - } - } - - @Override - public Collector<OUT> getOutputCollector() { - return collector; - } - - @Override - public <X, Y> CoReaderIterator<X, Y> getCoReader() { - throw new IllegalArgumentException("CoReader not available"); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java deleted file mode 100644 index 88673a3..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.mock; - -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamvertex.MockRecordWriter; -import org.mockito.Mockito; - -public class MockRecordWriterFactory { - - @SuppressWarnings("unchecked") - public static MockRecordWriter create() { - MockRecordWriter recWriter = mock(MockRecordWriter.class); - - Mockito.when(recWriter.initList()).thenCallRealMethod(); - doCallRealMethod().when(recWriter).emit(Mockito.any(SerializationDelegate.class)); - - recWriter.initList(); - - return recWriter; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java deleted file mode 100644 index bb92e8e..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.streaming.api.function.source.SourceFunction; - -public class MockSource<T> { - - public static <T> List<T> createAndExecute(SourceFunction<T> source) { - List<T> outputs = new ArrayList<T>(); - try { - source.invoke(new MockCollector<T>(outputs)); - } catch (Exception e) { - throw new RuntimeException("Cannot invoke source.", e); - } - return outputs; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java deleted file mode 100644 index 95a5d9b..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import akka.actor.ActorRef; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.client.JobClient; -import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; - -public class TestStreamEnvironment extends StreamExecutionEnvironment { - private static final String DEFAULT_JOBNAME = "TestStreamingJob"; - private static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job"; - - private long memorySize; - - public TestStreamEnvironment(int degreeOfParallelism, long memorySize){ - this.setDegreeOfParallelism(degreeOfParallelism); - - this.memorySize = memorySize; - } - - @Override - public void execute() throws Exception { - execute(DEFAULT_JOBNAME); - } - - @Override - public void execute(String jobName) throws Exception { - JobGraph jobGraph = streamGraph.getJobGraph(jobName); - - Configuration configuration = jobGraph.getJobConfiguration(); - - configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, - getDegreeOfParallelism()); - configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize); - - ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration); - - try{ - ActorRef client = cluster.getJobClient(); - JobClient.submitJobAndWait(jobGraph, false, client, cluster.timeout()); - }catch(JobExecutionException e){ - if(e.getMessage().contains("GraphConversionException")){ - throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e); - }else{ - throw e; - } - }finally{ - cluster.stop(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties b/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties deleted file mode 100644 index 2fb9345..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,19 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=OFF \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml b/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml deleted file mode 100644 index 4f56748..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml +++ /dev/null @@ -1,30 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> - <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/> -</configuration> \ No newline at end of file