Hello all, 

We are using Flink 0.7 and are trying to read a large JSON file, extract some
fields into a Flink DataSet of 3-tuples, and then perform some operations on it.

We encountered the following runtime error: 

[QUOTE] 
Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
	at org.apache.flink.client.program.Client.run(Client.java:244)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
	at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
	at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
	at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
	at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
	... 6 more
[/QUOTE] 



The code that most likely caused this error (i.e., the part we edited) is the
following:

[CODE] 

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...

public static void main(String[] args) throws Exception {

    if (!parseParameters(args)) {
        return;
    }

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataSet<String> text = getTextDataSet(env);

    DataSet<Tuple3<Integer, String, String>> counts =
            // parse each line into a 3-tuple (timestamp, uuid, event)
            text.flatMap(new SelectDataFlatMap())
            // group by tuple field 2 (event) and sum up tuple field 2 (also event)
            .groupBy(2)
            .sum(2);

    // emit result
    if (fileOutput) {
        counts.writeAsCsv(outputPath, "\n", " ");
    } else {
        counts.print();
    }

    // execute program
    env.execute("Weblogs Programme");
}

...

public static class SelectDataFlatMap extends
        JSONParseFlatMap<String, Tuple3<Integer, String, String>> {

    @Override
    public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
            throws Exception {
        try {
            // extract (timestamp, uuid, event) from one JSON record
            out.collect(new Tuple3<Integer, String, String>(
                    getInt(value, "timestamp"),
                    getString(value, "uuid"),
                    getString(value, "event")));
        } catch (JSONException e) {
            System.err.println("Field not found");
        }
    }
}

[/CODE] 
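For comparison, here is a plain-Java sketch (no Flink involved) of what a `groupBy(0).sum(1)` over (event, count) pairs computes, assuming the intent is to count occurrences of each event, as in the WordCount example the job appears to be adapted from. The `EventCounts` class and its methods are hypothetical stand-ins, not Flink API; the point is that the summed field holds an integer, whereas field 2 of the 3-tuple above is the `event` String.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java sketch (no Flink) of "group by field 0, sum field 1"
// applied to (event, count) pairs. The summed field is an int; a String
// field such as "event" has no numeric sum.
final class EventCounts {

    // Minimal stand-in for Tuple2<String, Integer>: (event, count).
    static final class EventCount {
        final String event;
        final int count;

        EventCount(String event, int count) {
            this.event = event;
            this.count = count;
        }
    }

    // Mirrors a flatMap step that emits (event, 1) for every record.
    static List<EventCount> toPairs(List<String> events) {
        List<EventCount> pairs = new ArrayList<>();
        for (String e : events) {
            pairs.add(new EventCount(e, 1));
        }
        return pairs;
    }

    // Mirrors groupBy(0).sum(1): one entry per distinct event,
    // holding the sum of the numeric count field (sorted by event).
    static Map<String, Integer> groupAndSum(List<EventCount> pairs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (EventCount p : pairs) {
            sums.merge(p.event, p.count, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList(
                "General,Login,Success",
                "General,App,Opened",
                "General,Login,Success");
        // prints {General,App,Opened=1, General,Login,Success=2}
        System.out.println(groupAndSum(toPairs(events)));
    }
}
```

The extra integer field is the design choice that matters: Flink's built-in `sum` needs a numeric field to add up, so counting is usually expressed by carrying a `1` per record and summing that field.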

The JSON records look like the following; one field, payload, contains nested
fields (a two-level hierarchy):
[JSON] 
{"timestamp": 1397731764, "payload": {"product": "Younited", "uuid": "754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd", "platform": "native", "version": "6aa54aca95fb1b8ef2290136ab12df2e4b011241", "type": "can-usage-v1", "event": "General,Login,Success"}}
{"timestamp": 1397731765, "payload": {"product": "Younited", "uuid": "e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e", "platform": "native", "version": "7b4b767060b62537b63c5d10d911870a14d2b84e", "type": "can-usage-v1", "event": "General,App,Opened"}}
[/JSON] 
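In these records `timestamp` is a top-level key, while `uuid` and `event` sit inside `payload`. We have not confirmed whether `getString(value, "uuid")` in `JSONParseFlatMap` resolves nested keys on its own, so the plain-Java sketch below only illustrates the difference between a top-level lookup and a dotted-path lookup such as `payload.event`. It assumes a parsed record can be modelled as nested `Map`s; `NestedFields` and `getPath` are hypothetical helpers, not part of Flink or the Sling JSON library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a parsed record modelled as nested Maps
// (assumption; no JSON library is used here).
final class NestedFields {

    // Hypothetical helper: resolve a dotted path such as "payload.uuid"
    // by walking one Map level per path segment. Returns null if absent.
    static Object getPath(Map<String, Object> record, String path) {
        Object current = record;
        for (String key : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    // Builds the first sample record by hand.
    static Map<String, Object> sampleRecord() {
        Map<String, Object> payload = new HashMap<>();
        payload.put("product", "Younited");
        payload.put("uuid", "754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd");
        payload.put("platform", "native");
        payload.put("version", "6aa54aca95fb1b8ef2290136ab12df2e4b011241");
        payload.put("type", "can-usage-v1");
        payload.put("event", "General,Login,Success");

        Map<String, Object> record = new HashMap<>();
        record.put("timestamp", 1397731764);
        record.put("payload", payload);
        return record;
    }

    public static void main(String[] args) {
        Map<String, Object> r = sampleRecord();
        System.out.println(getPath(r, "timestamp"));     // 1397731764
        System.out.println(getPath(r, "uuid"));          // null: not a top-level key
        System.out.println(getPath(r, "payload.event")); // General,Login,Success
    }
}
```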



Thanks in advance for helping us to understand where we are going wrong. 

Anirvan 
