http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java deleted file mode 100644 index b9dba77..0000000 --- a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java +++ /dev/null @@ -1,124 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.examples.java8.wordcount;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Streaming "WordCount" example that counts the occurrences of each word in a text input.
 *
 * <p>The input is a plain text file whose lines are separated by newline characters.
 *
 * <p>Usage: {@code WordCount <text path> <result path>}<br>
 * When no parameters are given, the program runs on the default data from {@link WordCountData}.
 *
 * <p>This example shows how to:
 * <ul>
 * <li>write a compact Flink Streaming program with Java 8 lambda expressions.
 * </ul>
 */
public class WordCount {

	// *************************************************************************
	// PARAMETERS (filled in by parseParameters)
	// *************************************************************************

	/** True once any command-line argument was supplied; selects file input and file output. */
	private static boolean fileOutput = false;
	private static String textPath;
	private static String outputPath;

	// *************************************************************************
	// PROGRAM
	// *************************************************************************

	public static void main(String[] args) throws Exception {

		// bail out on an invalid command line (usage has already been printed)
		if (!parseParameters(args)) {
			return;
		}

		// set up the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// get input data
		DataStream<String> text = getTextDataStream(env);

		DataStream<Tuple2<String, Integer>> counts =
			// lower-case every line and split it on non-word characters
			text.map(line -> line.toLowerCase().split("\\W+"))
				// turn every non-empty token into a (word, 1) pair
				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
					Arrays.stream(tokens)
						.filter(token -> !token.isEmpty())
						.forEach(token -> out.collect(new Tuple2<>(token, 1)));
				})
				// key by tuple field 0 (the word) and sum tuple field 1 (the count)
				.keyBy(0)
				.sum(1);

		// emit result
		if (!fileOutput) {
			counts.print();
		} else {
			counts.writeAsCsv(outputPath);
		}

		// execute program
		env.execute("Streaming WordCount Example");
	}

	// *************************************************************************
	// UTIL METHODS
	// *************************************************************************

	/**
	 * Reads the command line into the static parameter fields.
	 *
	 * @param args the program arguments; either none, or exactly text path and result path
	 * @return {@code false} if arguments were given but their number is not two, {@code true} otherwise
	 */
	private static boolean parseParameters(String[] args) {

		if (args.length == 0) {
			System.out.println("Executing WordCount example with built-in default data.");
			System.out.println(" Provide parameters to read input data from a file.");
			System.out.println(" Usage: WordCount <text path> <result path>");
			return true;
		}

		// any argument at all switches the program to file mode
		fileOutput = true;
		if (args.length != 2) {
			System.err.println("Usage: WordCount <text path> <result path>");
			return false;
		}

		textPath = args[0];
		outputPath = args[1];
		return true;
	}

	/**
	 * Creates the input stream: the text file at {@code textPath} in file mode,
	 * otherwise the built-in {@link WordCountData#WORDS}.
	 */
	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
		if (!fileOutput) {
			// get default test text data
			return env.fromElements(WordCountData.WORDS);
		}
		// read the text file from given input path
		return env.readTextFile(textPath);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java deleted file mode 100644 index de1f395..0000000 --- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java +++ /dev/null @@ -1,383 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.type.lambdas;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import org.junit.Assert;
import org.junit.Test;

import java.lang.reflect.Method;

import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

/**
 * Tests the type extractor for lambda functions.
 */
@SuppressWarnings("serial")
public class LambdaExtractionTest {

	private static final TypeInformation<Tuple2<Tuple1<Integer>, Boolean>> NESTED_TUPLE_BOOLEAN_TYPE =
			new TypeHint<Tuple2<Tuple1<Integer>, Boolean>>(){}.getTypeInfo();

	private static final TypeInformation<Tuple2<Tuple1<Integer>, Double>> NESTED_TUPLE_DOUBLE_TYPE =
			new TypeHint<Tuple2<Tuple1<Integer>, Double>>(){}.getTypeInfo();

	private static final MapFunction<String, Integer> STATIC_LAMBDA = Integer::parseInt;

	// ------------------------------------------------------------------------
	//  Lambda detection
	// ------------------------------------------------------------------------

	/**
	 * Checks that {@code checkAndExtractLambda} returns {@code null} for anonymous and named
	 * classes and a non-null result for lambdas and method references.
	 *
	 * <p>Exceptions propagate to JUnit so that the full stack trace shows up in the test report.
	 */
	@Test
	public void testIdentifyLambdas() throws Exception {
		MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
			@Override
			public Integer map(String value) {
				return Integer.parseInt(value);
			}
		};

		MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
			@Override
			public Integer map(String value) {
				return Integer.parseInt(value);
			}
		};

		MapFunction<?, ?> fromProperClass = new StaticMapper();

		MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
			@Override
			public Tuple2<Integer, Long> map(Integer value) {
				return new Tuple2<>(value, 1L);
			}
		};

		MapFunction<String, Integer> staticLambda = Integer::parseInt;
		MapFunction<Integer, String> instanceLambda = Object::toString;
		MapFunction<String, Integer> constructorLambda = Integer::new;

		assertNull(checkAndExtractLambda(anonymousFromInterface));
		assertNull(checkAndExtractLambda(anonymousFromClass));
		assertNull(checkAndExtractLambda(fromProperClass));
		assertNull(checkAndExtractLambda(fromDerived));
		assertNotNull(checkAndExtractLambda(staticLambda));
		assertNotNull(checkAndExtractLambda(instanceLambda));
		assertNotNull(checkAndExtractLambda(constructorLambda));
		assertNotNull(checkAndExtractLambda(STATIC_LAMBDA));
	}

	private static class StaticMapper implements MapFunction<String, Integer> {
		@Override
		public Integer map(String value) {
			return Integer.parseInt(value);
		}
	}

	private interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
		@Override
		Tuple2<T, Long> map(T value) throws Exception;
	}

	private static class MyClass {
		private String s = "mystring";

		public MapFunction<Integer, String> getMapFunction() {
			return (i) -> s;
		}
	}

	// ------------------------------------------------------------------------
	//  Return type extraction for lambdas
	// ------------------------------------------------------------------------

	@Test
	public void testLambdaWithMemberVariable() {
		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), Types.INT);
		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
	}

	@Test
	public void testLambdaWithLocalVariable() {
		String s = "mystring";
		final int k = 24;
		int j = 26;

		MapFunction<Integer, String> f = (i) -> s + k + j;

		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, Types.INT);
		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
	}

	@Test
	public void testMapLambda() {
		MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;

		assertNestedTupleWithString(TypeExtractor.getMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE));
	}

	@Test
	public void testFlatMapLambda() {
		FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};

		assertNestedTupleWithString(TypeExtractor.getFlatMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE));
	}

	@Test
	public void testMapPartitionLambda() {
		MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};

		assertNestedTupleWithString(TypeExtractor.getMapPartitionReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE));
	}

	@Test
	public void testGroupReduceLambda() {
		GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};

		assertNestedTupleWithString(TypeExtractor.getGroupReduceReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE));
	}

	@Test
	public void testFlatJoinLambda() {
		FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};

		assertNestedTupleWithString(
			TypeExtractor.getFlatJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE));
	}

	@Test
	public void testJoinLambda() {
		JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;

		assertNestedTupleWithString(
			TypeExtractor.getJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE));
	}

	@Test
	public void testCoGroupLambda() {
		CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};

		assertNestedTupleWithString(
			TypeExtractor.getCoGroupReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE));
	}

	@Test
	public void testCrossLambda() {
		CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;

		assertNestedTupleWithString(
			TypeExtractor.getCrossReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE));
	}

	@Test
	public void testKeySelectorLambda() {
		KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;

		assertNestedTupleWithString(TypeExtractor.getKeySelectorTypes(f, NESTED_TUPLE_BOOLEAN_TYPE));
	}

	@SuppressWarnings("rawtypes")
	@Test
	public void testLambdaTypeErasure() {
		MapFunction<Tuple1<Integer>, Tuple1> f = (i) -> null;
		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, new TypeHint<Tuple1<Integer>>(){}.getTypeInfo(), null, true);
		Assert.assertTrue(ti instanceof MissingTypeInfo);
	}

	@Test
	public void testPartitionerLambda() {
		Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % numPartitions;
		final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner);

		Assert.assertTrue(ti.isTupleType());
		Assert.assertEquals(2, ti.getArity());
		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ((TupleTypeInfo<?>) ti).getTypeAt(0));
		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ((TupleTypeInfo<?>) ti).getTypeAt(1));
	}

	// ------------------------------------------------------------------------
	//  Return type extraction for method references
	// ------------------------------------------------------------------------

	private static class MyType {
		private int key;

		public int getKey() {
			return key;
		}

		public void setKey(int key) {
			this.key = key;
		}

		protected int getKey2() {
			return 0;
		}
	}

	@Test
	public void testInstanceMethodRefSameType() {
		MapFunction<MyType, Integer> f = MyType::getKey;
		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MyType.class));
		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
	}

	@Test
	public void testInstanceMethodRefSuperType() {
		MapFunction<Integer, String> f = Object::toString;
		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.INT_TYPE_INFO);
		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
	}

	private static class MySubtype extends MyType {
		public boolean test;
	}

	@Test
	public void testInstanceMethodRefSuperTypeProtected() {
		MapFunction<MySubtype, Integer> f = MyType::getKey2;
		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MySubtype.class));
		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
	}

	@Test
	public void testConstructorMethodRef() {
		MapFunction<String, Integer> f = Integer::new;
		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.STRING_TYPE_INFO);
		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
	}

	// ------------------------------------------------------------------------
	//  Single abstract method lookup
	// ------------------------------------------------------------------------

	private interface InterfaceWithDefaultMethod {
		void samMethod();

		default void defaultMethod() {

		}
	}

	@Test
	public void testSamMethodExtractionInterfaceWithDefaultMethod() {
		final Method sam = TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithDefaultMethod.class);
		assertNotNull(sam);
		assertEquals("samMethod", sam.getName());
	}

	private interface InterfaceWithMultipleMethods {
		void firstMethod();

		void secondMethod();
	}

	@Test(expected = InvalidTypesException.class)
	public void getSingleAbstractMethodMultipleMethods() throws Exception {
		TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithMultipleMethods.class);
	}

	private interface InterfaceWithoutAbstractMethod {
		default void defaultMethod() {

		}
	}

	@Test(expected = InvalidTypesException.class)
	public void getSingleAbstractMethodNoAbstractMethods() throws Exception {
		TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithoutAbstractMethod.class);
	}

	// static: the class is only used as a Class token and needs no enclosing test instance
	private abstract static class AbstractClassWithSingleAbstractMethod {
		public abstract void defaultMethod();
	}

	@Test(expected = InvalidTypesException.class)
	public void getSingleAbstractMethodNotAnInterface() throws Exception {
		TypeExtractionUtils.getSingleAbstractMethod(AbstractClassWithSingleAbstractMethod.class);
	}

	// ------------------------------------------------------------------------
	//  Helpers
	// ------------------------------------------------------------------------

	/**
	 * Asserts that {@code ti} describes a 2-tuple whose first field is itself a tuple and whose
	 * second field is a String.
	 *
	 * <p>The checks are skipped when extraction produced a {@link MissingTypeInfo}, exactly as
	 * the individual tests did before this helper was extracted.
	 * NOTE(review): presumably this tolerates compilers that do not retain the generic lambda
	 * signature - confirm, since it also means these tests can pass without asserting anything.
	 *
	 * @param ti the type information returned by the type extractor
	 */
	private static void assertNestedTupleWithString(TypeInformation<?> ti) {
		if (ti instanceof MissingTypeInfo) {
			return;
		}
		Assert.assertTrue(ti.isTupleType());
		Assert.assertEquals(2, ti.getArity());
		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ((TupleTypeInfo<?>) ti).getTypeAt(1));
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java deleted file mode 100644 index 7cbdf6a..0000000 --- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java +++
/dev/null @@ -1,104 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.cep;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;

/**
 * Tests for lambda support in CEP.
 */
public class CEPLambdaTest extends TestLogger {
	/**
	 * Test event class.
	 */
	public static class EventA {}

	/**
	 * Test event class.
	 */
	public static class EventB {}

	/**
	 * Tests that a Java8 lambda can be passed as a CEP select function.
	 */
	@Test
	public void testLambdaSelectFunction() {
		TypeInformation<EventB> expectedOutputType = TypeExtractor.getForClass(EventB.class);

		DataStream<EventB> result = createPatternStream().select(
			(Map<String, List<EventA>> map) -> new EventB()
		);

		assertEquals(expectedOutputType, result.getType());
	}

	/**
	 * Tests that a Java8 lambda can be passed as a CEP flat select function.
	 */
	@Test
	public void testLambdaFlatSelectFunction() {
		TypeInformation<EventB> expectedOutputType = TypeExtractor.getForClass(EventB.class);

		DataStream<EventB> result = createPatternStream().flatSelect(
			(Map<String, List<EventA>> map, Collector<EventB> collector) -> collector.collect(new EventB())
		);

		assertEquals(expectedOutputType, result.getType());
	}

	/**
	 * Builds the fixture shared by both tests: a pattern stream over a source transformation
	 * of {@link EventA} (named "source", no operator, parallelism 1) combined with a
	 * single-state dummy pattern.
	 */
	private static PatternStream<EventA> createPatternStream() {
		TypeInformation<EventA> eventType = TypeExtractor.getForClass(EventA.class);

		SourceTransformation<EventA> source = new SourceTransformation<>(
			"source",
			null,
			eventType,
			1);

		DataStream<EventA> inputStream = new DataStream<>(
			StreamExecutionEnvironment.getExecutionEnvironment(),
			source);

		Pattern<EventA, ?> dummyPattern = Pattern.begin("start");

		return new PatternStream<>(inputStream, dummyPattern);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java deleted file mode 100644 index ca11275..0000000 --- a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java +++ /dev/null @@ -1,113 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.util;

import org.apache.flink.runtime.util.jartestprogram.FilterLambda1;
import org.apache.flink.runtime.util.jartestprogram.FilterLambda2;
import org.apache.flink.runtime.util.jartestprogram.FilterLambda3;
import org.apache.flink.runtime.util.jartestprogram.FilterLambda4;

import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashSet;
import java.util.Set;
import java.util.jar.JarInputStream;
import java.util.zip.ZipEntry;

/**
 * Tests for the {@link JarFileCreator}.
 *
 * <p>Every test builds a jar from one lambda-based test program and checks that the jar
 * contains exactly the expected class files.
 */
public class JarFileCreatorLambdaTest {

	/** Jar-internal directory of the test programs and their helper classes. */
	private static final String TEST_PROGRAM_DIR = "org/apache/flink/runtime/util/jartestprogram/";

	@Test
	public void testFilterFunctionOnLambda1() throws Exception {
		assertJarContents(FilterLambda1.class,
			"FilterLambda1.class",
			"WordFilter.class");
	}

	@Test
	public void testFilterFunctionOnLambda2() throws Exception {
		assertJarContents(FilterLambda2.class,
			"FilterLambda2.class",
			"WordFilter.class");
	}

	@Test
	public void testFilterFunctionOnLambda3() throws Exception {
		assertJarContents(FilterLambda3.class,
			"FilterLambda3.class",
			"WordFilter.class",
			"UtilFunction.class");
	}

	@Test
	public void testFilterFunctionOnLambda4() throws Exception {
		assertJarContents(FilterLambda4.class,
			"FilterLambda4.class",
			"WordFilter.class",
			"UtilFunctionWrapper$UtilFunction.class");
	}

	/**
	 * Creates a jar for {@code programClass} and asserts that its entries are exactly the given
	 * class files.
	 *
	 * @param programClass the class handed to {@link JarFileCreator#addClass}
	 * @param expectedClassFiles expected entry names, relative to {@link #TEST_PROGRAM_DIR}
	 */
	private void assertJarContents(Class<?> programClass, String... expectedClassFiles) throws Exception {
		// A unique file per invocation, so tests running concurrently cannot overwrite each other's jar.
		File out = File.createTempFile("jarcreatortest", ".jar");
		try {
			JarFileCreator jfc = new JarFileCreator(out);
			jfc.addClass(programClass)
				.createJarFile();

			Set<String> ans = new HashSet<String>();
			for (String classFile : expectedClassFiles) {
				ans.add(TEST_PROGRAM_DIR + classFile);
			}

			Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
		} finally {
			// Clean up even when jar creation or the assertion fails; best effort, result ignored.
			out.delete();
		}
	}

	/**
	 * Checks that the jar contains exactly the expected entries.
	 *
	 * <p>The given set is not modified.
	 *
	 * @param expected names of the entries the jar must contain
	 * @param out the jar file to inspect
	 * @return {@code true} if the number of entries read equals {@code expected.size()} and
	 *         every expected name was seen
	 */
	public boolean validate(Set<String> expected, File out) throws Exception {
		Set<String> missing = new HashSet<String>(expected);
		int entryCount = 0;
		try (JarInputStream jis = new JarInputStream(new FileInputStream(out))) {
			ZipEntry ze;
			while ((ze = jis.getNextEntry()) != null) {
				entryCount++;
				missing.remove(ze.getName());
			}
		}
		return entryCount == expected.size() && missing.isEmpty();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java deleted file mode 100644 index 12abff9..0000000 --- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.flink.runtime.util.jartestprogram;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * A lambda filter using a static method.
 *
 * <p>The filter is deliberately written as a lambda stored in a local variable that delegates
 * to {@code WordFilter.filter}.
 */
public class FilterLambda1 {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");

		FilterFunction<String> filter = (v) -> WordFilter.filter(v);

		DataSet<String> output = input.filter(filter);

		// print() triggers the execution itself. A following env.execute() would fail because
		// no new sinks have been defined since that execution, so it is not called here.
		output.print();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java deleted file mode 100644 index 9555607..0000000 --- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.flink.runtime.util.jartestprogram;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * Similar to {@link FilterLambda1}, but the filter lambda is directly passed to {@link DataSet#filter(FilterFunction)}.
 */
public class FilterLambda2 {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");

		DataSet<String> output = input.filter((v) -> WordFilter.filter(v));

		// print() triggers the execution itself. A following env.execute() would fail because
		// no new sinks have been defined since that execution, so it is not called here.
		output.print();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java deleted file mode 100644 index b493722..0000000 --- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util.jartestprogram; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; - -/** - * Similar to {@link FilterLambda2}, but uses a getter to retrieve a lambda filter instance. - */ -public class FilterLambda3 { - - public static void main(String[] args) throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<String> input = env.fromElements("Please filter", "the words", "but not this"); - - DataSet<String> output = input.filter(UtilFunction.getWordFilter()); - output.print(); - - env.execute(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java deleted file mode 100644 index 606ef5e..0000000 --- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util.jartestprogram; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; - -/** - * Similar to {@link FilterLambda3} with additional indirection. 
- */ -public class FilterLambda4 { - - public static void main(String[] args) throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<String> input = env.fromElements("Please filter", "the words", "but not this"); - - DataSet<String> output = input.filter(UtilFunctionWrapper.UtilFunction.getWordFilter()); - output.print(); - - env.execute(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java deleted file mode 100644 index 1d5394a..0000000 --- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util.jartestprogram; - -import org.apache.flink.api.common.functions.FilterFunction; - -/** - * Static factory for a lambda filter function. 
- */ -public class UtilFunction { - public static FilterFunction<String> getWordFilter() { - return (v) -> WordFilter.filter(v); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java deleted file mode 100644 index de8f68a..0000000 --- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util.jartestprogram; - -import org.apache.flink.api.common.functions.FilterFunction; - -/** - * A wrapper around {@link WordFilter} to introduce additional indirection. - */ -public class UtilFunctionWrapper { - /** - * Static factory for a lambda filter function. 
- */ - public static class UtilFunction { - public static FilterFunction<String> getWordFilter() { - return (v) -> WordFilter.filter(v); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java deleted file mode 100644 index 4a5b16f..0000000 --- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util.jartestprogram; - -/** - * Static filter method for lambda tests. 
- */ -public class WordFilter { - public static boolean filter(String value) { - return !value.contains("not"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java deleted file mode 100644 index cee34af..0000000 --- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.api.java.operators.lambdas; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.test.util.JavaProgramTestBase; - -/** - * IT cases for lambda allreduce functions. 
- */ -public class AllGroupReduceITCase extends JavaProgramTestBase { - - private static final String EXPECTED_RESULT = "aaabacad\n"; - - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void testProgram() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad"); - DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> { - String conc = ""; - for (String s : values) { - conc = conc.concat(s); - } - out.collect(conc); - }); - concatDs.writeAsText(resultPath); - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java deleted file mode 100644 index a70f37a..0000000 --- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.api.java.operators.lambdas; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.test.util.JavaProgramTestBase; - -/** - * IT cases for lambda cogroup functions. - */ -public class CoGroupITCase extends JavaProgramTestBase { - - private static final String EXPECTED_RESULT = "6\n3\n"; - - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @SuppressWarnings("unchecked") - @Override - protected void testProgram() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, String>> left = env.fromElements( - new Tuple2<Integer, String>(1, "hello"), - new Tuple2<Integer, String>(2, "what's"), - new Tuple2<Integer, String>(2, "up") - ); - DataSet<Tuple2<Integer, String>> right = env.fromElements( - new Tuple2<Integer, String>(1, "not"), - new Tuple2<Integer, String>(1, "much"), - new Tuple2<Integer, String>(2, "really") - ); - DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0) - .with((values1, values2, out) -> { - int sum = 0; - for (Tuple2<Integer, String> next : values1) { - sum += next.f0; - } - for (Tuple2<Integer, String> next : values2) { - sum += next.f0; - } - out.collect(sum); - }); - joined.writeAsText(resultPath); - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - 
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java deleted file mode 100644 index 32cd910..0000000 --- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.api.java.operators.lambdas; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.test.util.JavaProgramTestBase; - -/** - * IT cases for lambda cross functions. 
- */ -public class CrossITCase extends JavaProgramTestBase { - - private static final String EXPECTED_RESULT = "2,hello not\n" + - "3,what's not\n" + - "3,up not\n" + - "2,hello much\n" + - "3,what's much\n" + - "3,up much\n" + - "3,hello really\n" + - "4,what's really\n" + - "4,up really"; - - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @SuppressWarnings("unchecked") - @Override - protected void testProgram() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, String>> left = env.fromElements( - new Tuple2<Integer, String>(1, "hello"), - new Tuple2<Integer, String>(2, "what's"), - new Tuple2<Integer, String>(2, "up") - ); - DataSet<Tuple2<Integer, String>> right = env.fromElements( - new Tuple2<Integer, String>(1, "not"), - new Tuple2<Integer, String>(1, "much"), - new Tuple2<Integer, String>(2, "really") - ); - DataSet<Tuple2<Integer, String>> joined = left.cross(right) - .with((t, s) -> new Tuple2<> (t.f0 + s.f0, t.f1 + " " + s.f1)); - joined.writeAsCsv(resultPath); - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java deleted file mode 100644 index 6ad1058..0000000 --- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.api.java.operators.lambdas; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * IT cases for lambda filter functions. 
- */ -public class FilterITCase extends JavaProgramTestBase { - - private static final String EXPECTED_RESULT = "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n"; - - public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) { - - List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>(); - data.add(new Tuple3<>(1, 1L, "Hi")); - data.add(new Tuple3<>(2, 2L, "Hello")); - data.add(new Tuple3<>(3, 2L, "Hello world")); - data.add(new Tuple3<>(4, 3L, "Hello world, how are you?")); - data.add(new Tuple3<>(5, 3L, "I am fine.")); - data.add(new Tuple3<>(6, 3L, "Luke Skywalker")); - data.add(new Tuple3<>(7, 4L, "Comment#1")); - data.add(new Tuple3<>(8, 4L, "Comment#2")); - data.add(new Tuple3<>(9, 4L, "Comment#3")); - data.add(new Tuple3<>(10, 4L, "Comment#4")); - data.add(new Tuple3<>(11, 5L, "Comment#5")); - data.add(new Tuple3<>(12, 5L, "Comment#6")); - data.add(new Tuple3<>(13, 5L, "Comment#7")); - data.add(new Tuple3<>(14, 5L, "Comment#8")); - data.add(new Tuple3<>(15, 5L, "Comment#9")); - data.add(new Tuple3<>(16, 6L, "Comment#10")); - data.add(new Tuple3<>(17, 6L, "Comment#11")); - data.add(new Tuple3<>(18, 6L, "Comment#12")); - data.add(new Tuple3<>(19, 6L, "Comment#13")); - data.add(new Tuple3<>(20, 6L, "Comment#14")); - data.add(new Tuple3<>(21, 6L, "Comment#15")); - - Collections.shuffle(data); - - return env.fromCollection(data); - } - - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void testProgram() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> filterDs = ds. 
- filter(value -> value.f2.contains("world")); - filterDs.writeAsCsv(resultPath); - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java deleted file mode 100644 index f793450..0000000 --- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.api.java.operators.lambdas; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.test.util.JavaProgramTestBase; - -/** - * IT cases for lambda join functions. 
- */ -public class FlatJoinITCase extends JavaProgramTestBase { - - private static final String EXPECTED_RESULT = "2,what's really\n" + - "2,up really\n" + - "1,hello not\n" + - "1,hello much\n"; - - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @SuppressWarnings("unchecked") - @Override - protected void testProgram() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, String>> left = env.fromElements( - new Tuple2<Integer, String>(1, "hello"), - new Tuple2<Integer, String>(2, "what's"), - new Tuple2<Integer, String>(2, "up") - ); - DataSet<Tuple2<Integer, String>> right = env.fromElements( - new Tuple2<Integer, String>(1, "not"), - new Tuple2<Integer, String>(1, "much"), - new Tuple2<Integer, String>(2, "really") - ); - DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0) - .with((t, s, out) -> out.collect(new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1))); - joined.writeAsCsv(resultPath); - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java deleted file mode 100644 index d395d7d..0000000 --- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.api.java.operators.lambdas; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.test.util.JavaProgramTestBase; - -/** - * IT cases for lambda flatmap functions. - */ -public class FlatMapITCase extends JavaProgramTestBase { - - private static final String EXPECTED_RESULT = "bb\n" + - "bb\n" + - "bc\n" + - "bd\n"; - - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void testProgram() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad"); - DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b"))); - flatMappedDs.writeAsText(resultPath); - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java 
---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java deleted file mode 100644 index 53db541..0000000 --- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.api.java.operators.lambdas; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.test.util.JavaProgramTestBase; - -/** - * IT cases for lambda groupreduce functions. 
- */ -public class GroupReduceITCase extends JavaProgramTestBase { - - private static final String EXPECTED_RESULT = "abad\n" + - "aaac\n"; - - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @SuppressWarnings("unchecked") - @Override - protected void testProgram() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, String>> stringDs = env.fromElements( - new Tuple2<>(1, "aa"), - new Tuple2<>(2, "ab"), - new Tuple2<>(1, "ac"), - new Tuple2<>(2, "ad") - ); - DataSet<String> concatDs = stringDs - .groupBy(0) - .reduceGroup((values, out) -> { - String conc = ""; - for (Tuple2<Integer, String> next : values) { - conc = conc.concat(next.f1); - } - out.collect(conc); - }); - concatDs.writeAsText(resultPath); - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java deleted file mode 100644 index d86ea49..0000000 --- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.api.java.operators.lambdas; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.test.util.JavaProgramTestBase; - -/** - * IT cases for lambda join functions. - */ -public class JoinITCase extends JavaProgramTestBase { - - private static final String EXPECTED_RESULT = "2,what's really\n" + - "2,up really\n" + - "1,hello not\n" + - "1,hello much\n"; - - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @SuppressWarnings("unchecked") - @Override - protected void testProgram() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, String>> left = env.fromElements( - new Tuple2<Integer, String>(1, "hello"), - new Tuple2<Integer, String>(2, "what's"), - new Tuple2<Integer, String>(2, "up") - ); - DataSet<Tuple2<Integer, String>> right = env.fromElements( - new Tuple2<Integer, String>(1, "not"), - new Tuple2<Integer, String>(1, "much"), - new Tuple2<Integer, String>(2, "really") - ); - DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0) - .with((t, s) -> new Tuple2<>(t.f0, t.f1 + " " + s.f1)); - joined.writeAsCsv(resultPath); - env.execute(); - } - - 
@Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java deleted file mode 100644 index 15a9b9d..0000000 --- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.api.java.operators.lambdas; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.test.util.JavaProgramTestBase; - -/** - * IT cases for lambda map functions. 
- */ -public class MapITCase extends JavaProgramTestBase { - - private static class Trade { - - public String v; - - public Trade(String v) { - this.v = v; - } - - @Override - public String toString() { - return v; - } - } - - private static final String EXPECTED_RESULT = "22\n" + - "22\n" + - "23\n" + - "24\n"; - - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void testProgram() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14); - DataSet<String> mappedDs = stringDs - .map(Object::toString) - .map (s -> s.replace("1", "2")) - .map(Trade::new) - .map(Trade::toString); - mappedDs.writeAsText(resultPath); - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java deleted file mode 100644 index 712132c..0000000 --- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.api.java.operators.lambdas; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * IT cases for lambda reduce functions. - */ -public class ReduceITCase extends JavaProgramTestBase { - - private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" + - "2,3,2,Hallo Welt wie,1\n" + - "2,2,1,Hallo Welt,2\n" + - "3,9,0,P-),2\n" + - "3,6,5,BCD,3\n" + - "4,17,0,P-),1\n" + - "4,17,0,P-),2\n" + - "5,11,10,GHI,1\n" + - "5,29,0,P-),2\n" + - "5,25,0,P-),3\n"; - - public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) { - - List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>(); - data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L)); - data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L)); - data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L)); - data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L)); - data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L)); - data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L)); - data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L)); - data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L)); - data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L)); - data.add(new 
Tuple5<>(4, 10L, 9, "FGH", 2L)); - data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L)); - data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L)); - data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L)); - data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L)); - data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L)); - - Collections.shuffle(data); - - TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new - TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>>( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void testProgram() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds - .groupBy(4, 0) - .reduce((in1, in2) -> { - Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>(); - out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4); - return out; - }); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/resources/log4j-test.properties b/flink-java8/src/test/resources/log4j-test.properties deleted file mode 100644 index c977d4c..0000000 --- a/flink-java8/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,19 +0,0 @@ 
-################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=OFF http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java index 0e6c2fe..521665f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java @@ -104,8 +104,7 @@ public class PatternStream<T> { PatternSelectFunction.class, 0, 1, - new int[]{0, 1, 0}, - new int[]{}, + TypeExtractor.NO_INDEX, inputStream.getType(), null, false); @@ -173,8 +172,7 @@ public class PatternStream<T> { PatternSelectFunction.class, 0, 1, - new int[]{0, 1, 0}, - new int[]{}, + TypeExtractor.NO_INDEX, inputStream.getType(), null, false); @@ -259,8 +257,7 @@ public class PatternStream<T> { 
PatternSelectFunction.class, 0, 1, - new int[]{0, 1, 0}, - new int[]{}, + TypeExtractor.NO_INDEX, inputStream.getType(), null, false); @@ -270,8 +267,7 @@ public class PatternStream<T> { PatternTimeoutFunction.class, 0, 1, - new int[]{0, 1, 0}, - new int[]{}, + TypeExtractor.NO_INDEX, inputStream.getType(), null, false); @@ -314,7 +310,6 @@ public class PatternStream<T> { PatternFlatSelectFunction.class, 0, 1, - new int[] {0, 1, 0}, new int[] {1, 0}, inputStream.getType(), null, @@ -381,7 +376,6 @@ public class PatternStream<T> { PatternFlatSelectFunction.class, 0, 1, - new int[]{0, 1, 0}, new int[]{1, 0}, inputStream.getType(), null, @@ -465,7 +459,6 @@ public class PatternStream<T> { PatternFlatTimeoutFunction.class, 0, 1, - new int[]{0, 1, 0}, new int[]{2, 0}, inputStream.getType(), null, @@ -476,7 +469,6 @@ public class PatternStream<T> { PatternFlatSelectFunction.class, 0, 1, - new int[]{0, 1, 0}, new int[]{1, 0}, inputStream.getType(), null, http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 6d1013c..e397d31 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.cep; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.nfa.NFA; @@ -96,19 +97,15 @@ public class CEPITCase extends AbstractTestBase { } }); - DataStream<String> result = CEP.pattern(input, pattern).select(new 
PatternSelectFunction<Event, String>() { + DataStream<String> result = CEP.pattern(input, pattern).flatSelect((p, o) -> { + StringBuilder builder = new StringBuilder(); - @Override - public String select(Map<String, List<Event>> pattern) { - StringBuilder builder = new StringBuilder(); + builder.append(p.get("start").get(0).getId()).append(",") + .append(p.get("middle").get(0).getId()).append(",") + .append(p.get("end").get(0).getId()); - builder.append(pattern.get("start").get(0).getId()).append(",") - .append(pattern.get("middle").get(0).getId()).append(",") - .append(pattern.get("end").get(0).getId()); - - return builder.toString(); - } - }); + o.collect(builder.toString()); + }, Types.STRING); List<String> resultList = new ArrayList<>(); @@ -170,18 +167,14 @@ public class CEPITCase extends AbstractTestBase { } }); - DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() { - - @Override - public String select(Map<String, List<Event>> pattern) { - StringBuilder builder = new StringBuilder(); + DataStream<String> result = CEP.pattern(input, pattern).select(p -> { + StringBuilder builder = new StringBuilder(); - builder.append(pattern.get("start").get(0).getId()).append(",") - .append(pattern.get("middle").get(0).getId()).append(",") - .append(pattern.get("end").get(0).getId()); + builder.append(p.get("start").get(0).getId()).append(",") + .append(p.get("middle").get(0).getId()).append(",") + .append(p.get("end").get(0).getId()); - return builder.toString(); - } + return builder.toString(); }); List<String> resultList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java index 6dcf766..95310b4 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java @@ -78,7 +78,6 @@ public class Translate { TranslateFunction.class, 0, 1, - new int[]{0}, new int[]{1}, oldType, null, @@ -162,7 +161,6 @@ public class Translate { TranslateFunction.class, 0, 1, - new int[] {0}, new int[] {1}, oldType, null, @@ -248,7 +246,6 @@ public class Translate { TranslateFunction.class, 0, 1, - new int[]{0}, new int[]{1}, oldType, null, @@ -332,7 +329,6 @@ public class Translate { TranslateFunction.class, 0, 1, - new int[]{0}, new int[]{1}, oldType, null, http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---------------------------------------------------------------------- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml index 0ca6eb9..33399f8 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml @@ -156,25 +156,6 @@ under the License. 
<pluginManagement> <plugins> - <!-- If you want to use Java 8 Lambda Expressions uncomment the following lines --> - <!-- - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>${java.version}</source> - <target>${java.version}</target> - <compilerId>jdt</compilerId> - </configuration> - <dependencies> - <dependency> - <groupId>org.eclipse.tycho</groupId> - <artifactId>tycho-compiler-jdt</artifactId> - <version>0.21.0</version> - </dependency> - </dependencies> - </plugin> - --> - <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --> <plugin> <groupId>org.eclipse.m2e</groupId>
