Hi Anirvan,

I don't know whether this fits your goals, but you could try a Hadoop InputFormat such as https://github.com/alexholmes/json-mapreduce#an-inputformat-to-work-with-splittable-multi-line-json

Best regards,
Felix
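To make that suggestion concrete, here is a minimal sketch of wiring such an InputFormat into a Flink job through the Hadoop compatibility layer. The Flink side uses the 0.7-era HadoopInputFormat wrapper; the MultiLineJsonInputFormat class and its setInputJsonMember setter are assumptions taken from the json-mapreduce README and should be checked against that project:

[CODE]
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Assumed class name, from the json-mapreduce project linked above.
import com.alexholmes.json.mapreduce.MultiLineJsonInputFormat;

public class SplittableJsonRead {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hadoop-side configuration: the input path, plus the JSON member that
        // marks the start of each record (setter name per the project README).
        Job job = Job.getInstance();
        FileInputFormat.addInputPath(job, new Path("hdfs:///path/to/weblogs.json"));
        MultiLineJsonInputFormat.setInputJsonMember(job, "timestamp");

        // Wrap the Hadoop InputFormat so Flink can consume it; each Text value
        // is one complete (possibly multi-line) JSON record.
        DataSet<Tuple2<LongWritable, Text>> records = env.createInput(
                new HadoopInputFormat<LongWritable, Text>(
                        new MultiLineJsonInputFormat(), LongWritable.class, Text.class, job));

        records.print();
        env.execute("splittable JSON input sketch");
    }
}
[/CODE]

From there, a flatMap over the Text values can pull out the individual fields.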
On 26.11.2014 09:01, "Anirvan BASU" <anirvan.b...@inria.fr> wrote:

Ciao Stefano!

Thanks for this early morning information, very helpful. Yes, for outputting the data we are using writeAsCsv, which is stable across different versions of Flink.

Our current concern is "reading" a JSON file into a dataset. As you can see, we have a simple 2-level JSON hierarchy that can easily be mapped to a fixed-column CSV. But the place where we are currently stuck is reading the file correctly into a tuple-based dataset in memory. Once this is achieved, the rest will be fairly simple dataset transformations.

As you can see from the pasted code, we used functions developed from the stream connector for our purposes. (Thanks to Gyula and Marton for that information.)

If reading a JSON file using the functions already developed is not possible, then we will have to write some custom functions doing hardcore string operations to the same end. That would be like reinventing the wheel ... :-((

Any advice in this regard will be highly appreciated.

Thanks in advance to all,
Anirvan
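For the record-per-line case, such a "hand-rolled" parse need not be much code: a plain flatMap over env.readTextFile() using the org.apache.sling.commons.json package already imported in the code below would do. A minimal sketch, assuming that library's org.json-style API (JSONObject(String), getJSONObject, getInt, getString), and noting that the sample records quoted further down are not strictly valid JSON (unquoted keys and values, no commas between members), so they would need a lenient parser or a preprocessing pass:

[CODE]
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONObject;

public class JsonLinesToTuples {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Assumes one JSON record per line; the path is a placeholder.
        DataSet<String> lines = env.readTextFile("file:///path/to/weblogs.json");

        DataSet<Tuple3<Integer, String, String>> records =
                lines.flatMap(new FlatMapFunction<String, Tuple3<Integer, String, String>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple3<Integer, String, String>> out) {
                        try {
                            JSONObject json = new JSONObject(line);
                            // uuid and event live one level down, inside "payload"
                            JSONObject payload = json.getJSONObject("payload");
                            out.collect(new Tuple3<Integer, String, String>(
                                    json.getInt("timestamp"),
                                    payload.getString("uuid"),
                                    payload.getString("event")));
                        } catch (Exception e) {
                            // Drop malformed records rather than fail the whole job.
                        }
                    }
                });

        records.print();
        env.execute("JSON line parse sketch");
    }
}
[/CODE]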
----- Original Message -----
From: "Stefano Bortoli" <s.bort...@gmail.com>
To: "user" <u...@flink.incubator.apache.org>
Sent: Wednesday, November 26, 2014 8:37:59 AM
Subject: Re: Program crashes trying to read JSON file

You can output your results in different ways. If all you need is to write a file, I normally use the writeAsText method (however, there are also writeAsCsv and writeAsFormattedText), or write according to your custom FileOutputFormat.

datasetToPrint.writeAsText("/path/to/file/with/permission", WriteMode.OVERWRITE);

Keep in mind that this will output your tuple dataset. Therefore, if you want to shape your output differently, it may be necessary to do further processing.

saluti,
Stefano

2014-11-25 22:04 GMT+01:00 Anirvan BASU <anirvan.b...@inria.fr>:

Thanks to Aljoscha and Stefano for pointing out the flaw. We corrected the issue as follows:

[CODE]
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...

public static void main(String[] args) throws Exception {

    if (!parseParameters(args)) {
        return;
    }

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataSet<String> text = getTextDataSet(env);

    DataSet<Tuple4<Integer, String, String, Integer>> counts =
        // split up the lines into 4-tuples containing: (timestamp, uuid, event, count)
        text.flatMap(new SelectDataFlatMap())
        // group by the tuple field "1" (an event - string) and sum up tuple field "3" (integer - value 1)
        .groupBy(1)
        .sum(3);

    // emit result
    if (fileOutput) {
        counts.writeAsCsv(outputPath, "\n", " ");
    } else {
        counts.print();
    }

    // execute program
    env.execute("Weblogs Programme");
}

...

public static class SelectDataFlatMap extends
        JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {

    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record)
            throws Exception {
        try {
            record.collect(new Tuple4<Integer, String, String, Integer>(
                    getInt(value, "timestamp"),
                    getString(value, "uuid"),
                    getString(value, "event"),
                    1));
        } catch (JSONException e) {
            System.err.println("Field not found");
        }
    }
}
[/CODE]

However, this time the issue was different. The programme executed correctly until status FINISHED. However, there was no output :-(( For each Task Manager, an empty file is written.

When we checked further, we found that for the input text file read using env.readTextFile(), instead of the text (the full text dataset) only a small string is written! Something like:

org.apache.flink.api.java.operators.DataSource@6bd8b476

Worse still, this string value sometimes remains the same over multiple runs of the programme ....
Is this natural? Is this just the handle to the file or the dataset?
Is the Collector() working correctly also?

Note:
The actual JSON file (i.e. the text file that should be read) is of the following nature, with a 2-level hierarchy for one field:

[JSON]
{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}

{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
[/JSON]

So now again, we are confused whether we are doing it correctly :-((

Thanks in advance for helping us to understand where we are going wrong.
Anirvan
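An aside on the output quoted above: "org.apache.flink.api.java.operators.DataSource@6bd8b476" is the default Java Object.toString() of the DataSet handle, not the file contents. A DataSet is a lazy description of the computation, so writing the object itself to a file prints only its reference, and that value may well repeat across runs. A minimal sketch of dumping the actual records instead, using only calls that already appear in this thread (paths are placeholders):

[CODE]
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class InspectInput {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.readTextFile("file:///path/to/weblogs.json");

        // text.toString() would yield "...DataSource@<hex>": just the handle.
        // To see the records, declare a sink and run the plan:
        text.writeAsText("file:///tmp/input-dump", WriteMode.OVERWRITE);
        env.execute("dump input for inspection");
    }
}
[/CODE]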
From: "Stefano Bortoli" <s.bort...@gmail.com>
To: "user" <u...@flink.incubator.apache.org>
Cc: dev@flink.incubator.apache.org
Sent: Tuesday, November 25, 2014 5:05:34 PM
Subject: Re: Program crashes trying to read JSON file

Very quickly, it seems you are trying to sum on Strings:

Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.

Check your tuple types and be sure that you are not summing on strings.

2014-11-25 16:55 GMT+01:00 Anirvan BASU <anirvan.b...@inria.fr>:

Hello all,

We are using Flink 0.7 and trying to read a large JSON file, reading some fields into a flink (3-tuple based) dataset, then performing some operations.

We encountered the following runtime error:

[QUOTE]
Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
    at org.apache.flink.client.program.Client.run(Client.java:244)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
    at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
    at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
    at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
    at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
    ... 6 more
[/QUOTE]

The code snippet that could have caused this error (i.e. that we edited) is the following:

[CODE]
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...
public static void main(String[] args) throws Exception {

    if (!parseParameters(args)) {
        return;
    }

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataSet<String> text = getTextDataSet(env);

    DataSet<Tuple3<Integer, String, String>> counts =
        // split up the lines into 3-tuples containing: (timestamp, uuid, event)
        text.flatMap(new SelectDataFlatMap())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(2)
        .sum(2);

    // emit result
    if (fileOutput) {
        counts.writeAsCsv(outputPath, "\n", " ");
    } else {
        counts.print();
    }

    // execute program
    env.execute("Weblogs Programme");
}

...

public static class SelectDataFlatMap extends
        JSONParseFlatMap<String, Tuple3<Integer, String, String>> {

    @Override
    public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
            throws Exception {
        try {
            out.collect(new Tuple3<Integer, String, String>(
                    getInt(value, "timestamp"),
                    getString(value, "uuid"),
                    getString(value, "event")));
        } catch (JSONException e) {
            System.err.println("Field not found");
        }
    }
}
[/CODE]
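The groupBy(2).sum(2) above is what raises the UnsupportedAggregationTypeException in the trace: field 2 of the Tuple3 is the event String, and sum() only supports numeric fields (a String is a valid grouping key, but not a valid sum target). A minimal sketch of the repair, the same idea as the corrected Tuple4 version quoted earlier in this thread, assuming the goal is a per-event count and using the Tuple4 variant of SelectDataFlatMap shown there:

[CODE]
// Emit (timestamp, uuid, event, 1) from the flatMap, then aggregate the
// numeric count field instead of the event String.
DataSet<Tuple4<Integer, String, String, Integer>> counts =
        text.flatMap(new SelectDataFlatMap())
            .groupBy(2)   // group by the event String (field 2)
            .sum(3);      // sum the Integer count field (field 3)
[/CODE]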
The JSON file is of the following nature, with a 2-level hierarchy for one field:

[JSON]
{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}

{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
[/JSON]

Thanks in advance for helping us to understand where we are going wrong.

Anirvan