Hello all,

We are using Flink 0.7 and are trying to read a large JSON file, extract some of its fields into a Flink DataSet of 3-tuples, and then perform some operations on that DataSet.
We encountered the following runtime error:
[QUOTE]
Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
    at org.apache.flink.client.program.Client.run(Client.java:244)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
    at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
    at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
    at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
    at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
    ... 6 more
[/QUOTE]
The code snippet that could have caused this error (i.e.
the part we edited) is the following:
[CODE]
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...

public static void main(String[] args) throws Exception {

    if (!parseParameters(args)) {
        return;
    }

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataSet<String> text = getTextDataSet(env);

    DataSet<Tuple3<Integer, String, String>> counts =
            // split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event)
            text.flatMap(new SelectDataFlatMap())
            // group by the tuple field "0" and sum up tuple field "1"
            .groupBy(2)
            .sum(2);

    // emit result
    if (fileOutput) {
        counts.writeAsCsv(outputPath, "\n", " ");
    } else {
        counts.print();
    }

    // execute program
    env.execute("Weblogs Programme");
}

...

public static class SelectDataFlatMap extends
        JSONParseFlatMap<String, Tuple3<Integer, String, String>> {

    @Override
    public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
            throws Exception {
        try {
            out.collect(new Tuple3<Integer, String, String>(
                    getInt(value, "timestamp"),
                    getString(value, "uuid"),
                    getString(value, "event")));
        } catch (JSONException e) {
            System.err.println("Field not found");
        }
    }
}
[/CODE]

The JSON file has the following structure, with a two-level hierarchy for the payload field:
[JSON]
{timestamp: 1397731764
 payload: {product: Younited
           uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
           platform: native
           version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
           type: can-usage-v1
           event: General,Login,Success}}

{timestamp: 1397731765
 payload: {product: Younited
           uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
           platform: native
           version: 7b4b767060b62537b63c5d10d911870a14d2b84e
           type: can-usage-v1
           event: General,App,Opened}}
[/JSON]

Thanks in advance for helping us understand where we are going wrong.

Anirvan
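For reference: the exception is thrown while the program is being built, at `.sum(2)` (see `UnsortedGrouping.sum` in the trace). Flink tuple positions are zero-based, so field 2 of `Tuple3<Integer, String, String>` is the `String` event, and the trace states that built-in sum aggregations do not support `java.lang.String`. The stock WordCount example sidesteps this by emitting `(word, 1)` tuples and calling `groupBy(0).sum(1)`, so the summed field is an `Integer`. Below is a minimal plain-Java sketch of that counting pattern applied to the event field. It assumes the goal is a count per event type, uses no Flink classes (so it is not a drop-in replacement for the DataSet program), and the `Event` class and `countPerEvent` method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EventCountSketch {

    // Hypothetical plain-Java stand-in for Tuple3<Integer, String, String>:
    // (timestamp, uuid, event), as produced by SelectDataFlatMap.
    static final class Event {
        final int timestamp;
        final String uuid;
        final String event;

        Event(int timestamp, String uuid, String event) {
            this.timestamp = timestamp;
            this.uuid = uuid;
            this.event = event;
        }
    }

    // Counts records per event type. Each record contributes an Integer 1 that is
    // summed per key, analogous to mapping to (event, 1) and calling
    // groupBy(0).sum(1) in Flink; the String field is only ever used as the key.
    static Map<String, Integer> countPerEvent(List<Event> events) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            counts.merge(e.event, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        // The two sample records from the JSON excerpt above.
        events.add(new Event(1397731764,
                "754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd",
                "General,Login,Success"));
        events.add(new Event(1397731765,
                "e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e",
                "General,App,Opened"));

        // Print one "event count" line per event type, sorted by event name.
        for (Map.Entry<String, Integer> entry : countPerEvent(events).entrySet()) {
            System.out.println(entry.getKey() + " " + entry.getValue());
        }
    }
}
```

The design choice mirrors WordCount: the grouping key stays a `String`, and only a numeric field is ever passed to the sum.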