Hi Anirvan,
I don't know whether this fits your goals, but you could try a Hadoop
InputFormat such as
https://github.com/alexholmes/json-mapreduce#an-inputformat-to-work-with-splittable-multi-line-json
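
A rough sketch of wiring that into a Flink job via the
flink-hadoop-compatibility wrapper is below. Treat it as an untested
assumption: the MultiLineJsonInputFormat class name comes from that
project's README, the record-delimiter setting is only hinted at in a
comment, and the wrapper usage is from memory, so please verify all of it
against your versions.

[CODE]
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
// class name as given in the json-mapreduce README (assumption, please verify):
import com.alexholmes.json.mapreduce.MultiLineJsonInputFormat;

public class JsonViaHadoopInputFormat {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Job job = Job.getInstance();
        FileInputFormat.addInputPath(job, new Path("/path/to/input.json"));
        // NOTE: the format also needs to be told which JSON member delimits a
        // record; see the json-mapreduce README for the exact setting.

        // Wrap the Hadoop InputFormat so Flink can use it as a data source.
        // Each record is (byte offset, one complete multi-line JSON object).
        HadoopInputFormat<LongWritable, Text> jsonInput =
                new HadoopInputFormat<LongWritable, Text>(
                        new MultiLineJsonInputFormat(), LongWritable.class, Text.class, job);

        DataSet<Tuple2<LongWritable, Text>> records = env.createInput(jsonInput);
        records.print();
        env.execute("Read multi-line JSON via Hadoop InputFormat");
    }
}
[/CODE]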

Best regards,
Felix
Am 26.11.2014 09:01 schrieb "Anirvan BASU" <anirvan.b...@inria.fr>:

> Ciao Stefano !
>
> Thanks for this early morning information, very helpful.
> Yes, for outputting the data we are using writeAsCsv, which is stable
> across different versions of Flink.
>
> Our current concern is "reading" a JSON file into a dataset.
> As you can see, we have a simple 2-level JSON hierarchy that can be easily
> mapped to a fixed-column CSV.
> But where we are currently stuck is reading the file correctly into a
> tuple-based dataset in memory.
> Once this is achieved, the rest will be fairly simple dataset
> transformations.
>
> As you can see from the pasted code, we used functions developed from the
> stream connector for our purposes. (Thanks to Gyula and Marton for that
> information)
>
> If reading a JSON file is not possible with the functions already
> developed, then we will have to write some custom functions doing
> hand-rolled string operations to achieve the same.
> That would be like reinventing the wheel ... :-((
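>
> For reference, the fallback we have in mind would not even need the
> streaming connector: a plain FlatMapFunction that parses each line with a
> JSON library. A minimal, untested sketch, assuming one well-formed JSON
> record per line and the org.apache.sling.commons.json classes we already
> have on the classpath:
>
> [CODE]
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.util.Collector;
> import org.apache.sling.commons.json.JSONException;
> import org.apache.sling.commons.json.JSONObject;
>
> public class ParseEventLine
>         implements FlatMapFunction<String, Tuple4<Integer, String, String, Integer>> {
>
>     @Override
>     public void flatMap(String line, Collector<Tuple4<Integer, String, String, Integer>> out) {
>         try {
>             JSONObject json = new JSONObject(line);
>             // uuid and event sit one level down, under "payload"
>             JSONObject payload = json.getJSONObject("payload");
>             out.collect(new Tuple4<Integer, String, String, Integer>(
>                     json.getInt("timestamp"),
>                     payload.getString("uuid"),
>                     payload.getString("event"),
>                     1));
>         } catch (JSONException e) {
>             // skip malformed records rather than failing the whole job
>             System.err.println("Field not found: " + e.getMessage());
>         }
>     }
> }
> [/CODE]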
>
> Any advice in this regard will be highly appreciated.
>
> Thanks in advance to all,
> Anirvan
>
> ----- Original Message -----
>
> > From: "Stefano Bortoli" <s.bort...@gmail.com>
> > To: "user" <u...@flink.incubator.apache.org>
> > Sent: Wednesday, November 26, 2014 8:37:59 AM
> > Subject: Re: Program crashes trying to read JSON file
>
> > You can output your results in different ways. If all you need is to
> > write a file, I normally use the writeAsText method (however, there are
> > also writeAsCsv and writeAsFormattedText). Or write according to your
> > custom FileOutputFormat.
>
> > datasetToPrint.writeAsText("/path/to/file/with/permission",
> > WriteMode.OVERWRITE);
>
> > Keep in mind that this will output your tuple dataset as-is. Therefore,
> > if you want to shape your output differently, it may be necessary to do
> > further processing first.
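> >
> > For example, a quick, untested sketch (assuming the Tuple4 dataset from
> > your snippet) of reshaping the tuples into custom lines before writing:
> >
> > [CODE]
> > import org.apache.flink.api.common.functions.MapFunction;
> > import org.apache.flink.api.java.DataSet;
> > import org.apache.flink.api.java.tuple.Tuple4;
> > import org.apache.flink.core.fs.FileSystem.WriteMode;
> >
> > // turn each (timestamp, uuid, event, count) tuple into one output line
> > DataSet<String> lines = counts.map(
> >         new MapFunction<Tuple4<Integer, String, String, Integer>, String>() {
> >             @Override
> >             public String map(Tuple4<Integer, String, String, Integer> t) {
> >                 return t.f2 + " -> " + t.f3; // e.g. "event -> count"
> >             }
> >         });
> > lines.writeAsText("/path/to/file/with/permission", WriteMode.OVERWRITE);
> > [/CODE]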
>
> > saluti,
> > Stefano
>
> > 2014-11-25 22:04 GMT+01:00 Anirvan BASU <anirvan.b...@inria.fr>:
> >
> > > Thanks to Aljoscha and Stefano for pointing out the flaw.
> > >
> > > We corrected the issue as follows:
> > >
> > > [CODE]
> > > import org.apache.flink.api.java.tuple.Tuple4;
> > > import org.apache.flink.util.Collector;
> > > import org.apache.flink.api.java.DataSet;
> > > import org.apache.flink.api.java.ExecutionEnvironment;
> > > import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> > > import org.apache.sling.commons.json.JSONException;
> > >
> > > ...
> > >
> > > public static void main(String[] args) throws Exception {
> > >
> > >     if (!parseParameters(args)) {
> > >         return;
> > >     }
> > >
> > >     // set up the execution environment
> > >     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> > >
> > >     // get input data
> > >     DataSet<String> text = getTextDataSet(env);
> > >
> > >     DataSet<Tuple4<Integer, String, String, Integer>> counts =
> > >         // split up the lines into 4-tuples: (timestamp, uuid, event, count)
> > >         text.flatMap(new SelectDataFlatMap())
> > >         // group by tuple field "1" (an event - string) and sum up tuple field "3" (integer - value 1)
> > >         .groupBy(1)
> > >         .sum(3);
> > >
> > >     // emit result
> > >     if (fileOutput) {
> > >         counts.writeAsCsv(outputPath, "\n", " ");
> > >     } else {
> > >         counts.print();
> > >     }
> > >
> > >     // execute program
> > >     env.execute("Weblogs Programme");
> > > }
> > >
> > > ...
> > >
> > > public static class SelectDataFlatMap extends
> > >         JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {
> > >
> > >     private static final long serialVersionUID = 1L;
> > >
> > >     @Override
> > >     public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record)
> > >             throws Exception {
> > >         try {
> > >             record.collect(new Tuple4<Integer, String, String, Integer>(
> > >                     getInt(value, "timestamp"),
> > >                     getString(value, "uuid"),
> > >                     getString(value, "event"),
> > >                     1));
> > >         } catch (JSONException e) {
> > >             System.err.println("Field not found");
> > >         }
> > >     }
> > > }
> > > [/CODE]
> > >
> > > However, this time the issue was different.
> > > The programme executed correctly through to status FINISHED, yet there
> > > was no output :-((
> > > i.e. for each Task Manager, an empty file is written.
> > >
> > > When we checked further on the input text file read via
> > > env.readTextFile(), we found that instead of a text string (the full
> > > text dataset), only a short string is written!
> > > Something like:
> > > org.apache.flink.api.java.operators.DataSource@6bd8b476
> > >
> > > Worse still, this string value sometimes remains the same over multiple
> > > runs of the programme ...
> > > Is this natural? Is this just a handle to the file or the dataset?
> > > Is the Collector() working correctly as well?
> > >
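> > > For reference, the check we did was roughly the following minimal
> > > sketch; the comments are our own guesses, not verified:
> > >
> > > [CODE]
> > > DataSet<String> text = env.readTextFile(textPath);
> > > // This prints only the operator's toString(), e.g. DataSource@6bd8b476,
> > > // presumably because a DataSet is a lazy description of the computation:
> > > System.out.println(text);
> > > // whereas this should materialise and print the actual records:
> > > text.print();
> > > env.execute();
> > > [/CODE]
> > >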
> > > Note:
> > > The actual JSON file (i.e. the text file that should be read) is of the
> > > following nature, with a 2-level hierarchy for one field:
> > >
> > > [JSON]
> > > {timestamp: 1397731764 payload: {product: Younited uuid:
> > > 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
> > > platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
> > > type: can-usage-v1 event: General,Login,Success}}
> > >
> > > {timestamp: 1397731765 payload: {product: Younited uuid:
> > > e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
> > > platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e
> > > type: can-usage-v1 event: General,App,Opened}}
> > > [/JSON]
> > >
> > > So now again, we are confused about whether we are doing it correctly :-((
> > >
> > > Thanks in advance for helping us to understand where we are going wrong.
> > >
> > > Anirvan
> > >
> > > > From: "Stefano Bortoli" < s.bort...@gmail.com >
> > >
> >
> > > > To: "user" < u...@flink.incubator.apache.org >
> > >
> >
> > > > Cc: dev@flink.incubator.apache.org
> > >
> >
> > > > Sent: Tuesday, November 25, 2014 5:05:34 PM
> > >
> >
> > > > Subject: Re: Program crashes trying to read JSON file
> > >
> >
>
> > > > Very quickly, it seems you are trying to sum on Strings:
> > > >
> > > > Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
> > > > The type java.lang.String has currently not supported for built-in sum aggregations.
> > > >
> > > > Check your tuple types and be sure that you are not summing on Strings.
> > > >
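> > > > For example, instead of summing the String field, append a constant
> > > > Integer count field and sum that one. A quick, untested sketch (field
> > > > positions assumed from your snippet):
> > > >
> > > > [CODE]
> > > > // flatMap now emits (timestamp, uuid, event, 1); the trailing 1 is the count
> > > > DataSet<Tuple4<Integer, String, String, Integer>> counts =
> > > >         text.flatMap(new SelectDataFlatMap())
> > > >                 .groupBy(2)  // group by the event String (field 2)
> > > >                 .sum(3);     // sum the Integer count (field 3), not a String
> > > > [/CODE]
> > > >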
> > > > 2014-11-25 16:55 GMT+01:00 Anirvan BASU <anirvan.b...@inria.fr>:
> > > >
> > > > > Hello all,
> > > > >
> > > > > We are using Flink 0.7 and trying to read a large JSON file, reading
> > > > > some fields into a Flink (3-tuple based) dataset, then performing
> > > > > some operations.
> > > > >
> > > > > We encountered the following runtime error:
> > > > >
> > > > > [QUOTE]
> > > > > Error: The main method caused an error.
> > > > > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> > > > >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
> > > > >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
> > > > >     at org.apache.flink.client.program.Client.run(Client.java:244)
> > > > >     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> > > > >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> > > > >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> > > > >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> > > > > Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
> > > > >     at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
> > > > >     at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
> > > > >     at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
> > > > >     at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
> > > > >     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
> > > > >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >     at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
> > > > >     ... 6 more
> > > > > [/QUOTE]
> > > > >
> > > > > The code snippet that could have caused this error (i.e. the one we
> > > > > edited) is the following:
> > > > >
> > > > > [CODE]
> > > > > import org.apache.flink.api.java.tuple.Tuple3;
> > > > > import org.apache.flink.util.Collector;
> > > > > import org.apache.flink.api.java.DataSet;
> > > > > import org.apache.flink.api.java.ExecutionEnvironment;
> > > > > import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> > > > > import org.apache.sling.commons.json.JSONException;
> > > > >
> > > > > ...
> > > > >
> > > > > public static void main(String[] args) throws Exception {
> > > > >
> > > > >     if (!parseParameters(args)) {
> > > > >         return;
> > > > >     }
> > > > >
> > > > >     // set up the execution environment
> > > > >     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> > > > >
> > > > >     // get input data
> > > > >     DataSet<String> text = getTextDataSet(env);
> > > > >
> > > > >     DataSet<Tuple3<Integer, String, String>> counts =
> > > > >         // split up the lines into 3-tuples: (timestamp, uuid, event)
> > > > >         text.flatMap(new SelectDataFlatMap())
> > > > >         // group by the tuple field "0" and sum up tuple field "1"
> > > > >         .groupBy(2)
> > > > >         .sum(2);
> > > > >
> > > > >     // emit result
> > > > >     if (fileOutput) {
> > > > >         counts.writeAsCsv(outputPath, "\n", " ");
> > > > >     } else {
> > > > >         counts.print();
> > > > >     }
> > > > >
> > > > >     // execute program
> > > > >     env.execute("Weblogs Programme");
> > > > > }
> > > > >
> > > > > ...
> > > > >
> > > > > public static class SelectDataFlatMap extends
> > > > >         JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
> > > > >
> > > > >     @Override
> > > > >     public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
> > > > >             throws Exception {
> > > > >         try {
> > > > >             out.collect(new Tuple3<Integer, String, String>(
> > > > >                     getInt(value, "timestamp"),
> > > > >                     getString(value, "uuid"),
> > > > >                     getString(value, "event")));
> > > > >         } catch (JSONException e) {
> > > > >             System.err.println("Field not found");
> > > > >         }
> > > > >     }
> > > > > }
> > > > > [/CODE]
> > > > >
> > > > > The JSON file is of the following nature, with a 2-level hierarchy
> > > > > for one field:
> > > > >
> > > > > [JSON]
> > > > > {timestamp: 1397731764 payload: {product: Younited uuid:
> > > > > 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
> > > > > platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
> > > > > type: can-usage-v1 event: General,Login,Success}}
> > > > >
> > > > > {timestamp: 1397731765 payload: {product: Younited uuid:
> > > > > e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
> > > > > platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e
> > > > > type: can-usage-v1 event: General,App,Opened}}
> > > > > [/JSON]
> > > > >
> > > > > Thanks in advance for helping us to understand where we are going
> > > > > wrong.
> > > > >
> > > > > Anirvan
> > > > >