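Hi,

looking at your code, it seems that you are creating a DataSet for each file in the directory and then building a union of them. Flink can also read entire directories, so you could pass the directory path itself to readCsvFile instead.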
Now regarding the actual problem: How are you starting the Flink job? Out of your IDE, or using the "./bin/flink run" tool?

Best,
Robert

On Fri, Feb 6, 2015 at 12:27 PM, Nam-Luc Tran <namluc.t...@euranova.eu> wrote:
> Hello,
>
> I am trying to use Java 8 lambdas in my project and hit the following
> error:
>
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: The
> generic type parameters of 'Tuple2' are missing.
> It seems that your compiler has not stored them into the .class
> file.
> Currently, only the Eclipse JDT compiler preserves the type
> information necessary to use the lambdas feature type-safely.
> See the documentation for more information about how to compile jobs
> containing lambda expressions.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:779)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:765)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:135)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:78)
>     at org.apache.flink.api.java.DataSet.map(DataSet.java:160)
>     at eu.euranova.flink.Axa.main(Axa.java:62)
>
> My very simple code is the following:
>
> File directory = new File(
>     "PATH TO A DIRECTORY WITH CSV FILES");
> DataSet set = env.fromElements(new Tuple3(0, 0.0, 0.0));
> for (File file : directory.listFiles()) {
>     int pathID = 0;
>     String filePath = "file://" + file.getAbsolutePath();
>     DataSet set2 =
>         env.readCsvFile(filePath).ignoreFirstLine().includeFields("11").types(Double.class,Double.class);
>     DataSet set3 = set2.map(tuple -> new Tuple3(pathID, tuple.f0,
>         tuple.f1));
>     set = set.union(set3);
> }
>
> I followed the steps in the Java 8 documentation section
> (http://flink.apache.org/docs/0.8/java8_programming_guide.html#compiler-limitations)
> and have applied the following to the pom.xml file created using the
> flink archetype:
> - Modified java 1.6 reference to 1.8
> - Uncommented the section related to Java 8 lambdas
> - Installed Eclipse Java developer tools (JDT)
> - Installed m2e-jdt connector
>
> The pom.xml does not show any error and builds fine.
>
> Am I missing something? Do I need to explicitly set up Eclipse JDT? The
> only installed environment shown in my preferences is the
> /usr/java/jdk-1.8.0_31 from oracle.
>
> Thanks and best regards,
>
> Tran Nam-Luc
>
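
As background on the InvalidTypesException quoted above, here is a minimal sketch in plain Java (no Flink involved) of why lambdas are harder to inspect than anonymous classes. It is not Flink's TypeExtractor logic, only an illustration of the general effect. The class name LambdaErasureDemo and its helper methods are made up for this example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Hypothetical demo class, not part of Flink.
public class LambdaErasureDemo {

    // Returns true if the runtime class of f records type arguments
    // for at least one interface it implements.
    static boolean hasGenericInfo(Function<?, ?> f) {
        for (Type t : f.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Anonymous class: the compiler writes Function<String, Integer>
    // into the generated class file, so reflection can see it.
    static Function<String, Integer> anonymousParser() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return Integer.parseInt(s);
            }
        };
    }

    // Lambda: the implementing class is generated at runtime and
    // implements the raw Function interface only.
    static Function<String, Integer> lambdaParser() {
        return s -> Integer.parseInt(s);
    }

    public static void main(String[] args) {
        System.out.println("anonymous class: " + hasGenericInfo(anonymousParser()));
        System.out.println("lambda:          " + hasGenericInfo(lambdaParser()));
    }
}
```

On a standard JDK 8 or later, the anonymous class should report true and the lambda false. A framework that needs the element types of a lambda's result therefore has to get them from elsewhere in the class file, which is where the choice of compiler mentioned in the error message comes in.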