Repository: flink
Updated Branches:
  refs/heads/release-1.6 402745eba -> 4862101dd


http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
index 8f8016e..f5d07de 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,9 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
+import org.apache.flink.runtime.util.jartestprogram.FilterWithIndirection;
+import org.apache.flink.runtime.util.jartestprogram.FilterWithLambda;
+import org.apache.flink.runtime.util.jartestprogram.FilterWithMethodReference;
 import 
org.apache.flink.runtime.util.jartestprogram.WordCountWithAnonymousClass;
 import org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass;
 import 
org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass2;
@@ -28,6 +30,7 @@ import 
org.apache.flink.runtime.util.jartestprogram.AnonymousInNonStaticMethod;
 import 
org.apache.flink.runtime.util.jartestprogram.AnonymousInNonStaticMethod2;
 import org.apache.flink.runtime.util.jartestprogram.NestedAnonymousInnerClass;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import java.io.File;
 import java.io.FileInputStream;
@@ -37,7 +40,6 @@ import java.util.Set;
 import java.util.jar.JarInputStream;
 import java.util.zip.ZipEntry;
 
-
 public class JarFileCreatorTest {
 
        //anonymous inner class in static method accessing a local variable in 
its closure.
@@ -48,14 +50,14 @@ public class JarFileCreatorTest {
                jfc.addClass(AnonymousInStaticMethod.class)
                        .createJarFile();
 
-               Set<String> ans = new HashSet<String>();
+               Set<String> ans = new HashSet<>();
                
ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod$1.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod$A.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod.class");
 
                Assert.assertTrue("Jar file for Anonymous Inner Class is not 
correct", validate(ans, out));
 
-               out.delete();
+               Assert.assertTrue(out.delete());
        }
 
        //anonymous inner class in non static method accessing a local variable 
in its closure.
@@ -66,14 +68,14 @@ public class JarFileCreatorTest {
                jfc.addClass(AnonymousInNonStaticMethod.class)
                        .createJarFile();
 
-               Set<String> ans = new HashSet<String>();
+               Set<String> ans = new HashSet<>();
                
ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod$1.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod$A.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod.class");
 
                Assert.assertTrue("Jar file for Anonymous Inner Class is not 
correct", validate(ans, out));
 
-               out.delete();
+               Assert.assertTrue(out.delete());
        }
 
        //anonymous inner class in non static method accessing a field of its 
enclosing class.
@@ -84,14 +86,14 @@ public class JarFileCreatorTest {
                jfc.addClass(AnonymousInNonStaticMethod2.class)
                        .createJarFile();
 
-               Set<String> ans = new HashSet<String>();
+               Set<String> ans = new HashSet<>();
                
ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2$1.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2$A.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2.class");
 
                Assert.assertTrue("Jar file for Anonymous Inner Class is not 
correct", validate(ans, out));
 
-               out.delete();
+               Assert.assertTrue(out.delete());
        }
 
        //anonymous inner class in an anonymous inner class accessing a field 
of the outermost enclosing class.
@@ -102,7 +104,7 @@ public class JarFileCreatorTest {
                jfc.addClass(NestedAnonymousInnerClass.class)
                        .createJarFile();
 
-               Set<String> ans = new HashSet<String>();
+               Set<String> ans = new HashSet<>();
                
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1$1.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1.class");
@@ -110,7 +112,54 @@ public class JarFileCreatorTest {
 
                Assert.assertTrue("Jar file for Anonymous Inner Class is not 
correct", validate(ans, out));
 
-               out.delete();
+               Assert.assertTrue(out.delete());
+       }
+
+       @Ignore // this is currently not supported (see FLINK-9520)
+       @Test
+       public void testFilterWithMethodReference() throws Exception {
+               File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
+               JarFileCreator jfc = new JarFileCreator(out);
+               jfc.addClass(FilterWithMethodReference.class)
+                       .createJarFile();
+
+               Set<String> ans = new HashSet<>();
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
+
+               Assert.assertTrue("Jar file for Java 8 method reference is not 
correct", validate(ans, out));
+               Assert.assertTrue(out.delete());
+       }
+
+       @Test
+       public void testFilterWithLambda() throws Exception{
+               File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
+               JarFileCreator jfc = new JarFileCreator(out);
+               jfc.addClass(FilterWithLambda.class)
+                       .createJarFile();
+
+               Set<String> ans = new HashSet<>();
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
+
+               Assert.assertTrue("Jar file for Java 8 lambda is not correct", 
validate(ans, out));
+               Assert.assertTrue(out.delete());
+       }
+
+       @Test
+       public void testFilterWithIndirection() throws Exception {
+               File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
+               JarFileCreator jfc = new JarFileCreator(out);
+               jfc.addClass(FilterWithIndirection.class)
+                       .createJarFile();
+
+               Set<String> ans = new HashSet<>();
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
+               
ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper$UtilFunction.class");
+
+               Assert.assertTrue("Jar file for java 8 lambda is not correct", 
validate(ans, out));
+               Assert.assertTrue(out.delete());
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -123,14 +172,14 @@ public class JarFileCreatorTest {
                jfc.addClass(WordCountWithExternalClass.class)
                        .createJarFile();
 
-               Set<String> ans = new HashSet<String>();
+               Set<String> ans = new HashSet<>();
                
ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.class");
 
                Assert.assertTrue("Jar file for External Class is not correct", 
validate(ans, out));
 
-               out.delete();
+               Assert.assertTrue(out.delete());
        }
 
        @Test
@@ -140,14 +189,14 @@ public class JarFileCreatorTest {
                jfc.addClass(WordCountWithInnerClass.class)
                        .createJarFile();
 
-               Set<String> ans = new HashSet<String>();
+               Set<String> ans = new HashSet<>();
                
ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class");
 
                Assert.assertTrue("Jar file for Inner Class is not correct", 
validate(ans, out));
 
-               out.delete();
+               Assert.assertTrue(out.delete());
        }
 
        @Test
@@ -157,14 +206,14 @@ public class JarFileCreatorTest {
                jfc.addClass(WordCountWithAnonymousClass.class)
                        .createJarFile();
 
-               Set<String> ans = new HashSet<String>();
+               Set<String> ans = new HashSet<>();
                
ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass$1.class");
 
                Assert.assertTrue("Jar file for Anonymous Class is not 
correct", validate(ans, out));
 
-               out.delete();
+               Assert.assertTrue(out.delete());
        }
 
        @Test
@@ -174,7 +223,7 @@ public class JarFileCreatorTest {
                jfc.addClass(WordCountWithExternalClass2.class)
                        .createJarFile();
 
-               Set<String> ans = new HashSet<String>();
+               Set<String> ans = new HashSet<>();
                
ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.class");
@@ -182,7 +231,7 @@ public class JarFileCreatorTest {
 
                Assert.assertTrue("Jar file for Extend Identifier is not 
correct", validate(ans, out));
 
-               out.delete();
+               Assert.assertTrue(out.delete());
        }
 
        @Test
@@ -193,7 +242,7 @@ public class JarFileCreatorTest {
                        .addPackage("org.apache.flink.util")
                        .createJarFile();
 
-               Set<String> ans = new HashSet<String>();
+               Set<String> ans = new HashSet<>();
                
ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class");
                
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class");
@@ -201,7 +250,7 @@ public class JarFileCreatorTest {
 
                Assert.assertTrue("Jar file for UDF package is not correct", 
validate(ans, out));
 
-               out.delete();
+               Assert.assertTrue(out.delete());
        }
 
        private boolean validate(Set<String> expected, File out) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java
new file mode 100644
index 0000000..12026e9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Filter with additional indirections.
+ */
+public class FilterWithIndirection {
+
+       public static void main(String[] args) throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<String> input = env.fromElements("Please filter", "the 
words", "but not this");
+
+               DataSet<String> output = 
input.filter(UtilFunctionWrapper.UtilFunction.getWordFilter());
+               output.print();
+
+               env.execute();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java
new file mode 100644
index 0000000..ffa5756
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Filter with lambda that is directly passed to {@link 
DataSet#filter(FilterFunction)}.
+ */
+public class FilterWithLambda {
+
+       @SuppressWarnings("Convert2MethodRef")
+       public static void main(String[] args) throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<String> input = env.fromElements("Please filter", "the 
words", "but not this");
+
+               DataSet<String> output = input.filter((v) -> 
WordFilter.filter(v));
+               output.print();
+
+               env.execute();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java
new file mode 100644
index 0000000..ddb76b4
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * A filter defined through a method reference to a static method.
+ */
+public class FilterWithMethodReference {
+
+       public static void main(String[] args) throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<String> input = env.fromElements("Please filter", "the 
words", "but not this");
+
+               FilterFunction<String> filter = WordFilter::filter;
+
+               DataSet<String> output = input.filter(filter);
+               output.print();
+
+               env.execute();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
new file mode 100644
index 0000000..89fca0d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * Static factory for a lambda filter function.
+ */
+public class UtilFunction {
+
+       @SuppressWarnings("Convert2MethodRef")
+       public static FilterFunction<String> getWordFilter() {
+               return (v) -> WordFilter.filter(v);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
new file mode 100644
index 0000000..498c4cf
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A wrapper around {@link WordFilter} to introduce additional indirection.
+ */
+public class UtilFunctionWrapper {
+
+       /**
+        * Static factory for a lambda filter function.
+        */
+       public static class UtilFunction {
+               @SuppressWarnings("Convert2MethodRef")
+               public static FilterFunction<String> getWordFilter() {
+                       return (v) -> WordFilter.filter(v);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
new file mode 100644
index 0000000..2d072cb
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+/**
+ * Static filter method for lambda tests.
+ */
+public class WordFilter {
+
+       public static boolean filter(String value) {
+               return !value.contains("not");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 7fb3822..3f935e3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -521,7 +521,6 @@ public class AllWindowedStream<T, W extends Window> {
                        AllWindowFunction.class,
                        0,
                        1,
-                       new int[]{1, 0},
                        new int[]{2, 0},
                        inType,
                        null,
@@ -537,7 +536,6 @@ public class AllWindowedStream<T, W extends Window> {
                        0,
                        1,
                        TypeExtractor.NO_INDEX,
-                       TypeExtractor.NO_INDEX,
                        inType,
                        null,
                        false);

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
index cb18a3f..a3e28ab 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
@@ -72,7 +72,6 @@ public class AsyncDataStream {
                        AsyncFunction.class,
                        0,
                        1,
-                       new int[]{0},
                        new int[]{1, 0},
                        in.getType(),
                        Utils.getCallLocationName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
index cb7d8c9..30047cb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
@@ -131,8 +131,6 @@ public class BroadcastConnectedStream<IN1, IN2> {
                                2,
                                3,
                                TypeExtractor.NO_INDEX,
-                               TypeExtractor.NO_INDEX,
-                               TypeExtractor.NO_INDEX,
                                getType1(),
                                getType2(),
                                Utils.getCallLocationName(),
@@ -183,8 +181,6 @@ public class BroadcastConnectedStream<IN1, IN2> {
                                1,
                                2,
                                TypeExtractor.NO_INDEX,
-                               TypeExtractor.NO_INDEX,
-                               TypeExtractor.NO_INDEX,
                                getType1(),
                                getType2(),
                                Utils.getCallLocationName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index c2ebdf4..55009e1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -95,9 +96,24 @@ public class CoGroupedStreams<T1, T2> {
 
        /**
         * Specifies a {@link KeySelector} for elements from the first input.
+        *
+        * @param keySelector The KeySelector to be used for extracting the 
first input's key for partitioning.
         */
        public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
-               TypeInformation<KEY> keyType = 
TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+               Preconditions.checkNotNull(keySelector);
+               final TypeInformation<KEY> keyType = 
TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+               return where(keySelector, keyType);
+       }
+
+       /**
+        * Specifies a {@link KeySelector} for elements from the first input 
with explicit type information.
+        *
+        * @param keySelector The KeySelector to be used for extracting the 
first input's key for partitioning.
+        * @param keyType The type information describing the key type.
+        */
+       public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, 
TypeInformation<KEY> keyType)  {
+               Preconditions.checkNotNull(keySelector);
+               Preconditions.checkNotNull(keyType);
                return new Where<>(input1.clean(keySelector), keyType);
        }
 
@@ -121,12 +137,28 @@ public class CoGroupedStreams<T1, T2> {
 
                /**
                 * Specifies a {@link KeySelector} for elements from the second 
input.
+                *
+                * @param keySelector The KeySelector to be used for extracting 
the second input's key for partitioning.
                 */
                public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
-                       TypeInformation<KEY> otherKey = 
TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
-                       if (!otherKey.equals(this.keyType)) {
+                       Preconditions.checkNotNull(keySelector);
+                       final TypeInformation<KEY> otherKey = 
TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+                       return equalTo(keySelector, otherKey);
+               }
+
+               /**
+                * Specifies a {@link KeySelector} for elements from the second 
input with explicit type information for the key type.
+                *
+                * @param keySelector The KeySelector to be used for extracting 
the second input's key for partitioning.
+                * @param keyType The type information describing the key type.
+                */
+               public EqualTo equalTo(KeySelector<T2, KEY> keySelector, 
TypeInformation<KEY> keyType)  {
+                       Preconditions.checkNotNull(keySelector);
+                       Preconditions.checkNotNull(keyType);
+
+                       if (!keyType.equals(this.keyType)) {
                                throw new IllegalArgumentException("The keys 
for the two inputs are not equal: " +
-                                               "first key = " + this.keyType + 
" , second key = " + otherKey);
+                                               "first key = " + this.keyType + 
" , second key = " + keyType);
                        }
 
                        return new EqualTo(input2.clean(keySelector));

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index e244bd2..0ada54a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -192,6 +192,28 @@ public class ConnectedStreams<IN1, IN2> {
        }
 
        /**
+        * KeyBy operation for connected data stream. Assigns keys to the 
elements of
+        * input1 and input2 using keySelector1 and keySelector2 with explicit 
type information
+        * for the common key type.
+        *
+        * @param keySelector1
+        *            The {@link KeySelector} used for grouping the first input
+        * @param keySelector2
+        *            The {@link KeySelector} used for grouping the second input
+        * @param keyType The type information of the common key type.
+        * @return The partitioned {@link ConnectedStreams}
+        */
+       public <KEY> ConnectedStreams<IN1, IN2> keyBy(
+                       KeySelector<IN1, KEY> keySelector1,
+                       KeySelector<IN2, KEY> keySelector2,
+                       TypeInformation<KEY> keyType) {
+               return new ConnectedStreams<>(
+                       environment,
+                       inputStream1.keyBy(keySelector1, keyType),
+                       inputStream2.keyBy(keySelector2, keyType));
+       }
+
+       /**
         * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
         * the output to a common type. The transformation calls a
         * {@link CoMapFunction#map1} for each element of the first input and
@@ -210,8 +232,6 @@ public class ConnectedStreams<IN1, IN2> {
                        1,
                        2,
                        TypeExtractor.NO_INDEX,
-                       TypeExtractor.NO_INDEX,
-                       TypeExtractor.NO_INDEX,
                        getType1(),
                        getType2(),
                        Utils.getCallLocationName(),
@@ -244,8 +264,6 @@ public class ConnectedStreams<IN1, IN2> {
                        1,
                        2,
                        TypeExtractor.NO_INDEX,
-                       TypeExtractor.NO_INDEX,
-                       TypeExtractor.NO_INDEX,
                        getType1(),
                        getType2(),
                        Utils.getCallLocationName(),
@@ -281,8 +299,6 @@ public class ConnectedStreams<IN1, IN2> {
                        1,
                        2,
                        TypeExtractor.NO_INDEX,
-                       TypeExtractor.NO_INDEX,
-                       TypeExtractor.NO_INDEX,
                        getType1(),
                        getType2(),
                        Utils.getCallLocationName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 78ba8e4..8e24ad7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -286,10 +286,25 @@ public class DataStream<T> {
         * @return The {@link DataStream} with partitioned state (i.e. 
KeyedStream)
         */
        public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
+               Preconditions.checkNotNull(key);
                return new KeyedStream<>(this, clean(key));
        }
 
        /**
+        * It creates a new {@link KeyedStream} that uses the provided key with 
explicit type information
+        * for partitioning its operator states.
+        *
+        * @param key The KeySelector to be used for extracting the key for 
partitioning.
+        * @param keyType The type information describing the key type.
+        * @return The {@link DataStream} with partitioned state (i.e. 
KeyedStream)
+        */
+       public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, 
TypeInformation<K> keyType) {
+               Preconditions.checkNotNull(key);
+               Preconditions.checkNotNull(keyType);
+               return new KeyedStream<>(this, clean(key), keyType);
+       }
+
+       /**
         * Partitions the operator state of a {@link DataStream} by the given 
key positions.
         *
         * @param fields
@@ -621,7 +636,6 @@ public class DataStream<T> {
                        0,
                        1,
                        TypeExtractor.NO_INDEX,
-                       TypeExtractor.NO_INDEX,
                        getType(),
                        Utils.getCallLocationName(),
                        true);

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
index f614ab0..3d22275 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -206,5 +206,9 @@ public class IterativeStream<T> extends 
SingleOutputStreamOperator<T> {
                        throw groupingException;
                }
 
+               @Override
+               public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> 
keySelector1, KeySelector<F, KEY> keySelector2, TypeInformation<KEY> keyType) {
+                       throw groupingException;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index 088fab9..bb67c09 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -79,9 +79,24 @@ public class JoinedStreams<T1, T2> {
 
        /**
         * Specifies a {@link KeySelector} for elements from the first input.
+        *
+        * @param keySelector The KeySelector to be used for extracting the 
first input's key for partitioning.
         */
        public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
-               TypeInformation<KEY> keyType = 
TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+               requireNonNull(keySelector);
+               final TypeInformation<KEY> keyType = 
TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+               return where(keySelector, keyType);
+       }
+
+       /**
+        * Specifies a {@link KeySelector} for elements from the first input 
with explicit type information for the key type.
+        *
+        * @param keySelector The KeySelector to be used for extracting the 
first input's key for partitioning.
+        * @param keyType The type information describing the key type.
+        */
+       public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, 
TypeInformation<KEY> keyType)  {
+               requireNonNull(keySelector);
+               requireNonNull(keyType);
                return new Where<>(input1.clean(keySelector), keyType);
        }
 
@@ -105,12 +120,28 @@ public class JoinedStreams<T1, T2> {
 
                /**
                 * Specifies a {@link KeySelector} for elements from the second 
input.
+                *
+                * @param keySelector The KeySelector to be used for extracting 
the second input's key for partitioning.
                 */
                public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
-                       TypeInformation<KEY> otherKey = 
TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
-                       if (!otherKey.equals(this.keyType)) {
+                       requireNonNull(keySelector);
+                       final TypeInformation<KEY> otherKey = 
TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+                       return equalTo(keySelector, otherKey);
+               }
+
+               /**
+                * Specifies a {@link KeySelector} for elements from the second 
input with explicit type information for the key type.
+                *
+                * @param keySelector The KeySelector to be used for extracting 
the second input's key for partitioning.
+                * @param keyType The type information describing the key type.
+                */
+               public EqualTo equalTo(KeySelector<T2, KEY> keySelector, 
TypeInformation<KEY> keyType)  {
+                       requireNonNull(keySelector);
+                       requireNonNull(keyType);
+
+                       if (!keyType.equals(this.keyType)) {
                                throw new IllegalArgumentException("The keys 
for the two inputs are not equal: " +
-                                               "first key = " + this.keyType + 
" , second key = " + otherKey);
+                                               "first key = " + this.keyType + 
" , second key = " + keyType);
                        }
 
                        return new EqualTo(input2.clean(keySelector));
@@ -226,8 +257,6 @@ public class JoinedStreams<T1, T2> {
                                0,
                                1,
                                2,
-                               new int[]{0},
-                               new int[]{1},
                                TypeExtractor.NO_INDEX,
                                input1.getType(),
                                input2.getType(),
@@ -309,8 +338,6 @@ public class JoinedStreams<T1, T2> {
                                0,
                                1,
                                2,
-                               new int[]{0},
-                               new int[]{1},
                                new int[]{2, 0},
                                input1.getType(),
                                input2.getType(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 32a5c96..84df716 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -70,6 +70,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -305,7 +306,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
                        0,
                        1,
                        TypeExtractor.NO_INDEX,
-                       TypeExtractor.NO_INDEX,
                        getType(),
                        Utils.getCallLocationName(),
                        true);
@@ -366,7 +366,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
                                1,
                                2,
                                TypeExtractor.NO_INDEX,
-                               TypeExtractor.NO_INDEX,
                                getType(),
                                Utils.getCallLocationName(),
                                true);
@@ -480,8 +479,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
        @PublicEvolving
        public static class IntervalJoined<IN1, IN2, KEY> {
 
-               private static final String INTERVAL_JOIN_FUNC_NAME = 
"IntervalJoin";
-
                private final KeyedStream<IN1, KEY> left;
                private final KeyedStream<IN2, KEY> right;
 
@@ -534,33 +531,52 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
                }
 
                /**
-                * Completes the join operation with the user function that is 
executed for each joined pair
+                * Completes the join operation with the given user function 
that is executed for each joined pair
                 * of elements.
-                * @param udf The user-defined function
-                * @param <OUT> The output type
-                * @return Returns a DataStream
+                *
+                * @param processJoinFunction The user-defined process join 
function.
+                * @param <OUT> The output type.
+                * @return The transformed {@link DataStream}.
                 */
                @PublicEvolving
-               public <OUT> DataStream<OUT> process(ProcessJoinFunction<IN1, 
IN2, OUT> udf) {
-
-                       ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = 
left.getExecutionEnvironment().clean(udf);
+               public <OUT> SingleOutputStreamOperator<OUT> 
process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
+                       Preconditions.checkNotNull(processJoinFunction);
 
-                       TypeInformation<OUT> resultType = 
TypeExtractor.getBinaryOperatorReturnType(
-                               cleanedUdf,
-                               ProcessJoinFunction.class,    // 
ProcessJoinFunction<IN1, IN2, OUT>
-                               0,     //                                       
    0    1    2
+                       final TypeInformation<OUT> outputType = 
TypeExtractor.getBinaryOperatorReturnType(
+                               processJoinFunction,
+                               ProcessJoinFunction.class,
+                               0,
                                1,
                                2,
-                               new int[]{0},                   // lambda input 
1 type arg indices
-                               new int[]{1},                   // lambda input 
1 type arg indices
-                               TypeExtractor.NO_INDEX,         // output arg 
indices
-                               left.getType(),                 // input 1 type 
information
-                               right.getType(),                // input 2 type 
information
-                               INTERVAL_JOIN_FUNC_NAME ,
-                               false
+                               TypeExtractor.NO_INDEX,
+                               left.getType(),
+                               right.getType(),
+                               Utils.getCallLocationName(),
+                               true
                        );
 
-                       IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
+                       return process(processJoinFunction, outputType);
+               }
+
+               /**
+                * Completes the join operation with the given user function 
that is executed for each joined pair
+                * of elements. This method allows for passing explicit type 
information for the output type.
+                *
+                * @param processJoinFunction The user-defined process join 
function.
+                * @param outputType The type information for the output type.
+                * @param <OUT> The output type.
+                * @return The transformed {@link DataStream}.
+                */
+               @PublicEvolving
+               public <OUT> SingleOutputStreamOperator<OUT> process(
+                               ProcessJoinFunction<IN1, IN2, OUT> 
processJoinFunction,
+                               TypeInformation<OUT> outputType) {
+                       Preconditions.checkNotNull(processJoinFunction);
+                       Preconditions.checkNotNull(outputType);
+
+                       final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = 
left.getExecutionEnvironment().clean(processJoinFunction);
+
+                       final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator 
=
                                new IntervalJoinOperator<>(
                                        lowerBound,
                                        upperBound,
@@ -574,8 +590,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
                        return left
                                .connect(right)
                                .keyBy(keySelector1, keySelector2)
-                               .transform(INTERVAL_JOIN_FUNC_NAME , 
resultType, operator);
-
+                               .transform("Interval Join", outputType, 
operator);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 50c6f1a..1f09b73 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -896,7 +896,6 @@ public class WindowedStream<T, K, W extends Window> {
                        WindowFunction.class,
                        0,
                        1,
-                       new int[]{2, 0},
                        new int[]{3, 0},
                        inType,
                        null,
@@ -913,7 +912,6 @@ public class WindowedStream<T, K, W extends Window> {
                        0,
                        1,
                        TypeExtractor.NO_INDEX,
-                       TypeExtractor.NO_INDEX,
                        inType,
                        functionName,
                        false);

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 5baa980..cfb5adc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -21,12 +21,16 @@ import 
org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Collector;
 
 import org.junit.Test;
@@ -35,7 +39,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for {@link TypeFill}.
+ * Tests for handling missing type information either by calling {@code 
returns()} or having an
+ * explicit type information parameter.
  */
 @SuppressWarnings("serial")
 public class TypeFillTest {
@@ -43,6 +48,7 @@ public class TypeFillTest {
        @Test
        public void test() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
                try {
                        env.addSource(new TestSource<Integer>()).print();
@@ -71,12 +77,53 @@ public class TypeFillTest {
                        fail();
                } catch (Exception ignored) {}
 
+               try {
+                       source.keyBy(new TestKeySelector<Long, 
String>()).print();
+                       fail();
+               } catch (Exception ignored) {}
+
+               try {
+                       source.connect(source).keyBy(new TestKeySelector<Long, 
String>(), new TestKeySelector<>());
+                       fail();
+               } catch (Exception ignored) {}
+
+               try {
+                       source.coGroup(source).where(new 
TestKeySelector<>()).equalTo(new TestKeySelector<>());
+                       fail();
+               } catch (Exception ignored) {}
+
+               try {
+                       source.join(source).where(new 
TestKeySelector<>()).equalTo(new TestKeySelector<>());
+                       fail();
+               } catch (Exception ignored) {}
+
+               try {
+                       source.keyBy((in) -> in)
+                               .intervalJoin(source.keyBy((in) -> in))
+                               .between(Time.milliseconds(10L), 
Time.milliseconds(10L))
+                               .process(new TestProcessJoinFunction<>())
+                               .print();
+                       fail();
+               } catch (Exception ignored) {}
+
                env.addSource(new TestSource<Integer>()).returns(Integer.class);
                source.map(new TestMap<Long, 
Long>()).returns(Long.class).print();
                source.flatMap(new TestFlatMap<Long, Long>()).returns(new 
TypeHint<Long>(){}).print();
                source.connect(source).map(new TestCoMap<Long, Long, 
Integer>()).returns(BasicTypeInfo.INT_TYPE_INFO).print();
                source.connect(source).flatMap(new TestCoFlatMap<Long, Long, 
Integer>())
                                .returns(BasicTypeInfo.INT_TYPE_INFO).print();
+               source.connect(source).keyBy(new TestKeySelector<>(), new 
TestKeySelector<>(), Types.STRING);
+               source.coGroup(source).where(new TestKeySelector<>(), 
Types.STRING).equalTo(new TestKeySelector<>(), Types.STRING);
+               source.join(source).where(new TestKeySelector<>(), 
Types.STRING).equalTo(new TestKeySelector<>(), Types.STRING);
+               source.keyBy((in) -> in)
+                       .intervalJoin(source.keyBy((in) -> in))
+                       .between(Time.milliseconds(10L), Time.milliseconds(10L))
+                       .process(new TestProcessJoinFunction<Long, Long, 
String>())
+                       .returns(Types.STRING);
+               source.keyBy((in) -> in)
+                       .intervalJoin(source.keyBy((in) -> in))
+                       .between(Time.milliseconds(10L), Time.milliseconds(10L))
+                       .process(new TestProcessJoinFunction<>(), Types.STRING);
 
                assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
                                source.map(new TestMap<Long, 
Long>()).returns(Long.class).getType());
@@ -142,4 +189,20 @@ public class TypeFillTest {
                public void flatMap2(IN2 value, Collector<OUT> out) throws 
Exception {}
 
        }
+
+       private static class TestKeySelector<IN, KEY> implements 
KeySelector<IN, KEY> {
+
+               @Override
+               public KEY getKey(IN value) throws Exception {
+                       return null;
+               }
+       }
+
+       private static class TestProcessJoinFunction<IN1, IN2, OUT> extends 
ProcessJoinFunction<IN1, IN2, OUT> {
+
+               @Override
+               public void processElement(IN1 left, IN2 right, Context ctx, 
Collector<OUT> out) throws Exception {
+                       // nothing to do
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
index 453f525..21583fd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -669,6 +669,39 @@ public class CoGroupITCase extends 
MultipleProgramsTestBase {
                compareResultAsTuples(result, expected);
        }
 
+       @Test
+       public void testCoGroupLambda() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<Integer, String>> left = env.fromElements(
+                       new Tuple2<>(1, "hello"),
+                       new Tuple2<>(2, "what's"),
+                       new Tuple2<>(2, "up")
+               );
+               DataSet<Tuple2<Integer, String>> right = env.fromElements(
+                       new Tuple2<>(1, "not"),
+                       new Tuple2<>(1, "much"),
+                       new Tuple2<>(2, "really")
+               );
+               DataSet<Integer> joined = 
left.coGroup(right).where(0).equalTo(0)
+                       .with((Iterable<Tuple2<Integer, String>> values1, 
Iterable<Tuple2<Integer, String>> values2,
+                                       Collector<Integer> out) -> {
+                               int sum = 0;
+                               for (Tuple2<Integer, String> next : values1) {
+                                       sum += next.f0;
+                               }
+                               for (Tuple2<Integer, String> next : values2) {
+                                       sum += next.f0;
+                               }
+                               out.collect(sum);
+                       }).returns(Integer.class);
+               List<Integer> result = joined.collect();
+
+               String expected = "6\n3\n";
+
+               compareResultAsText(result, expected);
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  UDF classes
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
index dfb3efb..f145555 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
@@ -515,4 +515,38 @@ public class MapITCase extends MultipleProgramsTestBase {
                        return value;
                }
        }
+
+       @Test
+       public void testMapWithLambdas() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
+               DataSet<String> mappedDs = stringDs
+                       .map(Object::toString)
+                       .map(s -> s.replace("1", "2"))
+                       .map(Trade::new)
+                       .map(Trade::toString);
+               List<String> result = mappedDs.collect();
+
+               String expected = "22\n" +
+                       "22\n" +
+                       "23\n" +
+                       "24\n";
+
+               compareResultAsText(result, expected);
+       }
+
+       private static class Trade {
+
+               public String v;
+
+               public Trade(String v) {
+                       this.v = v;
+               }
+
+               @Override
+               public String toString() {
+                       return v;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
index 2d6897b..750769c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
@@ -90,7 +90,11 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.get5TupleDataSet(env);
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs 
= ds.
-                               groupBy(4, 0).reduce(new Tuple5Reduce());
+                               groupBy(4, 0).reduce((in1, in2) -> {
+                                       Tuple5<Integer, Long, Integer, String, 
Long> out = new Tuple5<>();
+                                       out.setFields(in1.f0, in1.f1 + in2.f1, 
0, "P-)", in1.f4);
+                                       return out;
+                               });
 
                List<Tuple5<Integer, Long, Integer, String, Long>> result = 
reduceDs
                                .collect();

http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 874f72a..65cfacb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,6 @@ under the License.
                <module>flink-shaded-curator</module>
                <module>flink-core</module>
                <module>flink-java</module>
-               <module>flink-java8</module>
                <module>flink-scala</module>
                <module>flink-filesystems</module>
                <module>flink-runtime</module>

Reply via email to