http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java
deleted file mode 100644
index 0379fe0..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.windowing.extractor.FieldsFromTuple;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldsFromTupleTest {
-
-       private double[] testDouble;
-
-       @Before
-       public void init() {
-               testDouble = new double[Tuple.MAX_ARITY];
-               for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-                       testDouble[i] = i;
-               }
-       }
-
-       @Test
-       public void testUserSpecifiedOrder() throws InstantiationException, 
IllegalAccessException {
-               Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 
1].newInstance();
-               for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-                       currentTuple.setField(testDouble[i], i);
-               }
-
-               double[] expected = { testDouble[5], testDouble[3], 
testDouble[6], testDouble[7],
-                               testDouble[0] };
-               arrayEqualityCheck(expected, new FieldsFromTuple(5, 3, 6, 7, 
0).extract(currentTuple));
-
-               double[] expected2 = { testDouble[0], 
testDouble[Tuple.MAX_ARITY - 1] };
-               arrayEqualityCheck(expected2,
-                               new FieldsFromTuple(0, Tuple.MAX_ARITY - 
1).extract(currentTuple));
-
-               double[] expected3 = { testDouble[Tuple.MAX_ARITY - 1], 
testDouble[0] };
-               arrayEqualityCheck(expected3,
-                               new FieldsFromTuple(Tuple.MAX_ARITY - 1, 
0).extract(currentTuple));
-
-               double[] expected4 = { testDouble[13], testDouble[4], 
testDouble[5], testDouble[4],
-                               testDouble[2], testDouble[8], testDouble[6], 
testDouble[2], testDouble[8],
-                               testDouble[3], testDouble[5], testDouble[2], 
testDouble[16], testDouble[4],
-                               testDouble[3], testDouble[2], testDouble[6], 
testDouble[4], testDouble[7],
-                               testDouble[4], testDouble[2], testDouble[8], 
testDouble[7], testDouble[2] };
-               arrayEqualityCheck(expected4, new FieldsFromTuple(13, 4, 5, 4, 
2, 8, 6, 2, 8, 3, 5, 2, 16,
-                               4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 
2).extract(currentTuple));
-       }
-
-       private void arrayEqualityCheck(double[] array1, double[] array2) {
-               assertEquals("The result arrays must have the same length", 
array1.length, array2.length);
-               for (int i = 0; i < array1.length; i++) {
-                       assertEquals("Unequal fields at position " + i, 
array1[i], array2[i], 0d);
-               }
-       }
-
-       private static final Class<?>[] CLASSES = new Class<?>[] { 
Tuple1.class, Tuple2.class,
-                       Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, 
Tuple7.class, Tuple8.class,
-                       Tuple9.class, Tuple10.class, Tuple11.class, 
Tuple12.class, Tuple13.class,
-                       Tuple14.class, Tuple15.class, Tuple16.class, 
Tuple17.class, Tuple18.class,
-                       Tuple19.class, Tuple20.class, Tuple21.class, 
Tuple22.class, Tuple23.class,
-                       Tuple24.class, Tuple25.class };
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
deleted file mode 100644
index 4773f4e..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.windowing.policy;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.List;

import org.junit.Test;

/**
 * Tests for {@link CountEvictionPolicy}: the policy must evict nothing for the
 * first {@code count} notifications and then evict periodically, independent of
 * the {@code triggered} flag and the current buffer size.
 */
public class CountEvictionPolicyTest {

	@SuppressWarnings({ "unchecked", "rawtypes" })
	@Test
	public void testCountEvictionPolicy() {
		List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
		int counter;

		// The count policy should not care about the triggered parameter.
		// Therefore its value switches after each use in this test.
		boolean triggered = false;
		// The size of the buffer should not matter as well!

		// Test counts of different sizes (0..9).
		for (int i = 0; i < 10; i++) {
			EvictionPolicy evictionPolicy = new CountEvictionPolicy(i, i);
			counter = 0;

			// The first i notifications should not evict.
			for (int j = 0; j < i; j++) {
				counter++;
				assertEquals("Evictionpolicy with count of " + i + " evicted tuples at add nr. "
						+ counter + ". It should not evict for the first " + i + " adds.", 0,
						evictionPolicy.notifyEviction(tuples.get(j), (triggered = !triggered),
								tuples.get(Math.abs((i - j)) % 10)));
			}

			// Test the next three eviction cycles.
			for (int j = 0; j < 3; j++) {
				// The first add of each cycle should evict now.
				counter++;
				assertEquals("Evictionpolicy with count of " + i
						+ " did not evict correct number of tuples at the expected pos " + counter
						+ ".", i, evictionPolicy.notifyEviction(tuples.get(j),
						(triggered = !triggered), tuples.get(Math.abs((i - j)) % 10)));

				// The next i-1 adds should not evict.
				for (int k = 0; k < i - 1; k++) {
					counter++;
					assertEquals("Evictionpolicy with count of " + i
							+ " evicted tuples at add nr. " + counter, 0,
							evictionPolicy.notifyEviction(tuples.get(j), (triggered = !triggered),
									tuples.get(Math.abs((i - j)) % 10)));
				}
			}
		}
	}

	/**
	 * Verifies eviction with explicit start values and eviction amounts: with
	 * start value j and count i, the first max(i - j, 0) notifications must not
	 * evict, after which exactly {@code deleteOnEviction} (x) tuples are evicted.
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	public void testCountEvictionPolicyStartValuesAndEvictionAmount() {

		// The count policy should not care about the triggered parameter.
		// Therefore its value switches after each use in this test.
		boolean triggered = false;
		// The size of the buffer should not matter as well!

		List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

		// Test different eviction amounts (0..3).
		for (int x = 0; x < 4; x++) {

			// Test counts of different sizes (0..9).
			for (int i = 0; i < 10; i++) {

				int counter = 0;

				// Test different start values (-5..5).
				// BUGFIX: the loop previously read "j = -5; i < 6; i++", which
				// never advanced j and mutated the outer loop variable i.
				for (int j = -5; j < 6; j++) {
					EvictionPolicy evictionPolicy = new CountEvictionPolicy(i, x, j);
					// Add tuples without eviction.
					for (int k = 0; k < ((i - j > 0) ? i - j : 0); k++) {
						counter++;
						assertEquals("Evictionpolicy with count of " + i
								+ " did not evict correct number of tuples at the expected pos "
								+ counter + ".", 0, evictionPolicy.notifyEviction(
								tuples.get(Math.abs(j)), (triggered = !triggered),
								tuples.get(Math.abs((i - j)) % 10)));
					}
					// Expect eviction of exactly x tuples.
					counter++;
					assertEquals("Evictionpolicy with count of " + i
							+ " did not evict correct number of tuples at the expected pos "
							+ counter + ".", x, evictionPolicy.notifyEviction(
							tuples.get(Math.abs(j)), (triggered = !triggered),
							tuples.get(Math.abs((i - j)) % 10)));
				}
			}
		}
	}

}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
deleted file mode 100644
index b7120e7..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.windowing.policy;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.windowing.helper.Count;
import org.junit.Test;

/**
 * Tests for {@link CountTriggerPolicy}: the policy must not trigger for the
 * first {@code count} notifications and then trigger once every {@code count}
 * notifications, also when a non-zero start value shifts the first trigger.
 */
public class CountTriggerPolicyTest {

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	public void testCountTriggerPolicy() {

		List tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
		int counter;

		// Test counts of different sizes (0..9).
		for (int i = 0; i < 10; i++) {
			TriggerPolicy triggerPolicy = Count.of(i).toTrigger();
			counter = 0;

			// The first i notifications should not trigger.
			for (int j = 0; j < i; j++) {
				counter++;
				assertFalse("Triggerpolicy with count of " + i + " triggered at add nr. " + counter
						+ ". It should not trigger for the first " + i + " adds.",
						triggerPolicy.notifyTrigger(tuples.get(j)));
			}

			// Test the next three trigger cycles.
			for (int j = 0; j < 3; j++) {
				// The first add of each cycle should trigger now.
				counter++;
				assertTrue("Triggerpolicy with count of " + i
						+ " did not trigger at the expected pos " + counter + ".",
						triggerPolicy.notifyTrigger(tuples.get(j)));

				// The next i-1 adds should not trigger.
				for (int k = 0; k < i - 1; k++) {
					counter++;
					assertFalse("Triggerpolicy with count of " + i + " triggered at add nr. "
							+ counter, triggerPolicy.notifyTrigger(tuples.get(k)));
				}
			}
		}
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	public void testCountTriggerPolicyStartValues() {

		List tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

		// Test counts of different sizes (0..9).
		for (int i = 0; i < 10; i++) {

			// Test different start values (-5..5).
			// BUGFIX: the loop previously read "j = -5; i < 6; i++", which
			// never advanced j and mutated the outer loop variable i.
			for (int j = -5; j < 6; j++) {
				TriggerPolicy triggerPolicy = new CountTriggerPolicy(i, j);
				// Add tuples without trigger: with start value j, the first
				// max(i - j, 0) notifications must stay below the count.
				for (int k = 0; k < ((i - j > 0) ? i - j : 0); k++) {
					assertFalse("Triggerpolicy with count of " + i + " and start value of " + j
							+ " triggered at add nr. " + (k + 1),
							triggerPolicy.notifyTrigger(tuples.get(k % 10)));
				}
				// Expect trigger. (Message fix: added missing space before "and".)
				assertTrue("Triggerpolicy with count of " + i + " and start value of " + j
						+ " did not trigger at the expected position.",
						triggerPolicy.notifyTrigger(tuples.get(0)));
			}
		}
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
deleted file mode 100644
index 424eb57..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.windowing.policy;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
import org.junit.Test;

import java.util.List;
import java.util.Arrays;

import static org.junit.Assert.*;

/**
 * Test for {@link DeltaPolicy} used as both trigger and eviction policy: with a
 * delta function over the first tuple field and threshold 2, the policy should
 * trigger (and evict) once the delta to the last trigger point exceeds 2.
 */
public class DeltaPolicyTest {

	@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
	@Test
	public void testDelta() {
		// Delta is the difference of the first (f0) fields; init point is (0, 0),
		// trigger threshold is 2.
		DeltaPolicy deltaPolicy = new DeltaPolicy(new DeltaFunction<Tuple2<Integer, Integer>>() {
			@Override
			public double getDelta(Tuple2<Integer, Integer> oldDataPoint, Tuple2<Integer, Integer> newDataPoint) {
				return (double) newDataPoint.f0 - oldDataPoint.f0;
			}
		}, new Tuple2(0, 0), 2);

		// f0 values 1, 2, 3, 6 relative to the init point (delta 1, 2, 3, 6).
		List<Tuple2> tuples = Arrays.asList(
				new Tuple2(1, 0),
				new Tuple2(2, 0),
				new Tuple2(3, 0),
				new Tuple2(6, 0));

		// Delta 1 <= threshold: no trigger, no eviction.
		assertFalse(deltaPolicy.notifyTrigger(tuples.get(0)));
		assertEquals(0, deltaPolicy.notifyEviction(tuples.get(0), false, 0));

		// Delta 2 <= threshold: still no trigger, no eviction.
		assertFalse(deltaPolicy.notifyTrigger(tuples.get(1)));
		assertEquals(0, deltaPolicy.notifyEviction(tuples.get(1), false, 1));

		// Delta 3 > threshold: trigger; eviction removes one buffered tuple.
		assertTrue(deltaPolicy.notifyTrigger(tuples.get(2)));
		assertEquals(1, deltaPolicy.notifyEviction(tuples.get(2), true, 2));

		// Delta 6 > threshold again: trigger; two tuples evicted this time.
		assertTrue(deltaPolicy.notifyTrigger(tuples.get(3)));
		assertEquals(2, deltaPolicy.notifyEviction(tuples.get(3), true, 2));
	}


}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
deleted file mode 100644
index 8224b12..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.extractor.FieldFromTuple;
-import org.junit.Test;
-
-public class PunctuationPolicyTest {
-
-       // This value should not effect the policy. It is changed at each call 
to
-       // verify this.
-       private boolean triggered = false;
-
-       @Test
-       public void PunctuationTriggerTestWithoutExtraction() {
-               PunctuationPolicy<Object, Object> policy = new 
PunctuationPolicy<Object, Object>(
-                               new TestObject(0));
-               assertTrue("The present punctuation was not detected. (POS 1)",
-                               policy.notifyTrigger(new TestObject(0)));
-               assertFalse("There was a punctuation detected which wasn't 
present. (POS 2)",
-                               policy.notifyTrigger(new TestObject(1)));
-       }
-
-       @Test
-       public void PunctuationTriggerTestWithExtraction() {
-               @SuppressWarnings({ "unchecked", "rawtypes" })
-               PunctuationPolicy<Tuple2<Object, Object>, Object> policy = new 
PunctuationPolicy<Tuple2<Object, Object>, Object>(
-                               new TestObject(0), new FieldFromTuple(0));
-               assertTrue("The present punctuation was not detected. (POS 3)",
-                               policy.notifyTrigger(new Tuple2<Object, 
Object>(new TestObject(0),
-                                               new TestObject(1))));
-               assertFalse("There was a punctuation detected which wasn't 
present. (POS 4)",
-                               policy.notifyTrigger(new Tuple2<Object, 
Object>(new TestObject(1),
-                                               new TestObject(0))));
-       }
-
-       @Test
-       public void PunctuationEvictionTestWithoutExtraction() {
-               // The current buffer size should not effect the test. It's 
therefore
-               // always 0 here.
-
-               PunctuationPolicy<Object, Object> policy = new 
PunctuationPolicy<Object, Object>(
-                               new TestObject(0));
-               assertEquals(
-                               "The present punctuation was not detected or 
the number of deleted tuples was wrong. (POS 5)",
-                               0, policy.notifyEviction(new TestObject(0), 
(triggered = !triggered), 0));
-               for (int i = 0; i < 10; i++) {
-                       for (int j = 0; j < i; j++) {
-                               assertEquals("There was a punctuation detected 
which wasn't present. (POS 6)", 0,
-                                               policy.notifyEviction(new 
TestObject(1), (triggered = !triggered), 0));
-                       }
-                       assertEquals(
-                                       "The present punctuation was not 
detected or the number of deleted tuples was wrong. (POS 7)",
-                                       i + 1, policy.notifyEviction(new 
TestObject(0), (triggered = !triggered), 0));
-               }
-       }
-
-       @Test
-       public void PunctuationEvictionTestWithExtraction() {
-               // The current buffer size should not effect the test. It's 
therefore
-               // always 0 here.
-
-               @SuppressWarnings({ "unchecked", "rawtypes" })
-               PunctuationPolicy<Tuple2<Object, Object>, Object> policy = new 
PunctuationPolicy<Tuple2<Object, Object>, Object>(
-                               new TestObject(0), new FieldFromTuple(0));
-               assertEquals(
-                               "The present punctuation was not detected or 
the number of deleted tuples was wrong. (POS 10)",
-                               0, policy.notifyEviction(new Tuple2<Object, 
Object>(new TestObject(0),
-                                               new TestObject(1)), (triggered 
= !triggered), 0));
-               for (int i = 0; i < 10; i++) {
-                       for (int j = 0; j < i; j++) {
-                               assertEquals("There was a punctuation detected 
which wasn't present. (POS 9)", 0,
-                                               policy.notifyEviction(new 
Tuple2<Object, Object>(new TestObject(1),
-                                                               new 
TestObject(0)), (triggered = !triggered), 0));
-                       }
-                       assertEquals(
-                                       "The present punctuation was not 
detected or the number of deleted tuples was wrong. (POS 10)",
-                                       i + 1, policy.notifyEviction(new 
Tuple2<Object, Object>(new TestObject(0),
-                                                       new TestObject(1)), 
(triggered = !triggered), 0));
-               }
-       }
-
-       private class TestObject {
-
-               private int id;
-
-               public TestObject(int id) {
-                       this.id = id;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (o instanceof TestObject && ((TestObject) o).getId() 
== this.id) {
-                               return true;
-                       } else {
-                               return false;
-                       }
-               }
-
-               public int getId() {
-                       return id;
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
deleted file mode 100644
index b5d502b..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class TimeEvictionPolicyTest {
-
-       @Test
-       public void timeEvictionTest() {
-               // create some test data
-               Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30, 31, 
33, 36, 40, 41, 42, 43, 44,
-                               45, 47, 55 };
-               Integer[] numToDelete = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 
0, 0, 0, 0, 3 };
-
-               // create a timestamp
-               @SuppressWarnings("serial")
-               Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-                       @Override
-                       public long getTimestamp(Integer value) {
-                               return value;
-                       }
-
-               };
-
-               // test different granularity
-               for (long granularity = 0; granularity < 40; granularity++) {
-                       // create policy
-                       TimeEvictionPolicy<Integer> policy = new 
TimeEvictionPolicy<Integer>(granularity,
-                                       new 
TimestampWrapper<Integer>(timeStamp, 0));
-
-                       // The trigger status should not effect the policy. 
Therefore, it's
-                       // value is changed after each usage.
-                       boolean triggered = false;
-
-                       // The eviction should work similar with both, fake and 
real
-                       // elements. Which kind is used is changed on every 3rd 
element in
-                       // this test.
-                       int fakeAndRealCounter = 0;
-                       boolean fake = false;
-
-                       // test by adding values
-                       LinkedList<Integer> buffer = new LinkedList<Integer>();
-                       for (int i = 0; i < times.length; i++) {
-
-                               // check if the current element should be a fake
-                               fakeAndRealCounter++;
-                               if (fakeAndRealCounter > 2) {
-                                       fake = !fake;
-                                       fakeAndRealCounter = 0;
-                               }
-
-                               int result;
-
-                               if (fake) {
-                                       // Notify eviction with fake element
-                                       result = 
policy.notifyEvictionWithFakeElement(times[i], buffer.size());
-                               } else {
-                                       // Notify eviction with real element
-                                       result = 
policy.notifyEviction(times[i], (triggered = !triggered),
-                                                       buffer.size());
-                               }
-
-                               // handle correctness of eviction
-                               for (; result > 0 && !buffer.isEmpty(); 
result--) {
-                                       if (buffer.getFirst() <= times[i] - 
granularity) {
-                                               buffer.removeFirst();
-                                       } else {
-                                               fail("The policy wanted to 
evict time " + buffer.getFirst()
-                                                               + " while the 
current time was " + times[i]
-                                                               + "and the 
granularity was " + granularity);
-                                       }
-                               }
-
-                               // test that all required evictions have been 
done
-                               if (!buffer.isEmpty()) {
-                                       assertTrue("The policy did not evict " 
+ buffer.getFirst()
-                                                       + " while the current 
time was " + times[i]
-                                                       + " and the granularity 
was " + granularity,
-                                                       (buffer.getFirst() >= 
times[i] - granularity));
-                               }
-
-                               // test influence of other evictions
-                               for (int j = numToDelete[i % 
numToDelete.length]; j > 0; j--) {
-                                       if (!buffer.isEmpty()) {
-                                               buffer.removeFirst();
-                                       }
-                               }
-
-                               // add current element to buffer if it is no 
fake
-                               if (!fake) {
-                                       buffer.add(times[i]);
-                               }
-
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
deleted file mode 100644
index 2bdbd96..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.junit.Test;
-
-public class TimeTriggerPolicyTest {
-
-       @Test
-       public void timeTriggerRegularNotifyTest() {
-               // create some test data
-               Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30 };
-
-               // create a timestamp
-               @SuppressWarnings("serial")
-               Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-                       @Override
-                       public long getTimestamp(Integer value) {
-                               return value;
-                       }
-
-               };
-
-               // test different granularity
-               for (long granularity = 0; granularity < 31; granularity++) {
-                       // create policy
-
-                       TriggerPolicy<Integer> policy = new 
TimeTriggerPolicy<Integer>(granularity,
-                                       new 
TimestampWrapper<Integer>(timeStamp, 0));
-
-                       // remember window border
-                       long currentTime = 0;
-
-                       // test by adding values
-                       for (int i = 0; i < times.length; i++) {
-                               boolean result = policy.notifyTrigger(times[i]);
-                               // start time is included, but end time is 
excluded: >=
-                               if (times[i] >= currentTime + granularity) {
-                                       if (granularity != 0) {
-                                               currentTime = times[i] - 
((times[i] - currentTime) % granularity);
-                                       }
-                                       assertTrue("The policy did not trigger 
at pos " + i + " (current time border: "
-                                                       + currentTime + "; 
current granularity: " + granularity
-                                                       + "; data point time: " 
+ times[i] + ")", result);
-                               } else {
-                                       assertFalse("The policy triggered wrong 
at pos " + i
-                                                       + " (current time 
border: " + currentTime + "; current granularity: "
-                                                       + granularity + "; data 
point time: " + times[i] + ")", result);
-                               }
-                       }
-               }
-
-       }
-
-       @Test
-       public void timeTriggerPreNotifyTest() {
-               // create some test data
-               Integer[] times = { 1, 3, 20, 26 };
-
-               // create a timestamp
-               @SuppressWarnings("serial")
-               Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-                       @Override
-                       public long getTimestamp(Integer value) {
-                               return value;
-                       }
-
-               };
-
-               // create policy
-               TimeTriggerPolicy<Integer> policy = new 
TimeTriggerPolicy<Integer>(5,
-                               new TimestampWrapper<Integer>(timeStamp, 0));
-
-               // expected result
-               Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } };
-
-               // call policy
-               for (int i = 0; i < times.length; i++) {
-                       arrayEqualityCheck(result[i], 
policy.preNotifyTrigger(times[i]));
-                       policy.notifyTrigger(times[i]);
-               }
-       }
-
-       private void arrayEqualityCheck(Object[] array1, Object[] array2) {
-               assertEquals("The result arrays must have the same length", 
array1.length, array2.length);
-               for (int i = 0; i < array1.length; i++) {
-                       assertEquals("Unequal fields at position " + i, 
array1[i], array2[i]);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
deleted file mode 100644
index 8cb03a7..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-
-public class TumblingEvictionPolicyTest {
-
-       @Test
-       public void testTumblingEviction() {
-               EvictionPolicy<Integer> policy = new 
TumblingEvictionPolicy<Integer>();
-
-               int counter = 0;
-
-               for (int i = 0; i < 10; i++) {
-                       for (int j = 0; j < i; j++) {
-                               assertEquals(0, policy.notifyEviction(0, false, 
counter++));
-                       }
-                       assertEquals(counter, policy.notifyEviction(0, true, 
counter));
-                       counter = 1;
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
deleted file mode 100644
index de6c200..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BroadcastPartitionerTest {
-
-       private BroadcastPartitioner<Tuple> broadcastPartitioner1;
-       private BroadcastPartitioner<Tuple> broadcastPartitioner2;
-       private BroadcastPartitioner<Tuple> broadcastPartitioner3;
-       
-       private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
-       private SerializationDelegate<StreamRecord<Tuple>> sd = new 
SerializationDelegate<StreamRecord<Tuple>>(null);
-
-       @Before
-       public void setPartitioner() {
-               broadcastPartitioner1 = new BroadcastPartitioner<Tuple>();
-               broadcastPartitioner2 = new BroadcastPartitioner<Tuple>();
-               broadcastPartitioner3 = new BroadcastPartitioner<Tuple>();
-
-       }
-
-       @Test
-       public void testSelectChannels() {
-               int[] first = new int[] { 0 };
-               int[] second = new int[] { 0, 1 };
-               int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
-               sd.setInstance(streamRecord);
-               assertArrayEquals(first, 
broadcastPartitioner1.selectChannels(sd, 1));
-               assertArrayEquals(second, 
broadcastPartitioner2.selectChannels(sd, 2));
-               assertArrayEquals(sixth, 
broadcastPartitioner3.selectChannels(sd, 6));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
deleted file mode 100644
index 0675242..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class DistributePartitionerTest {
-       
-       private DistributePartitioner<Tuple> distributePartitioner;
-       private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
-       private SerializationDelegate<StreamRecord<Tuple>> sd = new 
SerializationDelegate<StreamRecord<Tuple>>(
-                       null);
-       
-       @Before
-       public void setPartitioner() {
-               distributePartitioner = new DistributePartitioner<Tuple>(false);
-       }
-       
-       @Test
-       public void testSelectChannelsLength() {
-               sd.setInstance(streamRecord);
-               assertEquals(1, distributePartitioner.selectChannels(sd, 
1).length);
-               assertEquals(1, distributePartitioner.selectChannels(sd, 
2).length);
-               assertEquals(1, distributePartitioner.selectChannels(sd, 
1024).length);
-       }
-       
-       @Test
-       public void testSelectChannelsInterval() {
-               sd.setInstance(streamRecord);
-               assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
-               assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]);
-               assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]);
-               assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
deleted file mode 100644
index b56649b..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldsPartitionerTest {
-
-       private FieldsPartitioner<Tuple> fieldsPartitioner;
-       private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>()
-                       .setObject(new Tuple2<String, Integer>("test", 0));
-       private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>()
-                       .setObject(new Tuple2<String, Integer>("test", 42));
-       private SerializationDelegate<StreamRecord<Tuple>> sd1 = new 
SerializationDelegate<StreamRecord<Tuple>>(
-                       null);
-       private SerializationDelegate<StreamRecord<Tuple>> sd2 = new 
SerializationDelegate<StreamRecord<Tuple>>(
-                       null);
-
-       @Before
-       public void setPartitioner() {
-               fieldsPartitioner = new FieldsPartitioner<Tuple>(new 
KeySelector<Tuple, String>() {
-
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public String getKey(Tuple value) throws Exception {
-                               return value.getField(0);
-                       }
-               });
-       }
-
-       @Test
-       public void testSelectChannelsLength() {
-               sd1.setInstance(streamRecord1);
-               assertEquals(1, fieldsPartitioner.selectChannels(sd1, 
1).length);
-               assertEquals(1, fieldsPartitioner.selectChannels(sd1, 
2).length);
-               assertEquals(1, fieldsPartitioner.selectChannels(sd1, 
1024).length);
-       }
-
-       @Test
-       public void testSelectChannelsGrouping() {
-               sd1.setInstance(streamRecord1);
-               sd2.setInstance(streamRecord2);
-
-               assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1),
-                               fieldsPartitioner.selectChannels(sd2, 1));
-               assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 2),
-                               fieldsPartitioner.selectChannels(sd2, 2));
-               assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1024),
-                               fieldsPartitioner.selectChannels(sd2, 1024));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
deleted file mode 100644
index b381d85..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ForwardPartitionerTest {
-
-       private DistributePartitioner<Tuple> forwardPartitioner;
-       private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
-       private SerializationDelegate<StreamRecord<Tuple>> sd = new 
SerializationDelegate<StreamRecord<Tuple>>(
-                       null);
-
-       @Before
-       public void setPartitioner() {
-               forwardPartitioner = new DistributePartitioner<Tuple>(true);
-       }
-
-       @Test
-       public void testSelectChannelsLength() {
-               sd.setInstance(streamRecord);
-               assertEquals(1, forwardPartitioner.selectChannels(sd, 
1).length);
-               assertEquals(1, forwardPartitioner.selectChannels(sd, 
2).length);
-               assertEquals(1, forwardPartitioner.selectChannels(sd, 
1024).length);
-       }
-
-       @Test
-       public void testSelectChannelsInterval() {
-               sd.setInstance(streamRecord);
-               assertEquals(0, forwardPartitioner.selectChannels(sd, 1)[0]);
-               assertEquals(1, forwardPartitioner.selectChannels(sd, 2)[0]);
-               assertEquals(2, forwardPartitioner.selectChannels(sd, 1024)[0]);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
deleted file mode 100644
index eebda64..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class GlobalPartitionerTest {
-
-       private GlobalPartitioner<Tuple> globalPartitioner;
-       private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
-       private SerializationDelegate<StreamRecord<Tuple>> sd = new 
SerializationDelegate<StreamRecord<Tuple>>(
-                       null);
-
-       @Before
-       public void setPartitioner() {
-               globalPartitioner = new GlobalPartitioner<Tuple>();
-       }
-
-       @Test
-       public void testSelectChannels() {
-               int[] result = new int[] { 0 };
-
-               sd.setInstance(streamRecord);
-
-               assertArrayEquals(result, globalPartitioner.selectChannels(sd, 
1));
-               assertArrayEquals(result, globalPartitioner.selectChannels(sd, 
2));
-               assertArrayEquals(result, globalPartitioner.selectChannels(sd, 
1024));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
deleted file mode 100644
index 3c03d07..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ShufflePartitionerTest {
-
-       private ShufflePartitioner<Tuple> shufflePartitioner;
-       private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
-       private SerializationDelegate<StreamRecord<Tuple>> sd = new 
SerializationDelegate<StreamRecord<Tuple>>(
-                       null);
-
-       @Before
-       public void setPartitioner() {
-               shufflePartitioner = new ShufflePartitioner<Tuple>();
-       }
-
-       @Test
-       public void testSelectChannelsLength() {
-               sd.setInstance(streamRecord);
-               assertEquals(1, shufflePartitioner.selectChannels(sd, 
1).length);
-               assertEquals(1, shufflePartitioner.selectChannels(sd, 
2).length);
-               assertEquals(1, shufflePartitioner.selectChannels(sd, 
1024).length);
-       }
-
-       @Test
-       public void testSelectChannelsInterval() {
-               sd.setInstance(streamRecord);
-               assertEquals(0, shufflePartitioner.selectChannels(sd, 1)[0]);
-
-               assertTrue(0 <= shufflePartitioner.selectChannels(sd, 2)[0]);
-               assertTrue(2 > shufflePartitioner.selectChannels(sd, 2)[0]);
-
-               assertTrue(0 <= shufflePartitioner.selectChannels(sd, 1024)[0]);
-               assertTrue(1024 > shufflePartitioner.selectChannels(sd, 
1024)[0]);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
deleted file mode 100644
index 194403c..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.streaming.state.checkpoint.MapCheckpoint;
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
-import org.junit.Test;
-
-public class MapStateTest {
-
-       @Test
-       public void testMapState() {
-
-               Map<String, Integer> map = new HashMap<String, Integer>();
-               map.put("a", 1);
-               map.put("b", 2);
-               map.put("c", 3);
-               map.remove("a");
-
-               MapState<String, Integer> mapState = new MapState<String, 
Integer>();
-               mapState.put("a", 1);
-               mapState.put("b", 2);
-               mapState.put("c", 3);
-
-               assertEquals(1, (int) mapState.remove("a"));
-               assertEquals(null, mapState.remove("a"));
-               assertEquals(2, mapState.size());
-               assertEquals(map, mapState.state);
-               assertEquals(map.entrySet(), mapState.entrySet());
-               assertTrue(mapState.containsKey("b"));
-               assertFalse(mapState.containsKey("a"));
-
-               assertEquals(2, mapState.updatedItems.size());
-               assertEquals(1, mapState.removedItems.size());
-
-               Map<String, Integer> map2 = new HashMap<String, Integer>();
-               map2.put("a", 0);
-               map2.put("e", -1);
-
-               mapState.putAll(map2);
-
-               assertEquals(4, mapState.updatedItems.size());
-               assertEquals(0, mapState.removedItems.size());
-
-               mapState.clear();
-               assertEquals(new HashMap<String, Integer>(), mapState.state);
-               assertTrue(mapState.clear);
-               assertEquals(0, mapState.updatedItems.size());
-               assertEquals(0, mapState.removedItems.size());
-
-               mapState.putAll(map);
-               assertEquals(map.keySet(), mapState.updatedItems);
-
-       }
-
-       @SuppressWarnings("unchecked")
-       @Test
-       public void testMapStateCheckpointing() {
-
-               Map<String, Integer> map = new HashMap<String, Integer>();
-               map.put("a", 1);
-               map.put("b", 2);
-               map.put("c", 3);
-
-               MapState<String, Integer> mapState = new MapState<String, 
Integer>();
-               mapState.putAll(map);
-
-               StateCheckpoint<Map<String, Integer>> mcp = 
mapState.checkpoint();
-               assertEquals(map, mcp.getCheckpointedState());
-
-               Map<String, Integer> map2 = new HashMap<String, Integer>();
-               map2.put("a", 0);
-               map2.put("e", -1);
-
-               mapState.put("a", 0);
-               mapState.put("e", -1);
-               mapState.remove("b");
-               StateCheckpoint<Map<String, Integer>> mcp2 = new 
MapCheckpoint<String, Integer>(mapState);
-               assertEquals(map2, mcp2.getCheckpointedState());
-               mcp.update(mcp2);
-               assertEquals(mapState.state, mcp.getCheckpointedState());
-
-               mapState.clear();
-               mapState.put("a", 1);
-               mapState.put("a", 2);
-               mapState.put("b", -3);
-               mapState.put("c", 0);
-               mapState.remove("b");
-               mcp.update(mapState.checkpoint());
-               assertEquals(mapState.state, mcp.getCheckpointedState());
-
-               MapState<String, Integer> mapState2 = (MapState<String, 
Integer>) new MapState<String, Integer>()
-                               .restore(mcp);
-
-               assertTrue(mapState2.stateEquals(mapState));
-               mapState2.reBuild(mapState2.repartition(10));
-               assertTrue(mapState2.stateEquals(mapState));
-
-               MapState<Integer, Integer> mapState3 = new MapState<Integer, 
Integer>();
-               mapState3.put(1, 1);
-               mapState3.put(2, 1);
-
-               mapState3.reBuild(mapState3.repartition(2)[0]);
-               assertTrue(mapState3.containsKey(2));
-               assertFalse(mapState3.containsKey(1));
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
deleted file mode 100644
index 6cb8f51..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
-import org.junit.Test;
-
-public class OperatorStateTest {
-
-       @Test
-       public void testOperatorState() {
-               OperatorState<Integer> os = new SimpleState<Integer>(5);
-               
-               StateCheckpoint<Integer> scp = os.checkpoint();
-               
-               assertTrue(os.stateEquals(new 
SimpleState<Integer>().restore(scp)));
-               
-               assertEquals((Integer) 5, os.getState());
-               
-               os.setState(10);
-               
-               assertEquals((Integer) 10, os.getState());
-               
-               scp.update(os.checkpoint());
-               
-               assertEquals((Integer)10,scp.getCheckpointedState());
-               
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
deleted file mode 100644
index ea94f98..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
-       // private Collection<IN1> input1;
-       // private Collection<IN2> input2;
-       private Iterator<IN1> inputIterator1;
-       private Iterator<IN2> inputIterator2;
-       private List<OUT> outputs;
-
-       private Collector<OUT> collector;
-       private StreamRecordSerializer<IN1> inDeserializer1;
-       private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> 
mockIterator;
-       private StreamRecordSerializer<IN2> inDeserializer2;
-
-       public MockCoContext(Collection<IN1> input1, Collection<IN2> input2) {
-
-               if (input1.isEmpty() || input2.isEmpty()) {
-                       throw new RuntimeException("Inputs must not be empty");
-               }
-
-               this.inputIterator1 = input1.iterator();
-               this.inputIterator2 = input2.iterator();
-
-               TypeInformation<IN1> inTypeInfo1 = 
TypeExtractor.getForObject(input1.iterator().next());
-               inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1);
-               TypeInformation<IN2> inTypeInfo2 = 
TypeExtractor.getForObject(input2.iterator().next());
-               inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2);
-
-               mockIterator = new MockCoReaderIterator(inDeserializer1, 
inDeserializer2);
-
-               outputs = new ArrayList<OUT>();
-               collector = new MockCollector<OUT>(outputs);
-       }
-
-       private int currentInput = 1;
-       private StreamRecord<IN1> reuse1;
-       private StreamRecord<IN2> reuse2;
-
-       private class MockCoReaderIterator extends
-                       CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> {
-
-               public MockCoReaderIterator(TypeSerializer<StreamRecord<IN1>> 
serializer1,
-                               TypeSerializer<StreamRecord<IN2>> serializer2) {
-                       super(null, serializer1, serializer2);
-                       reuse1 = inDeserializer1.createInstance();
-                       reuse2 = inDeserializer2.createInstance();
-               }
-
-               @Override
-               public int next(StreamRecord<IN1> target1, StreamRecord<IN2> 
target2) throws IOException {
-                       this.delegate1.setInstance(target1);
-                       this.delegate2.setInstance(target2);
-
-                       int inputNumber = nextRecord();
-                       target1.setObject(reuse1.getObject());
-                       target2.setObject(reuse2.getObject());
-
-                       return inputNumber;
-               }
-       }
-
-       private Integer nextRecord() {
-               if (inputIterator1.hasNext() && inputIterator2.hasNext()) {
-                       switch (currentInput) {
-                       case 1:
-                               return next1();
-                       case 2:
-                               return next2();
-                       default:
-                               return 0;
-                       }
-               }
-
-               if (inputIterator1.hasNext()) {
-                       return next1();
-               }
-
-               if (inputIterator2.hasNext()) {
-                       return next2();
-               }
-
-               return 0;
-       }
-
-       private int next1() {
-               reuse1 = inDeserializer1.createInstance();
-               reuse1.setObject(inputIterator1.next());
-               currentInput = 2;
-               return 1;
-       }
-
-       private int next2() {
-               reuse2 = inDeserializer2.createInstance();
-               reuse2.setObject(inputIterator2.next());
-               currentInput = 1;
-               return 2;
-       }
-
-       public List<OUT> getOutputs() {
-               return outputs;
-       }
-
-       public Collector<OUT> getCollector() {
-               return collector;
-       }
-
-       public StreamRecordSerializer<IN1> getInDeserializer1() {
-               return inDeserializer1;
-       }
-
-       public StreamRecordSerializer<IN2> getInDeserializer2() {
-               return inDeserializer2;
-       }
-
-       public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> 
getIterator() {
-               return mockIterator;
-       }
-
-       public static <IN1, IN2, OUT> List<OUT> 
createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
-                       List<IN1> input1, List<IN2> input2) {
-               MockCoContext<IN1, IN2, OUT> mockContext = new 
MockCoContext<IN1, IN2, OUT>(input1, input2);
-               invokable.setup(mockContext);
-
-               try {
-                       invokable.open(null);
-                       invokable.invoke();
-                       invokable.close();
-               } catch (Exception e) {
-                       throw new RuntimeException("Cannot invoke invokable.", 
e);
-               }
-
-               return mockContext.getOutputs();
-       }
-
-       @Override
-       public StreamConfig getConfig() {
-               return null;
-       }
-
-       @Override
-       public ClassLoader getUserCodeClassLoader() {
-               return null;
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public <X> MutableObjectIterator<X> getInput(int index) {
-               switch (index) {
-               case 0:
-                       return (MutableObjectIterator<X>) inputIterator1;
-               case 1:
-                       return (MutableObjectIterator<X>) inputIterator2;
-               default:
-                       throw new IllegalArgumentException("CoStreamVertex has 
only 2 inputs");
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
-               switch (index) {
-               case 0:
-                       return (StreamRecordSerializer<X>) inDeserializer1;
-               case 1:
-                       return (StreamRecordSerializer<X>) inDeserializer2;
-               default:
-                       throw new IllegalArgumentException("CoStreamVertex has 
only 2 inputs");
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public <X, Y> CoReaderIterator<X, Y> getCoReader() {
-               return (CoReaderIterator<X, Y>) mockIterator;
-       }
-
-       @Override
-       public Collector<OUT> getOutputCollector() {
-               return collector;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
deleted file mode 100644
index e8b96c5..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.util.Collector;
-
-public class MockCollector<T> implements Collector<T> {
-       private Collection<T> outputs;
-
-       public MockCollector(Collection<T> outputs) {
-               this.outputs = outputs;
-       }
-
-       @Override
-       public void collect(T record) {
-               T copied = SerializationUtils.deserialize(SerializationUtils
-                               .serialize((Serializable) record));
-               outputs.add(copied);
-       }
-
-       @Override
-       public void close() {
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
deleted file mode 100644
index 5537052..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
-       private Collection<IN> inputs;
-       private List<OUT> outputs;
-
-       private Collector<OUT> collector;
-       private StreamRecordSerializer<IN> inDeserializer;
-       private MutableObjectIterator<StreamRecord<IN>> iterator;
-
-       public MockContext(Collection<IN> inputs) {
-               this.inputs = inputs;
-               if (inputs.isEmpty()) {
-                       throw new RuntimeException("Inputs must not be empty");
-               }
-
-               TypeInformation<IN> inTypeInfo = 
TypeExtractor.getForObject(inputs.iterator().next());
-               inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
-
-               iterator = new MockInputIterator();
-               outputs = new ArrayList<OUT>();
-               collector = new MockCollector<OUT>(outputs);
-       }
-
-       private class MockInputIterator implements 
MutableObjectIterator<StreamRecord<IN>> {
-               Iterator<IN> listIterator;
-
-               public MockInputIterator() {
-                       listIterator = inputs.iterator();
-               }
-
-               @Override
-               public StreamRecord<IN> next(StreamRecord<IN> reuse) throws 
IOException {
-                       if (listIterator.hasNext()) {
-                               reuse.setObject(listIterator.next());
-                       } else {
-                               reuse = null;
-                       }
-                       return reuse;
-               }
-
-               @Override
-               public StreamRecord<IN> next() throws IOException {
-                       if (listIterator.hasNext()) {
-                               StreamRecord<IN> result = new 
StreamRecord<IN>();
-                               result.setObject(listIterator.next());
-                               return result;
-                       } else {
-                                return null;
-                       }
-               }
-       }
-
-       public List<OUT> getOutputs() {
-               return outputs;
-       }
-
-       public Collector<OUT> getCollector() {
-               return collector;
-       }
-
-       public StreamRecordSerializer<IN> getInDeserializer() {
-               return inDeserializer;
-       }
-
-       public MutableObjectIterator<StreamRecord<IN>> getIterator() {
-               return iterator;
-       }
-
-       public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, 
OUT> invokable,
-                       List<IN> inputs) {
-               MockContext<IN, OUT> mockContext = new MockContext<IN, 
OUT>(inputs);
-               invokable.setup(mockContext);
-               try {
-                       invokable.open(null);
-                       invokable.invoke();
-                       invokable.close();
-               } catch (Exception e) {
-                       throw new RuntimeException("Cannot invoke invokable.", 
e);
-               }
-
-               return mockContext.getOutputs();
-       }
-
-       @Override
-       public StreamConfig getConfig() {
-               return null;
-       }
-
-       @Override
-       public ClassLoader getUserCodeClassLoader() {
-               return null;
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public <X> MutableObjectIterator<X> getInput(int index) {
-               if (index == 0) {
-                       return (MutableObjectIterator<X>) iterator;
-               } else {
-                       throw new IllegalArgumentException("There is only 1 
input");
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
-               if (index == 0) {
-                       return (StreamRecordSerializer<X>) inDeserializer;
-               } else {
-                       throw new IllegalArgumentException("There is only 1 
input");
-               }
-       }
-
-       @Override
-       public Collector<OUT> getOutputCollector() {
-               return collector;
-       }
-
-       @Override
-       public <X, Y> CoReaderIterator<X, Y> getCoReader() {
-               throw new IllegalArgumentException("CoReader not available");
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
deleted file mode 100644
index 88673a3..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.mock;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
-import org.mockito.Mockito;
-
-public class MockRecordWriterFactory {
-
-       @SuppressWarnings("unchecked")
-       public static MockRecordWriter create() {
-               MockRecordWriter recWriter = mock(MockRecordWriter.class);
-               
-               Mockito.when(recWriter.initList()).thenCallRealMethod();
-               
doCallRealMethod().when(recWriter).emit(Mockito.any(SerializationDelegate.class));
-               
-               recWriter.initList();
-               
-               return recWriter;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
deleted file mode 100644
index bb92e8e..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-
-public class MockSource<T> {
-
-       public static <T> List<T> createAndExecute(SourceFunction<T> source) {
-               List<T> outputs = new ArrayList<T>();
-               try {
-                       source.invoke(new MockCollector<T>(outputs));
-               } catch (Exception e) {
-                       throw new RuntimeException("Cannot invoke source.", e);
-               }
-               return outputs;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
deleted file mode 100644
index 95a5d9b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import akka.actor.ActorRef;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-public class TestStreamEnvironment extends StreamExecutionEnvironment {
-       private static final String DEFAULT_JOBNAME = "TestStreamingJob";
-       private static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
-
-       private long memorySize;
-
-       public TestStreamEnvironment(int degreeOfParallelism, long memorySize){
-               this.setDegreeOfParallelism(degreeOfParallelism);
-
-               this.memorySize = memorySize;
-       }
-
-       @Override
-       public void execute() throws Exception {
-               execute(DEFAULT_JOBNAME);
-       }
-
-       @Override
-       public void execute(String jobName) throws Exception {
-               JobGraph jobGraph = streamGraph.getJobGraph(jobName);
-
-               Configuration configuration = jobGraph.getJobConfiguration();
-
-               configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-                               getDegreeOfParallelism());
-               
configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
-
-               ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
-
-               try{
-                       ActorRef client = cluster.getJobClient();
-                       JobClient.submitJobAndWait(jobGraph, false, client, cluster.timeout());
-               }catch(JobExecutionException e){
-                       if(e.getMessage().contains("GraphConversionException")){
-                               throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
-                       }else{
-                               throw e;
-                       }
-               }finally{
-                       cluster.stop();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties b/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
deleted file mode 100644
index 2fb9345..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml b/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
deleted file mode 100644
index 4f56748..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
-</configuration>
\ No newline at end of file

Reply via email to