Repository: flink Updated Branches: refs/heads/release-1.6 402745eba -> 4862101dd
http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java index 8f8016e..f5d07de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,9 +16,11 @@ * limitations under the License. */ - package org.apache.flink.runtime.util; +import org.apache.flink.runtime.util.jartestprogram.FilterWithIndirection; +import org.apache.flink.runtime.util.jartestprogram.FilterWithLambda; +import org.apache.flink.runtime.util.jartestprogram.FilterWithMethodReference; import org.apache.flink.runtime.util.jartestprogram.WordCountWithAnonymousClass; import org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass; import org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass2; @@ -28,6 +30,7 @@ import org.apache.flink.runtime.util.jartestprogram.AnonymousInNonStaticMethod; import org.apache.flink.runtime.util.jartestprogram.AnonymousInNonStaticMethod2; import org.apache.flink.runtime.util.jartestprogram.NestedAnonymousInnerClass; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import java.io.File; import java.io.FileInputStream; @@ -37,7 +40,6 @@ import java.util.Set; import java.util.jar.JarInputStream; import java.util.zip.ZipEntry; - public class JarFileCreatorTest { //anonymous inner class in static method accessing a local variable in its closure. @@ -48,14 +50,14 @@ public class JarFileCreatorTest { jfc.addClass(AnonymousInStaticMethod.class) .createJarFile(); - Set<String> ans = new HashSet<String>(); + Set<String> ans = new HashSet<>(); ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod$1.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod$A.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod.class"); Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out)); - out.delete(); + Assert.assertTrue(out.delete()); } //anonymous inner class in non static method accessing a local variable in its closure. @@ -66,14 +68,14 @@ public class JarFileCreatorTest { jfc.addClass(AnonymousInNonStaticMethod.class) .createJarFile(); - Set<String> ans = new HashSet<String>(); + Set<String> ans = new HashSet<>(); ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod$1.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod$A.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod.class"); Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out)); - out.delete(); + Assert.assertTrue(out.delete()); } //anonymous inner class in non static method accessing a field of its enclosing class. @@ -84,14 +86,14 @@ public class JarFileCreatorTest { jfc.addClass(AnonymousInNonStaticMethod2.class) .createJarFile(); - Set<String> ans = new HashSet<String>(); + Set<String> ans = new HashSet<>(); ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2$1.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2$A.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2.class"); Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out)); - out.delete(); + Assert.assertTrue(out.delete()); } //anonymous inner class in an anonymous inner class accessing a field of the outermost enclosing class. @@ -102,7 +104,7 @@ public class JarFileCreatorTest { jfc.addClass(NestedAnonymousInnerClass.class) .createJarFile(); - Set<String> ans = new HashSet<String>(); + Set<String> ans = new HashSet<>(); ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1$1.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1.class"); @@ -110,7 +112,54 @@ public class JarFileCreatorTest { Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out)); - out.delete(); + Assert.assertTrue(out.delete()); + } + + @Ignore // this is currently not supported (see FLINK-9520) + @Test + public void testFilterWithMethodReference() throws Exception { + File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar"); + JarFileCreator jfc = new JarFileCreator(out); + jfc.addClass(FilterWithMethodReference.class) + .createJarFile(); + + Set<String> ans = new HashSet<>(); + ans.add("org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class"); + + Assert.assertTrue("Jar file for Java 8 method reference is not correct", validate(ans, out)); + Assert.assertTrue(out.delete()); + } + + @Test + public void testFilterWithLambda() throws Exception{ + File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar"); + JarFileCreator jfc = new JarFileCreator(out); + jfc.addClass(FilterWithLambda.class) + .createJarFile(); + + Set<String> ans = new HashSet<>(); + ans.add("org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class"); + + Assert.assertTrue("Jar file for Java 8 lambda is not correct", validate(ans, out)); + Assert.assertTrue(out.delete()); + } + + @Test + public void testFilterWithIndirection() throws Exception { + File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar"); + JarFileCreator jfc = new JarFileCreator(out); + jfc.addClass(FilterWithIndirection.class) + .createJarFile(); + + Set<String> ans = new HashSet<>(); + ans.add("org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class"); + ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper$UtilFunction.class"); + + Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out)); + Assert.assertTrue(out.delete()); } //---------------------------------------------------------------------------------------------- @@ -123,14 +172,14 @@ public class JarFileCreatorTest { jfc.addClass(WordCountWithExternalClass.class) .createJarFile(); - Set<String> ans = new HashSet<String>(); + Set<String> ans = new HashSet<>(); ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.class"); Assert.assertTrue("Jar file for External Class is not correct", validate(ans, out)); - out.delete(); + Assert.assertTrue(out.delete()); } @Test @@ -140,14 +189,14 @@ public class JarFileCreatorTest { jfc.addClass(WordCountWithInnerClass.class) .createJarFile(); - Set<String> ans = new HashSet<String>(); + Set<String> ans = new HashSet<>(); ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class"); Assert.assertTrue("Jar file for Inner Class is not correct", validate(ans, out)); - out.delete(); + Assert.assertTrue(out.delete()); } @Test @@ -157,14 +206,14 @@ public class JarFileCreatorTest { jfc.addClass(WordCountWithAnonymousClass.class) .createJarFile(); - Set<String> ans = new HashSet<String>(); + Set<String> ans = new HashSet<>(); ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass$1.class"); Assert.assertTrue("Jar file for Anonymous Class is not correct", validate(ans, out)); - out.delete(); + Assert.assertTrue(out.delete()); } @Test @@ -174,7 +223,7 @@ public class JarFileCreatorTest { jfc.addClass(WordCountWithExternalClass2.class) .createJarFile(); - Set<String> ans = new HashSet<String>(); + Set<String> ans = new HashSet<>(); ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.class"); @@ -182,7 +231,7 @@ public class JarFileCreatorTest { Assert.assertTrue("Jar file for Extend Identifier is not correct", validate(ans, out)); - out.delete(); + Assert.assertTrue(out.delete()); } @Test @@ -193,7 +242,7 @@ public class JarFileCreatorTest { .addPackage("org.apache.flink.util") .createJarFile(); - Set<String> ans = new HashSet<String>(); + Set<String> ans = new HashSet<>(); ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class"); ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class"); @@ -201,7 +250,7 @@ public class JarFileCreatorTest { Assert.assertTrue("Jar file for UDF package is not correct", validate(ans, out)); - out.delete(); + Assert.assertTrue(out.delete()); } private boolean validate(Set<String> expected, File out) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java new file mode 100644 index 0000000..12026e9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; + +/** + * Filter with additional indirections. + */ +public class FilterWithIndirection { + + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<String> input = env.fromElements("Please filter", "the words", "but not this"); + + DataSet<String> output = input.filter(UtilFunctionWrapper.UtilFunction.getWordFilter()); + output.print(); + + env.execute(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java new file mode 100644 index 0000000..ffa5756 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; + +/** + * Filter with lambda that is directly passed to {@link DataSet#filter(FilterFunction)}. + */ +public class FilterWithLambda { + + @SuppressWarnings("Convert2MethodRef") + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<String> input = env.fromElements("Please filter", "the words", "but not this"); + + DataSet<String> output = input.filter((v) -> WordFilter.filter(v)); + output.print(); + + env.execute(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java new file mode 100644 index 0000000..ddb76b4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; + +/** + * A lambda filter using a static method. + */ +public class FilterWithMethodReference { + + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<String> input = env.fromElements("Please filter", "the words", "but not this"); + + FilterFunction<String> filter = WordFilter::filter; + + DataSet<String> output = input.filter(filter); + output.print(); + + env.execute(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java new file mode 100644 index 0000000..89fca0d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * Static factory for a lambda filter function. + */ +public class UtilFunction { + + @SuppressWarnings("Convert2MethodRef") + public static FilterFunction<String> getWordFilter() { + return (v) -> WordFilter.filter(v); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java new file mode 100644 index 0000000..498c4cf --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * A wrapper around {@link WordFilter} to introduce additional indirection. + */ +public class UtilFunctionWrapper { + + /** + * Static factory for a lambda filter function. + */ + public static class UtilFunction { + @SuppressWarnings("Convert2MethodRef") + public static FilterFunction<String> getWordFilter() { + return (v) -> WordFilter.filter(v); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java new file mode 100644 index 0000000..2d072cb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.jartestprogram; + +/** + * Static filter method for lambda tests. + */ +public class WordFilter { + + public static boolean filter(String value) { + return !value.contains("not"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 7fb3822..3f935e3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -521,7 +521,6 @@ public class AllWindowedStream<T, W extends Window> { AllWindowFunction.class, 0, 1, - new int[]{1, 0}, new int[]{2, 0}, inType, null, @@ -537,7 +536,6 @@ public class AllWindowedStream<T, W extends Window> { 0, 1, TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, inType, null, false); http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java index cb18a3f..a3e28ab 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java @@ -72,7 +72,6 @@ public class AsyncDataStream { AsyncFunction.class, 0, 1, - new int[]{0}, new int[]{1, 0}, in.getType(), Utils.getCallLocationName(), http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java index cb7d8c9..30047cb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java @@ -131,8 +131,6 @@ public class BroadcastConnectedStream<IN1, IN2> { 2, 3, TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, getType1(), getType2(), Utils.getCallLocationName(), @@ -183,8 +181,6 @@ public class BroadcastConnectedStream<IN1, IN2> { 1, 2, TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, getType1(), getType2(), Utils.getCallLocationName(), http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index c2ebdf4..55009e1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; import java.io.IOException; import java.util.ArrayList; @@ -95,9 +96,24 @@ public class CoGroupedStreams<T1, T2> { /** * Specifies a {@link KeySelector} for elements from the first input. + * + * @param keySelector The KeySelector to be used for extracting the first input's key for partitioning. */ public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) { - TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType()); + Preconditions.checkNotNull(keySelector); + final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType()); + return where(keySelector, keyType); + } + + /** + * Specifies a {@link KeySelector} for elements from the first input with explicit type information. + * + * @param keySelector The KeySelector to be used for extracting the first input's key for partitioning. + * @param keyType The type information describing the key type. + */ + public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) { + Preconditions.checkNotNull(keySelector); + Preconditions.checkNotNull(keyType); return new Where<>(input1.clean(keySelector), keyType); } @@ -121,12 +137,28 @@ public class CoGroupedStreams<T1, T2> { /** * Specifies a {@link KeySelector} for elements from the second input. + * + * @param keySelector The KeySelector to be used for extracting the second input's key for partitioning. */ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) { - TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); - if (!otherKey.equals(this.keyType)) { + Preconditions.checkNotNull(keySelector); + final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); + return equalTo(keySelector, otherKey); + } + + /** + * Specifies a {@link KeySelector} for elements from the second input with explicit type information for the key type. + * + * @param keySelector The KeySelector to be used for extracting the key for partitioning. + * @param keyType The type information describing the key type. + */ + public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType) { + Preconditions.checkNotNull(keySelector); + Preconditions.checkNotNull(keyType); + + if (!keyType.equals(this.keyType)) { throw new IllegalArgumentException("The keys for the two inputs are not equal: " + - "first key = " + this.keyType + " , second key = " + otherKey); + "first key = " + this.keyType + " , second key = " + keyType); } return new EqualTo(input2.clean(keySelector)); http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java index e244bd2..0ada54a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java @@ -192,6 +192,28 @@ public class ConnectedStreams<IN1, IN2> { } /** + * KeyBy operation for connected data stream. Assigns keys to the elements of + * input1 and input2 using keySelector1 and keySelector2 with explicit type information + * for the common key type. + * + * @param keySelector1 + * The {@link KeySelector} used for grouping the first input + * @param keySelector2 + * The {@link KeySelector} used for grouping the second input + * @param keyType The type information of the common key type. + * @return The partitioned {@link ConnectedStreams} + */ + public <KEY> ConnectedStreams<IN1, IN2> keyBy( + KeySelector<IN1, KEY> keySelector1, + KeySelector<IN2, KEY> keySelector2, + TypeInformation<KEY> keyType) { + return new ConnectedStreams<>( + environment, + inputStream1.keyBy(keySelector1, keyType), + inputStream2.keyBy(keySelector2, keyType)); + } + + /** * Applies a CoMap transformation on a {@link ConnectedStreams} and maps * the output to a common type. The transformation calls a * {@link CoMapFunction#map1} for each element of the first input and @@ -210,8 +232,6 @@ public class ConnectedStreams<IN1, IN2> { 1, 2, TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, getType1(), getType2(), Utils.getCallLocationName(), @@ -244,8 +264,6 @@ public class ConnectedStreams<IN1, IN2> { 1, 2, TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, getType1(), getType2(), Utils.getCallLocationName(), @@ -281,8 +299,6 @@ public class ConnectedStreams<IN1, IN2> { 1, 2, TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, getType1(), getType2(), Utils.getCallLocationName(), http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 78ba8e4..8e24ad7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -286,10 +286,25 @@ public class DataStream<T> { * @return The {@link DataStream} with partitioned state (i.e. KeyedStream) */ public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) { + Preconditions.checkNotNull(key); return new KeyedStream<>(this, clean(key)); } /** + * It creates a new {@link KeyedStream} that uses the provided key with explicit type information + * for partitioning its operator states. + * + * @param key The KeySelector to be used for extracting the key for partitioning. + * @param keyType The type information describing the key type. + * @return The {@link DataStream} with partitioned state (i.e. KeyedStream) + */ + public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(keyType); + return new KeyedStream<>(this, clean(key), keyType); + } + + /** * Partitions the operator state of a {@link DataStream} by the given key positions. * * @param fields @@ -621,7 +636,6 @@ public class DataStream<T> { 0, 1, TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, getType(), Utils.getCallLocationName(), true); http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java index f614ab0..3d22275 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java @@ -206,5 +206,9 @@ public class IterativeStream<T> extends SingleOutputStreamOperator<T> { throw groupingException; } + @Override + public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> keySelector1, KeySelector<F, KEY> keySelector2, TypeInformation<KEY> keyType) { + throw groupingException; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java index 088fab9..bb67c09 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java @@ -79,9 +79,24 @@ public class JoinedStreams<T1, T2> { /** * Specifies a {@link KeySelector} for elements from the first input. + * + * @param keySelector The KeySelector to be used for extracting the key for partitioning. */ public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) { - TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType()); + requireNonNull(keySelector); + final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType()); + return where(keySelector, keyType); + } + + /** + * Specifies a {@link KeySelector} for elements from the first input with explicit type information for the key type. + * + * @param keySelector The KeySelector to be used for extracting the first input's key for partitioning. + * @param keyType The type information describing the key type. + */ + public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) { + requireNonNull(keySelector); + requireNonNull(keyType); return new Where<>(input1.clean(keySelector), keyType); } @@ -105,12 +120,28 @@ public class JoinedStreams<T1, T2> { /** * Specifies a {@link KeySelector} for elements from the second input. + * + * @param keySelector The KeySelector to be used for extracting the second input's key for partitioning. */ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) { - TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); - if (!otherKey.equals(this.keyType)) { + requireNonNull(keySelector); + final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); + return equalTo(keySelector, otherKey); + } + + /** + * Specifies a {@link KeySelector} for elements from the second input with explicit type information for the key type. + * + * @param keySelector The KeySelector to be used for extracting the second input's key for partitioning. + * @param keyType The type information describing the key type. + */ + public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType) { + requireNonNull(keySelector); + requireNonNull(keyType); + + if (!keyType.equals(this.keyType)) { throw new IllegalArgumentException("The keys for the two inputs are not equal: " + - "first key = " + this.keyType + " , second key = " + otherKey); + "first key = " + this.keyType + " , second key = " + keyType); } return new EqualTo(input2.clean(keySelector)); @@ -226,8 +257,6 @@ public class JoinedStreams<T1, T2> { 0, 1, 2, - new int[]{0}, - new int[]{1}, TypeExtractor.NO_INDEX, input1.getType(), input2.getType(), @@ -309,8 +338,6 @@ public class JoinedStreams<T1, T2> { 0, 1, 2, - new int[]{0}, - new int[]{1}, new int[]{2, 0}, input1.getType(), input2.getType(), http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 32a5c96..84df716 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -70,6 +70,7 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.util.Preconditions; import org.apache.commons.lang3.StringUtils; @@ -305,7 +306,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> { 0, 1, TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, getType(), Utils.getCallLocationName(), true); @@ -366,7 +366,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> { 1, 2, TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, getType(), Utils.getCallLocationName(), true); @@ -480,8 +479,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> { @PublicEvolving public static class IntervalJoined<IN1, IN2, KEY> { - private static final String INTERVAL_JOIN_FUNC_NAME = "IntervalJoin"; - private final KeyedStream<IN1, KEY> left; private final KeyedStream<IN2, KEY> right; @@ -534,33 +531,52 @@ public class KeyedStream<T, KEY> extends DataStream<T> { } /** - * Completes the join operation with the user function that is executed for each joined pair + * Completes the join operation with the given user function that is executed for each joined pair * of elements. - * @param udf The user-defined function - * @param <OUT> The output type - * @return Returns a DataStream + * + * @param processJoinFunction The user-defined process join function. + * @param <OUT> The output type. + * @return The transformed {@link DataStream}. */ @PublicEvolving - public <OUT> DataStream<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> udf) { - - ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(udf); + public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) { + Preconditions.checkNotNull(processJoinFunction); - TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType( - cleanedUdf, - ProcessJoinFunction.class, // ProcessJoinFunction<IN1, IN2, OUT> - 0, // 0 1 2 + final TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType( + processJoinFunction, + ProcessJoinFunction.class, + 0, 1, 2, - new int[]{0}, // lambda input 1 type arg indices - new int[]{1}, // lambda input 1 type arg indices - TypeExtractor.NO_INDEX, // output arg indices - left.getType(), // input 1 type information - right.getType(), // input 2 type information - INTERVAL_JOIN_FUNC_NAME , - false + TypeExtractor.NO_INDEX, + left.getType(), + right.getType(), + Utils.getCallLocationName(), + true ); - IntervalJoinOperator<KEY, IN1, IN2, OUT> operator = + return process(processJoinFunction, outputType); + } + + /** + * Completes the join operation with the given user function that is executed for each joined pair + * of elements. This methods allows for passing explicit type information for the output type. + * + * @param processJoinFunction The user-defined process join function. + * @param outputType The type information for the output type. + * @param <OUT> The output type. + * @return The transformed {@link DataStream}. + */ + @PublicEvolving + public <OUT> SingleOutputStreamOperator<OUT> process( + ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction, + TypeInformation<OUT> outputType) { + Preconditions.checkNotNull(processJoinFunction); + Preconditions.checkNotNull(outputType); + + final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction); + + final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator = new IntervalJoinOperator<>( lowerBound, upperBound, @@ -574,8 +590,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> { return left .connect(right) .keyBy(keySelector1, keySelector2) - .transform(INTERVAL_JOIN_FUNC_NAME , resultType, operator); - + .transform("Interval Join", outputType, operator); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 50c6f1a..1f09b73 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -896,7 +896,6 @@ public class WindowedStream<T, K, W extends Window> { WindowFunction.class, 0, 1, - new int[]{2, 0}, new int[]{3, 0}, inType, null, @@ -913,7 +912,6 @@ public class WindowedStream<T, K, W extends Window> { 0, 1, TypeExtractor.NO_INDEX, - TypeExtractor.NO_INDEX, inType, functionName, false); http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java index 5baa980..cfb5adc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java @@ -21,12 +21,16 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; import org.junit.Test; @@ -35,7 +39,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; /** - * Tests for {@link TypeFill}. + * Tests for handling missing type information either by calling {@code returns()} or having an + * explicit type information parameter. */ @SuppressWarnings("serial") public class TypeFillTest { @@ -43,6 +48,7 @@ public class TypeFillTest { @Test public void test() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); try { env.addSource(new TestSource<Integer>()).print(); @@ -71,12 +77,53 @@ public class TypeFillTest { fail(); } catch (Exception ignored) {} + try { + source.keyBy(new TestKeySelector<Long, String>()).print(); + fail(); + } catch (Exception ignored) {} + + try { + source.connect(source).keyBy(new TestKeySelector<Long, String>(), new TestKeySelector<>()); + fail(); + } catch (Exception ignored) {} + + try { + source.coGroup(source).where(new TestKeySelector<>()).equalTo(new TestKeySelector<>()); + fail(); + } catch (Exception ignored) {} + + try { + source.join(source).where(new TestKeySelector<>()).equalTo(new TestKeySelector<>()); + fail(); + } catch (Exception ignored) {} + + try { + source.keyBy((in) -> in) + .intervalJoin(source.keyBy((in) -> in)) + .between(Time.milliseconds(10L), Time.milliseconds(10L)) + .process(new TestProcessJoinFunction<>()) + .print(); + fail(); + } catch (Exception ignored) {} + env.addSource(new TestSource<Integer>()).returns(Integer.class); source.map(new TestMap<Long, Long>()).returns(Long.class).print(); source.flatMap(new TestFlatMap<Long, Long>()).returns(new TypeHint<Long>(){}).print(); source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns(BasicTypeInfo.INT_TYPE_INFO).print(); source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>()) .returns(BasicTypeInfo.INT_TYPE_INFO).print(); + source.connect(source).keyBy(new TestKeySelector<>(), new TestKeySelector<>(), Types.STRING); + source.coGroup(source).where(new TestKeySelector<>(), Types.STRING).equalTo(new TestKeySelector<>(), Types.STRING); + source.join(source).where(new TestKeySelector<>(), Types.STRING).equalTo(new TestKeySelector<>(), Types.STRING); + source.keyBy((in) -> in) + .intervalJoin(source.keyBy((in) -> in)) + .between(Time.milliseconds(10L), Time.milliseconds(10L)) + .process(new TestProcessJoinFunction<Long, Long, String>()) + .returns(Types.STRING); + source.keyBy((in) -> in) + .intervalJoin(source.keyBy((in) -> in)) + .between(Time.milliseconds(10L), Time.milliseconds(10L)) + .process(new TestProcessJoinFunction<>(), Types.STRING); assertEquals(BasicTypeInfo.LONG_TYPE_INFO, source.map(new TestMap<Long, Long>()).returns(Long.class).getType()); @@ -142,4 +189,20 @@ public class TypeFillTest { public void flatMap2(IN2 value, Collector<OUT> out) throws Exception {} } + + private static class TestKeySelector<IN, KEY> implements KeySelector<IN, KEY> { + + @Override + public KEY getKey(IN value) throws Exception { + return null; + } + } + + private static class TestProcessJoinFunction<IN1, IN2, OUT> extends ProcessJoinFunction<IN1, IN2, OUT> { + + @Override + public void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception { + // nothing to do + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java index 453f525..21583fd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java @@ -669,6 +669,39 @@ public class CoGroupITCase extends MultipleProgramsTestBase { compareResultAsTuples(result, expected); } + @Test + public void testCoGroupLambda() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Integer, String>> left = env.fromElements( + new Tuple2<>(1, "hello"), + new Tuple2<>(2, "what's"), + new Tuple2<>(2, "up") + ); + DataSet<Tuple2<Integer, String>> right = env.fromElements( + new Tuple2<>(1, "not"), + new Tuple2<>(1, "much"), + new Tuple2<>(2, "really") + ); + DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0) + .with((Iterable<Tuple2<Integer, String>> values1, Iterable<Tuple2<Integer, String>> values2, + Collector<Integer> out) -> { + int sum = 0; + for (Tuple2<Integer, String> next : values1) { + sum += next.f0; + } + for (Tuple2<Integer, String> next : values2) { + sum += next.f0; + } + out.collect(sum); + }).returns(Integer.class); + List<Integer> result = joined.collect(); + + String expected = "6\n3\n"; + + compareResultAsText(result, expected); + } + // -------------------------------------------------------------------------------------------- // UDF classes // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java index dfb3efb..f145555 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java @@ -515,4 +515,38 @@ public class MapITCase extends MultipleProgramsTestBase { return value; } } + + @Test + public void testMapWithLambdas() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14); + DataSet<String> mappedDs = stringDs + .map(Object::toString) + .map(s -> s.replace("1", "2")) + .map(Trade::new) + .map(Trade::toString); + List<String> result = mappedDs.collect(); + + String expected = "22\n" + + "22\n" + + "23\n" + + "24\n"; + + compareResultAsText(result, expected); + } + + private static class Trade { + + public String v; + + public Trade(String v) { + this.v = v; + } + + @Override + public String toString() { + return v; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java index 2d6897b..750769c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java @@ -90,7 +90,11 @@ public class ReduceITCase extends MultipleProgramsTestBase { DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds. - groupBy(4, 0).reduce(new Tuple5Reduce()); + groupBy(4, 0).reduce((in1, in2) -> { + Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<>(); + out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4); + return out; + }); List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs .collect(); http://git-wip-us.apache.org/repos/asf/flink/blob/4862101d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 874f72a..65cfacb 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,6 @@ under the License. <module>flink-shaded-curator</module> <module>flink-core</module> <module>flink-java</module> - <module>flink-java8</module> <module>flink-scala</module> <module>flink-filesystems</module> <module>flink-runtime</module>
