http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
 
b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
deleted file mode 100644
index b9dba77..0000000
--- 
a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.java8.wordcount;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-
-import java.util.Arrays;
-
-/**
- * Implements the streaming "WordCount" program that computes a simple word 
occurrences
- * over text files.
- *
- * <p>The input is a plain text file with lines separated by newline 
characters.
- *
- * <p>Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>write a compact Flink Streaming program with Java 8 Lambda Expressions.
- * </ul>
- *
- */
-public class WordCount {
-
-       // 
*************************************************************************
-       //     PROGRAM
-       // 
*************************************************************************
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // set up the execution environment
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               // get input data
-               DataStream<String> text = getTextDataStream(env);
-
-               DataStream<Tuple2<String, Integer>> counts =
-                               // normalize and split each line
-                               text.map(line -> 
line.toLowerCase().split("\\W+"))
-                               // convert split line in pairs (2-tuples) 
containing: (word,1)
-                               .flatMap((String[] tokens, 
Collector<Tuple2<String, Integer>> out) -> {
-                                       // emit the pairs with non-zero-length 
words
-                                       Arrays.stream(tokens)
-                                       .filter(t -> t.length() > 0)
-                                       .forEach(t -> out.collect(new 
Tuple2<>(t, 1)));
-                               })
-                               // group by the tuple field "0" and sum up 
tuple field "1"
-                               .keyBy(0)
-                               .sum(1);
-
-               // emit result
-               if (fileOutput) {
-                       counts.writeAsCsv(outputPath);
-               } else {
-                       counts.print();
-               }
-
-               // execute program
-               env.execute("Streaming WordCount Example");
-       }
-
-       // 
*************************************************************************
-       //     UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String textPath;
-       private static String outputPath;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 2) {
-                               textPath = args[0];
-                               outputPath = args[1];
-                       } else {
-                               System.err.println("Usage: WordCount <text 
path> <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing WordCount example with 
built-in default data.");
-                       System.out.println("  Provide parameters to read input 
data from a file.");
-                       System.out.println("  Usage: WordCount <text path> 
<result path>");
-               }
-               return true;
-       }
-
-       private static DataStream<String> 
getTextDataStream(StreamExecutionEnvironment env) {
-               if (fileOutput) {
-                       // read the text file from given input path
-                       return env.readTextFile(textPath);
-               } else {
-                       // get default test text data
-                       return env.fromElements(WordCountData.WORDS);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
 
b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
deleted file mode 100644
index de1f395..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.type.lambdas;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.lang.reflect.Method;
-
-import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-/**
- * Tests the type extractor for lambda functions.
- */
-@SuppressWarnings("serial")
-public class LambdaExtractionTest {
-
-       private static final TypeInformation<Tuple2<Tuple1<Integer>, Boolean>> 
NESTED_TUPLE_BOOLEAN_TYPE =
-                       new TypeHint<Tuple2<Tuple1<Integer>, 
Boolean>>(){}.getTypeInfo();
-
-       private static final TypeInformation<Tuple2<Tuple1<Integer>, Double>> 
NESTED_TUPLE_DOUBLE_TYPE =
-                       new TypeHint<Tuple2<Tuple1<Integer>, 
Double>>(){}.getTypeInfo();
-
-       @Test
-       public void testIdentifyLambdas() {
-               try {
-                       MapFunction<?, ?> anonymousFromInterface = new 
MapFunction<String, Integer>() {
-                               @Override
-                               public Integer map(String value) {
-                                       return Integer.parseInt(value);
-                               }
-                       };
-
-                       MapFunction<?, ?> anonymousFromClass = new 
RichMapFunction<String, Integer>() {
-                               @Override
-                               public Integer map(String value) {
-                                       return Integer.parseInt(value);
-                               }
-                       };
-
-                       MapFunction<?, ?> fromProperClass = new StaticMapper();
-
-                       MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
-                               @Override
-                               public Tuple2<Integer, Long> map(Integer value) 
{
-                                       return new Tuple2<>(value, 1L);
-                               }
-                       };
-
-                       MapFunction<String, Integer> staticLambda = 
Integer::parseInt;
-                       MapFunction<Integer, String> instanceLambda = 
Object::toString;
-                       MapFunction<String, Integer> constructorLambda = 
Integer::new;
-
-                       
assertNull(checkAndExtractLambda(anonymousFromInterface));
-                       assertNull(checkAndExtractLambda(anonymousFromClass));
-                       assertNull(checkAndExtractLambda(fromProperClass));
-                       assertNull(checkAndExtractLambda(fromDerived));
-                       assertNotNull(checkAndExtractLambda(staticLambda));
-                       assertNotNull(checkAndExtractLambda(instanceLambda));
-                       assertNotNull(checkAndExtractLambda(constructorLambda));
-                       assertNotNull(checkAndExtractLambda(STATIC_LAMBDA));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       private static class StaticMapper implements MapFunction<String, 
Integer> {
-               @Override
-               public Integer map(String value) {
-                       return Integer.parseInt(value);
-               }
-       }
-
-       private interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
-               @Override
-               Tuple2<T, Long> map(T value) throws Exception;
-       }
-
-       private static final MapFunction<String, Integer> STATIC_LAMBDA = 
Integer::parseInt;
-
-       private static class MyClass {
-               private String s = "mystring";
-
-               public MapFunction<Integer, String> getMapFunction() {
-                       return (i) -> s;
-               }
-       }
-
-       @Test
-       public void testLambdaWithMemberVariable() {
-               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new 
MyClass().getMapFunction(), Types.INT);
-               Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
-       }
-
-       @Test
-       public void testLambdaWithLocalVariable() {
-               String s = "mystring";
-               final int k = 24;
-               int j = 26;
-
-               MapFunction<Integer, String> f = (i) -> s + k + j;
-
-               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
Types.INT);
-               Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
-       }
-
-       @Test
-       public void testMapLambda() {
-               MapFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
-
-               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
NESTED_TUPLE_BOOLEAN_TYPE);
-               if (!(ti instanceof MissingTypeInfo)) {
-                       Assert.assertTrue(ti.isTupleType());
-                       Assert.assertEquals(2, ti.getArity());
-                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-               }
-       }
-
-       @Test
-       public void testFlatMapLambda() {
-               FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
-               TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, 
NESTED_TUPLE_BOOLEAN_TYPE);
-               if (!(ti instanceof MissingTypeInfo)) {
-                       Assert.assertTrue(ti.isTupleType());
-                       Assert.assertEquals(2, ti.getArity());
-                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-               }
-       }
-
-       @Test
-       public void testMapPartitionLambda() {
-               MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
-               TypeInformation<?> ti = 
TypeExtractor.getMapPartitionReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-               if (!(ti instanceof MissingTypeInfo)) {
-                       Assert.assertTrue(ti.isTupleType());
-                       Assert.assertEquals(2, ti.getArity());
-                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-               }
-       }
-
-       @Test
-       public void testGroupReduceLambda() {
-               GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
-               TypeInformation<?> ti = 
TypeExtractor.getGroupReduceReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-               if (!(ti instanceof MissingTypeInfo)) {
-                       Assert.assertTrue(ti.isTupleType());
-                       Assert.assertEquals(2, ti.getArity());
-                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-               }
-       }
-
-       @Test
-       public void testFlatJoinLambda() {
-               FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, 
o) -> {};
-
-               TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, 
NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-               if (!(ti instanceof MissingTypeInfo)) {
-                       Assert.assertTrue(ti.isTupleType());
-                       Assert.assertEquals(2, ti.getArity());
-                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-               }
-       }
-
-       @Test
-       public void testJoinLambda() {
-               JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) 
-> null;
-
-               TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, 
NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-               if (!(ti instanceof MissingTypeInfo)) {
-                       Assert.assertTrue(ti.isTupleType());
-                       Assert.assertEquals(2, ti.getArity());
-                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-               }
-       }
-
-       @Test
-       public void testCoGroupLambda() {
-               CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, 
o) -> {};
-
-               TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, 
NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-               if (!(ti instanceof MissingTypeInfo)) {
-                       Assert.assertTrue(ti.isTupleType());
-                       Assert.assertEquals(2, ti.getArity());
-                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-               }
-       }
-
-       @Test
-       public void testCrossLambda() {
-               CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) 
-> null;
-
-               TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, 
NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-               if (!(ti instanceof MissingTypeInfo)) {
-                       Assert.assertTrue(ti.isTupleType());
-                       Assert.assertEquals(2, ti.getArity());
-                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-               }
-       }
-
-       @Test
-       public void testKeySelectorLambda() {
-               KeySelector<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
-
-               TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, 
NESTED_TUPLE_BOOLEAN_TYPE);
-               if (!(ti instanceof MissingTypeInfo)) {
-                       Assert.assertTrue(ti.isTupleType());
-                       Assert.assertEquals(2, ti.getArity());
-                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-               }
-       }
-
-       @SuppressWarnings("rawtypes")
-       @Test
-       public void testLambdaTypeErasure() {
-               MapFunction<Tuple1<Integer>, Tuple1> f = (i) -> null;
-               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, new 
TypeHint<Tuple1<Integer>>(){}.getTypeInfo(), null, true);
-               Assert.assertTrue(ti instanceof MissingTypeInfo);
-       }
-
-       @Test
-       public void testPartitionerLambda() {
-               Partitioner<Tuple2<Integer, String>> partitioner = (key, 
numPartitions) -> key.f1.length() % numPartitions;
-               final TypeInformation<?> ti = 
TypeExtractor.getPartitionerTypes(partitioner);
-
-               Assert.assertTrue(ti.isTupleType());
-               Assert.assertEquals(2, ti.getArity());
-               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(0), 
BasicTypeInfo.INT_TYPE_INFO);
-               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-       }
-
-       private static class MyType {
-               private int key;
-
-               public int getKey() {
-                       return key;
-               }
-
-               public void setKey(int key) {
-                       this.key = key;
-               }
-
-               protected int getKey2() {
-                       return 0;
-               }
-       }
-
-       @Test
-       public void testInstanceMethodRefSameType() {
-               MapFunction<MyType, Integer> f = MyType::getKey;
-               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
TypeExtractor.createTypeInfo(MyType.class));
-               Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
-       }
-
-       @Test
-       public void testInstanceMethodRefSuperType() {
-               MapFunction<Integer, String> f = Object::toString;
-               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
BasicTypeInfo.INT_TYPE_INFO);
-               Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
-       }
-
-       private static class MySubtype extends MyType {
-               public boolean test;
-       }
-
-       @Test
-       public void testInstanceMethodRefSuperTypeProtected() {
-               MapFunction<MySubtype, Integer> f = MyType::getKey2;
-               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
TypeExtractor.createTypeInfo(MySubtype.class));
-               Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
-       }
-
-       @Test
-       public void testConstructorMethodRef() {
-               MapFunction<String, Integer> f = Integer::new;
-               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
BasicTypeInfo.STRING_TYPE_INFO);
-               Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
-       }
-
-       private interface InterfaceWithDefaultMethod {
-               void samMethod();
-
-               default void defaultMethod() {
-
-               }
-       }
-
-       @Test
-       public void testSamMethodExtractionInterfaceWithDefaultMethod() {
-               final Method sam = 
TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithDefaultMethod.class);
-               assertNotNull(sam);
-               assertEquals("samMethod", sam.getName());
-       }
-
-       private interface InterfaceWithMultipleMethods {
-               void firstMethod();
-
-               void secondMethod();
-       }
-
-       @Test(expected = InvalidTypesException.class)
-       public void getSingleAbstractMethodMultipleMethods() throws Exception {
-               
TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithMultipleMethods.class);
-       }
-
-       private interface InterfaceWithoutAbstractMethod {
-               default void defaultMethod() {
-
-               }
-       }
-
-       @Test(expected = InvalidTypesException.class)
-       public void getSingleAbstractMethodNoAbstractMethods() throws Exception 
{
-               
TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithoutAbstractMethod.class);
-       }
-
-       private abstract class AbstractClassWithSingleAbstractMethod {
-               public abstract void defaultMethod();
-       }
-
-       @Test(expected = InvalidTypesException.class)
-       public void getSingleAbstractMethodNotAnInterface() throws Exception {
-               
TypeExtractionUtils.getSingleAbstractMethod(AbstractClassWithSingleAbstractMethod.class);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java 
b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
deleted file mode 100644
index 7cbdf6a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for lambda support in CEP.
- */
-public class CEPLambdaTest extends TestLogger {
-       /**
-        * Test event class.
-        */
-       public static class EventA {}
-
-       /**
-        * Test event class.
-        */
-       public static class EventB {}
-
-       /**
-        * Tests that a Java8 lambda can be passed as a CEP select function.
-        */
-       @Test
-       public void testLambdaSelectFunction() {
-               TypeInformation<EventA> eventTypeInformation = 
TypeExtractor.getForClass(EventA.class);
-               TypeInformation<EventB> outputTypeInformation = 
TypeExtractor.getForClass(EventB.class);
-
-               DataStream<EventA> inputStream = new DataStream<>(
-                       StreamExecutionEnvironment.getExecutionEnvironment(),
-                       new SourceTransformation<>(
-                               "source",
-                               null,
-                               eventTypeInformation,
-                               1));
-
-               Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
-
-               PatternStream<EventA> patternStream = new 
PatternStream<>(inputStream, dummyPattern);
-
-               DataStream<EventB> result = patternStream.select(
-                               (Map<String, List<EventA>> map) -> new EventB()
-               );
-
-               assertEquals(outputTypeInformation, result.getType());
-       }
-
-       /**
-        * Tests that a Java8 lambda can be passed as a CEP flat select 
function.
-        */
-       @Test
-       public void testLambdaFlatSelectFunction() {
-               TypeInformation<EventA> eventTypeInformation = 
TypeExtractor.getForClass(EventA.class);
-               TypeInformation<EventB> outputTypeInformation = 
TypeExtractor.getForClass(EventB.class);
-
-               DataStream<EventA> inputStream = new DataStream<>(
-                       StreamExecutionEnvironment.getExecutionEnvironment(),
-                       new SourceTransformation<>(
-                               "source",
-                               null,
-                               eventTypeInformation,
-                               1));
-
-               Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
-
-               PatternStream<EventA> patternStream = new 
PatternStream<>(inputStream, dummyPattern);
-
-               DataStream<EventB> result = patternStream.flatSelect(
-                       (Map<String, List<EventA>> map, Collector<EventB> 
collector) -> collector.collect(new EventB())
-               );
-
-               assertEquals(outputTypeInformation, result.getType());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
 
b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
deleted file mode 100644
index ca11275..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda1;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda2;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda3;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda4;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.jar.JarInputStream;
-import java.util.zip.ZipEntry;
-
-/**
- * Tests for the {@link JarFileCreator}.
- */
-public class JarFileCreatorLambdaTest {
-       @Test
-       public void testFilterFunctionOnLambda1() throws Exception {
-               File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
-               JarFileCreator jfc = new JarFileCreator(out);
-               jfc.addClass(FilterLambda1.class)
-                       .createJarFile();
-
-               Set<String> ans = new HashSet<String>();
-               
ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda1.class");
-               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-
-               Assert.assertTrue("Jar file for java 8 lambda is not correct", 
validate(ans, out));
-               out.delete();
-       }
-
-       @Test
-       public void testFilterFunctionOnLambda2() throws Exception{
-               File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
-               JarFileCreator jfc = new JarFileCreator(out);
-               jfc.addClass(FilterLambda2.class)
-                       .createJarFile();
-
-               Set<String> ans = new HashSet<String>();
-               
ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda2.class");
-               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-
-               Assert.assertTrue("Jar file for java 8 lambda is not correct", 
validate(ans, out));
-               out.delete();
-       }
-
-       @Test
-       public void testFilterFunctionOnLambda3() throws Exception {
-               File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
-               JarFileCreator jfc = new JarFileCreator(out);
-               jfc.addClass(FilterLambda3.class)
-                       .createJarFile();
-
-               Set<String> ans = new HashSet<String>();
-               
ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda3.class");
-               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-               
ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunction.class");
-
-               Assert.assertTrue("Jar file for java 8 lambda is not correct", 
validate(ans, out));
-               out.delete();
-       }
-
-       @Test
-       public void testFilterFunctionOnLambda4() throws Exception {
-               File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
-               JarFileCreator jfc = new JarFileCreator(out);
-               jfc.addClass(FilterLambda4.class)
-                       .createJarFile();
-
-               Set<String> ans = new HashSet<String>();
-               
ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda4.class");
-               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-               
ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper$UtilFunction.class");
-
-               Assert.assertTrue("Jar file for java 8 lambda is not correct", 
validate(ans, out));
-               out.delete();
-       }
-
-       public boolean validate(Set<String> expected, File out) throws 
Exception {
-               int count = expected.size();
-               try (JarInputStream jis = new JarInputStream(new 
FileInputStream(out))) {
-                       ZipEntry ze;
-                       while ((ze = jis.getNextEntry()) != null) {
-                               count--;
-                               expected.remove(ze.getName());
-                       }
-               }
-               return count == 0 && expected.size() == 0;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
 
b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
deleted file mode 100644
index 12abff9..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * A lambda filter using a static method.
- */
-public class FilterLambda1 {
-
-       public static void main(String[] args) throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<String> input = env.fromElements("Please filter", "the 
words", "but not this");
-
-               FilterFunction<String> filter = (v) -> WordFilter.filter(v);
-
-               DataSet<String> output = input.filter(filter);
-               output.print();
-
-               env.execute();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
 
b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
deleted file mode 100644
index 9555607..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda1}, but the filter lambda is directly passed 
to {@link DataSet#filter(FilterFunction)}.
- */
-public class FilterLambda2 {
-
-       public static void main(String[] args) throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<String> input = env.fromElements("Please filter", "the 
words", "but not this");
-
-               DataSet<String> output = input.filter((v) -> 
WordFilter.filter(v));
-               output.print();
-
-               env.execute();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
 
b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
deleted file mode 100644
index b493722..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda2}, but uses a getter to retrieve a lambda 
filter instance.
- */
-public class FilterLambda3 {
-
-       public static void main(String[] args) throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<String> input = env.fromElements("Please filter", "the 
words", "but not this");
-
-               DataSet<String> output = 
input.filter(UtilFunction.getWordFilter());
-               output.print();
-
-               env.execute();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
 
b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
deleted file mode 100644
index 606ef5e..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda3} with additional indirection.
- */
-public class FilterLambda4 {
-
-       public static void main(String[] args) throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<String> input = env.fromElements("Please filter", "the 
words", "but not this");
-
-               DataSet<String> output = 
input.filter(UtilFunctionWrapper.UtilFunction.getWordFilter());
-               output.print();
-
-               env.execute();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
 
b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
deleted file mode 100644
index 1d5394a..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * Static factory for a lambda filter function.
- */
-public class UtilFunction {
-       public static FilterFunction<String> getWordFilter() {
-               return (v) -> WordFilter.filter(v);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
 
b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
deleted file mode 100644
index de8f68a..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * A wrapper around {@link WordFilter} to introduce additional indirection.
- */
-public class UtilFunctionWrapper {
-       /**
-        * Static factory for a lambda filter function.
-        */
-       public static class UtilFunction {
-               public static FilterFunction<String> getWordFilter() {
-                       return (v) -> WordFilter.filter(v);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
 
b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
deleted file mode 100644
index 4a5b16f..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-/**
- * Static filter method for lambda tests.
- */
-public class WordFilter {
-       public static boolean filter(String value) {
-               return !value.contains("not");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
 
b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
deleted file mode 100644
index cee34af..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda allreduce functions.
- */
-public class AllGroupReduceITCase extends JavaProgramTestBase {
-
-       private static final String EXPECTED_RESULT = "aaabacad\n";
-
-       private String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", 
"ad");
-               DataSet<String> concatDs = stringDs.reduceGroup((values, out) 
-> {
-                       String conc = "";
-                       for (String s : values) {
-                               conc = conc.concat(s);
-                       }
-                       out.collect(conc);
-               });
-               concatDs.writeAsText(resultPath);
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
 
b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
deleted file mode 100644
index a70f37a..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cogroup functions.
- */
-public class CoGroupITCase extends JavaProgramTestBase {
-
-       private static final String EXPECTED_RESULT = "6\n3\n";
-
-       private String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected void testProgram() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<Integer, String>> left = env.fromElements(
-                       new Tuple2<Integer, String>(1, "hello"),
-                       new Tuple2<Integer, String>(2, "what's"),
-                       new Tuple2<Integer, String>(2, "up")
-               );
-               DataSet<Tuple2<Integer, String>> right = env.fromElements(
-                       new Tuple2<Integer, String>(1, "not"),
-                       new Tuple2<Integer, String>(1, "much"),
-                       new Tuple2<Integer, String>(2, "really")
-               );
-               DataSet<Integer> joined = 
left.coGroup(right).where(0).equalTo(0)
-                       .with((values1, values2, out) -> {
-                               int sum = 0;
-                               for (Tuple2<Integer, String> next : values1) {
-                                       sum += next.f0;
-                               }
-                               for (Tuple2<Integer, String> next : values2) {
-                                       sum += next.f0;
-                               }
-                               out.collect(sum);
-                       });
-               joined.writeAsText(resultPath);
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
 
b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
deleted file mode 100644
index 32cd910..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cross functions.
- */
-public class CrossITCase extends JavaProgramTestBase {
-
-       private static final String EXPECTED_RESULT = "2,hello not\n" +
-                       "3,what's not\n" +
-                       "3,up not\n" +
-                       "2,hello much\n" +
-                       "3,what's much\n" +
-                       "3,up much\n" +
-                       "3,hello really\n" +
-                       "4,what's really\n" +
-                       "4,up really";
-
-       private String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected void testProgram() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<Integer, String>> left = env.fromElements(
-                               new Tuple2<Integer, String>(1, "hello"),
-                               new Tuple2<Integer, String>(2, "what's"),
-                               new Tuple2<Integer, String>(2, "up")
-                               );
-               DataSet<Tuple2<Integer, String>> right = env.fromElements(
-                               new Tuple2<Integer, String>(1, "not"),
-                               new Tuple2<Integer, String>(1, "much"),
-                               new Tuple2<Integer, String>(2, "really")
-                               );
-               DataSet<Tuple2<Integer, String>> joined = left.cross(right)
-                               .with((t, s) -> new Tuple2<> (t.f0 + s.f0, t.f1 
+ " " + s.f1));
-               joined.writeAsCsv(resultPath);
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
 
b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
deleted file mode 100644
index 6ad1058..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda filter functions.
- */
-public class FilterITCase extends JavaProgramTestBase {
-
-       private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
-                       "4,3,Hello world, how are you?\n";
-
-       public static DataSet<Tuple3<Integer, Long, String>> 
get3TupleDataSet(ExecutionEnvironment env) {
-
-               List<Tuple3<Integer, Long, String>> data = new 
ArrayList<Tuple3<Integer, Long, String>>();
-               data.add(new Tuple3<>(1, 1L, "Hi"));
-               data.add(new Tuple3<>(2, 2L, "Hello"));
-               data.add(new Tuple3<>(3, 2L, "Hello world"));
-               data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
-               data.add(new Tuple3<>(5, 3L, "I am fine."));
-               data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
-               data.add(new Tuple3<>(7, 4L, "Comment#1"));
-               data.add(new Tuple3<>(8, 4L, "Comment#2"));
-               data.add(new Tuple3<>(9, 4L, "Comment#3"));
-               data.add(new Tuple3<>(10, 4L, "Comment#4"));
-               data.add(new Tuple3<>(11, 5L, "Comment#5"));
-               data.add(new Tuple3<>(12, 5L, "Comment#6"));
-               data.add(new Tuple3<>(13, 5L, "Comment#7"));
-               data.add(new Tuple3<>(14, 5L, "Comment#8"));
-               data.add(new Tuple3<>(15, 5L, "Comment#9"));
-               data.add(new Tuple3<>(16, 6L, "Comment#10"));
-               data.add(new Tuple3<>(17, 6L, "Comment#11"));
-               data.add(new Tuple3<>(18, 6L, "Comment#12"));
-               data.add(new Tuple3<>(19, 6L, "Comment#13"));
-               data.add(new Tuple3<>(20, 6L, "Comment#14"));
-               data.add(new Tuple3<>(21, 6L, "Comment#15"));
-
-               Collections.shuffle(data);
-
-               return env.fromCollection(data);
-       }
-
-       private String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple3<Integer, Long, String>> ds = 
get3TupleDataSet(env);
-               DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-                               filter(value -> value.f2.contains("world"));
-               filterDs.writeAsCsv(resultPath);
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
 
b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
deleted file mode 100644
index f793450..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class FlatJoinITCase extends JavaProgramTestBase {
-
-       private static final String EXPECTED_RESULT = "2,what's really\n" +
-                       "2,up really\n" +
-                       "1,hello not\n" +
-                       "1,hello much\n";
-
-       private String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected void testProgram() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<Integer, String>> left = env.fromElements(
-                               new Tuple2<Integer, String>(1, "hello"),
-                               new Tuple2<Integer, String>(2, "what's"),
-                               new Tuple2<Integer, String>(2, "up")
-                               );
-               DataSet<Tuple2<Integer, String>> right = env.fromElements(
-                               new Tuple2<Integer, String>(1, "not"),
-                               new Tuple2<Integer, String>(1, "much"),
-                               new Tuple2<Integer, String>(2, "really")
-                               );
-               DataSet<Tuple2<Integer, String>> joined = 
left.join(right).where(0).equalTo(0)
-                               .with((t, s, out) -> out.collect(new 
Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1)));
-               joined.writeAsCsv(resultPath);
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
 
b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
deleted file mode 100644
index d395d7d..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda flatmap functions.
- */
-public class FlatMapITCase extends JavaProgramTestBase {
-
-       private static final String EXPECTED_RESULT = "bb\n" +
-                       "bb\n" +
-                       "bc\n" +
-                       "bd\n";
-
-       private String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", 
"ad");
-               DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> 
out.collect(s.replace("a", "b")));
-               flatMappedDs.writeAsText(resultPath);
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
 
b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
deleted file mode 100644
index 53db541..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda groupreduce functions.
- */
-public class GroupReduceITCase extends JavaProgramTestBase {
-
-       private static final String EXPECTED_RESULT = "abad\n" +
-                       "aaac\n";
-
-       private String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected void testProgram() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<Integer, String>> stringDs = env.fromElements(
-                               new Tuple2<>(1, "aa"),
-                               new Tuple2<>(2, "ab"),
-                               new Tuple2<>(1, "ac"),
-                               new Tuple2<>(2, "ad")
-                               );
-               DataSet<String> concatDs = stringDs
-                               .groupBy(0)
-                               .reduceGroup((values, out) -> {
-                                       String conc = "";
-                                       for (Tuple2<Integer, String> next : 
values) {
-                                               conc = conc.concat(next.f1);
-                                       }
-                                       out.collect(conc);
-                               });
-               concatDs.writeAsText(resultPath);
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
 
b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
deleted file mode 100644
index d86ea49..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class JoinITCase extends JavaProgramTestBase {
-
-       private static final String EXPECTED_RESULT = "2,what's really\n" +
-                       "2,up really\n" +
-                       "1,hello not\n" +
-                       "1,hello much\n";
-
-       private String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected void testProgram() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<Integer, String>> left = env.fromElements(
-                               new Tuple2<Integer, String>(1, "hello"),
-                               new Tuple2<Integer, String>(2, "what's"),
-                               new Tuple2<Integer, String>(2, "up")
-                               );
-               DataSet<Tuple2<Integer, String>> right = env.fromElements(
-                               new Tuple2<Integer, String>(1, "not"),
-                               new Tuple2<Integer, String>(1, "much"),
-                               new Tuple2<Integer, String>(2, "really")
-                               );
-               DataSet<Tuple2<Integer, String>> joined = 
left.join(right).where(0).equalTo(0)
-                               .with((t, s) -> new Tuple2<>(t.f0, t.f1 + " " + 
s.f1));
-               joined.writeAsCsv(resultPath);
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
 
b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
deleted file mode 100644
index 15a9b9d..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda map functions.
- */
-public class MapITCase extends JavaProgramTestBase {
-
-       private static class Trade {
-
-               public String v;
-
-               public Trade(String v) {
-                       this.v = v;
-               }
-
-               @Override
-               public String toString() {
-                       return v;
-               }
-       }
-
-       private static final String EXPECTED_RESULT = "22\n" +
-                       "22\n" +
-                       "23\n" +
-                       "24\n";
-
-       private String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
-               DataSet<String> mappedDs = stringDs
-                       .map(Object::toString)
-                       .map (s -> s.replace("1", "2"))
-                       .map(Trade::new)
-                       .map(Trade::toString);
-               mappedDs.writeAsText(resultPath);
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
 
b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
deleted file mode 100644
index 712132c..0000000
--- 
a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda reduce functions.
- */
-public class ReduceITCase extends JavaProgramTestBase {
-
-       private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
-                       "2,3,2,Hallo Welt wie,1\n" +
-                       "2,2,1,Hallo Welt,2\n" +
-                       "3,9,0,P-),2\n" +
-                       "3,6,5,BCD,3\n" +
-                       "4,17,0,P-),1\n" +
-                       "4,17,0,P-),2\n" +
-                       "5,11,10,GHI,1\n" +
-                       "5,29,0,P-),2\n" +
-                       "5,25,0,P-),3\n";
-
-       public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> 
get5TupleDataSet(ExecutionEnvironment env) {
-
-               List<Tuple5<Integer, Long, Integer, String, Long>> data = new 
ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
-               data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
-               data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
-               data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
-               data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
-               data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
-               data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
-               data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
-               data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
-               data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
-               data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
-               data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
-               data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
-               data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
-               data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
-               data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
-
-               Collections.shuffle(data);
-
-               TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> 
type = new
-                               TupleTypeInfo<Tuple5<Integer, Long,  Integer, 
String, Long>>(
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.LONG_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.LONG_TYPE_INFO
-               );
-
-               return env.fromCollection(data, type);
-       }
-
-       private String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
get5TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs 
= ds
-                               .groupBy(4, 0)
-                               .reduce((in1, in2) -> {
-                                       Tuple5<Integer, Long, Integer, String, 
Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
-                                       out.setFields(in1.f0, in1.f1 + in2.f1, 
0, "P-)", in1.f4);
-                                       return out;
-                               });
-
-               reduceDs.writeAsCsv(resultPath);
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/resources/log4j-test.properties 
b/flink-java8/src/test/resources/log4j-test.properties
deleted file mode 100644
index c977d4c..0000000
--- a/flink-java8/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 0e6c2fe..521665f 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -104,8 +104,7 @@ public class PatternStream<T> {
                        PatternSelectFunction.class,
                        0,
                        1,
-                       new int[]{0, 1, 0},
-                       new int[]{},
+                       TypeExtractor.NO_INDEX,
                        inputStream.getType(),
                        null,
                        false);
@@ -173,8 +172,7 @@ public class PatternStream<T> {
                        PatternSelectFunction.class,
                        0,
                        1,
-                       new int[]{0, 1, 0},
-                       new int[]{},
+                       TypeExtractor.NO_INDEX,
                        inputStream.getType(),
                        null,
                        false);
@@ -259,8 +257,7 @@ public class PatternStream<T> {
                        PatternSelectFunction.class,
                        0,
                        1,
-                       new int[]{0, 1, 0},
-                       new int[]{},
+                       TypeExtractor.NO_INDEX,
                        inputStream.getType(),
                        null,
                        false);
@@ -270,8 +267,7 @@ public class PatternStream<T> {
                        PatternTimeoutFunction.class,
                        0,
                        1,
-                       new int[]{0, 1, 0},
-                       new int[]{},
+                       TypeExtractor.NO_INDEX,
                        inputStream.getType(),
                        null,
                        false);
@@ -314,7 +310,6 @@ public class PatternStream<T> {
                        PatternFlatSelectFunction.class,
                        0,
                        1,
-                       new int[] {0, 1, 0},
                        new int[] {1, 0},
                        inputStream.getType(),
                        null,
@@ -381,7 +376,6 @@ public class PatternStream<T> {
                        PatternFlatSelectFunction.class,
                        0,
                        1,
-                       new int[]{0, 1, 0},
                        new int[]{1, 0},
                        inputStream.getType(),
                        null,
@@ -465,7 +459,6 @@ public class PatternStream<T> {
                        PatternFlatTimeoutFunction.class,
                        0,
                        1,
-                       new int[]{0, 1, 0},
                        new int[]{2, 0},
                        inputStream.getType(),
                        null,
@@ -476,7 +469,6 @@ public class PatternStream<T> {
                        PatternFlatSelectFunction.class,
                        0,
                        1,
-                       new int[]{0, 1, 0},
                        new int[]{1, 0},
                        inputStream.getType(),
                        null,

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 6d1013c..e397d31 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
@@ -96,19 +97,15 @@ public class CEPITCase extends AbstractTestBase {
                        }
                });
 
-               DataStream<String> result = CEP.pattern(input, 
pattern).select(new PatternSelectFunction<Event, String>() {
+               DataStream<String> result = CEP.pattern(input, 
pattern).flatSelect((p, o) -> {
+                       StringBuilder builder = new StringBuilder();
 
-                       @Override
-                       public String select(Map<String, List<Event>> pattern) {
-                               StringBuilder builder = new StringBuilder();
+                       
builder.append(p.get("start").get(0).getId()).append(",")
+                               
.append(p.get("middle").get(0).getId()).append(",")
+                               .append(p.get("end").get(0).getId());
 
-                               
builder.append(pattern.get("start").get(0).getId()).append(",")
-                                       
.append(pattern.get("middle").get(0).getId()).append(",")
-                                       
.append(pattern.get("end").get(0).getId());
-
-                               return builder.toString();
-                       }
-               });
+                       o.collect(builder.toString());
+               }, Types.STRING);
 
                List<String> resultList = new ArrayList<>();
 
@@ -170,18 +167,14 @@ public class CEPITCase extends AbstractTestBase {
                                }
                        });
 
-               DataStream<String> result = CEP.pattern(input, 
pattern).select(new PatternSelectFunction<Event, String>() {
-
-                       @Override
-                       public String select(Map<String, List<Event>> pattern) {
-                               StringBuilder builder = new StringBuilder();
+               DataStream<String> result = CEP.pattern(input, 
pattern).select(p -> {
+                       StringBuilder builder = new StringBuilder();
 
-                               
builder.append(pattern.get("start").get(0).getId()).append(",")
-                                       
.append(pattern.get("middle").get(0).getId()).append(",")
-                                       
.append(pattern.get("end").get(0).getId());
+                       
builder.append(p.get("start").get(0).getId()).append(",")
+                               
.append(p.get("middle").get(0).getId()).append(",")
+                               .append(p.get("end").get(0).getId());
 
-                               return builder.toString();
-                       }
+                       return builder.toString();
                });
 
                List<String> resultList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
index 6dcf766..95310b4 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -78,7 +78,6 @@ public class Translate {
                        TranslateFunction.class,
                        0,
                        1,
-                       new int[]{0},
                        new int[]{1},
                        oldType,
                        null,
@@ -162,7 +161,6 @@ public class Translate {
                        TranslateFunction.class,
                        0,
                        1,
-                       new int[] {0},
                        new int[] {1},
                        oldType,
                        null,
@@ -248,7 +246,6 @@ public class Translate {
                        TranslateFunction.class,
                        0,
                        1,
-                       new int[]{0},
                        new int[]{1},
                        oldType,
                        null,
@@ -332,7 +329,6 @@ public class Translate {
                        TranslateFunction.class,
                        0,
                        1,
-                       new int[]{0},
                        new int[]{1},
                        oldType,
                        null,

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 0ca6eb9..33399f8 100644
--- 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -156,25 +156,6 @@ under the License.
                <pluginManagement>
                        <plugins>
 
-                               <!-- If you want to use Java 8 Lambda 
Expressions uncomment the following lines -->
-                               <!--
-                               <plugin>
-                                       
<artifactId>maven-compiler-plugin</artifactId>
-                                       <configuration>
-                                               <source>${java.version}</source>
-                                               <target>${java.version}</target>
-                                               <compilerId>jdt</compilerId>
-                                       </configuration>
-                                       <dependencies>
-                                               <dependency>
-                                                       
<groupId>org.eclipse.tycho</groupId>
-                                                       
<artifactId>tycho-compiler-jdt</artifactId>
-                                                       
<version>0.21.0</version>
-                                               </dependency>
-                                       </dependencies>
-                               </plugin>
-                               -->
-
                                <!-- This improves the out-of-the-box 
experience in Eclipse by resolving some warnings. -->
                                <plugin>
                                        <groupId>org.eclipse.m2e</groupId>

Reply via email to