Hi Wei and Till,
Thanks for the quick reply.

*@Wei,* I tried the code you suggested and it works fine, but there is one
use case where it fails. Below is the CSV input data format:

Csv file data format:
-------------------------------
field_id,data,
A,1
B,3
C,4
D,9
E,0,0,0,0

Because the last row contains more than two values, it is throwing
*org.apache.flink.api.common.io.ParseException:
Row too short: field_id,data,*

How can I handle this corner case? Could you please suggest a way to
handle it?
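For reference, the kind of tolerant parsing I have in mind would look roughly like this if the lines were read as plain text instead of through RowCsvInputFormat (a plain-Java sketch; the class name and the rule of keeping everything after the first comma as the value are illustrative assumptions, not Flink API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TolerantCsvParser {

    // Parses one CSV line into a (field_id, data) pair. A line with more
    // than two values, such as "E,0,0,0,0", keeps everything after the
    // first comma as the value instead of failing. Returns null for the
    // header, blank lines, or lines without a delimiter.
    static Map.Entry<String, String> parseLine(String line) {
        if (line == null || line.isEmpty() || line.startsWith("field_id")) {
            return null; // skip header / blank lines
        }
        int firstComma = line.indexOf(',');
        if (firstComma < 0) {
            return null; // no delimiter at all: nothing to extract
        }
        String key = line.substring(0, firstComma);
        String value = line.substring(firstComma + 1);
        return new java.util.AbstractMap.SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        String[] lines = {"field_id,data,", "A,1", "B,3", "C,4", "D,9", "E,0,0,0,0"};
        Map<String, String> row = new LinkedHashMap<>();
        for (String line : lines) {
            Map.Entry<String, String> e = parseLine(line);
            if (e != null) {
                row.put(e.getKey(), e.getValue());
            }
        }
        System.out.println(row); // {A=1, B=3, C=4, D=9, E=0,0,0,0}
    }
}
```

With this approach the "E,0,0,0,0" row no longer aborts parsing; whether its extra values should be kept as one string or split further depends on the data.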

*@Till,* Could you please elaborate on what you are suggesting? In my use
case I am dealing with multiple CSV files under a given folder, reading
them line by line using TextInputFormat, and the transformation will not
work with a map operator. Correct me if I'm wrong.
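That said, once the full content of one file is available as a single string (for example via Wei's single-Row trick above), the JSON conversion inside a map operator could look roughly like this (a sketch; the class and method names are illustrative, and all values are emitted as JSON strings for simplicity, whereas my target output has D as a bare number):

```java
public class CsvFileToJson {

    // Converts the raw content of one CSV file (header "field_id,data,"
    // followed by key,value lines) into a flat JSON object string.
    static String toJson(String fileContent) {
        StringBuilder json = new StringBuilder("{");
        boolean first = true;
        for (String line : fileContent.split("\\R")) { // any line terminator
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("field_id")) {
                continue; // skip header and blank lines
            }
            String[] parts = trimmed.split(",", 2); // at most two pieces
            if (parts.length < 2) {
                continue; // no value present
            }
            if (!first) {
                json.append(',');
            }
            json.append('"').append(parts[0]).append("\":\"")
                .append(parts[1]).append('"');
            first = false;
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        String content = "field_id,data,\nA,1\nB,3\nC,4\nD,9";
        System.out.println(toJson(content)); // {"A":"1","B":"3","C":"4","D":"9"}
    }
}
```

A real implementation would presumably use a proper JSON library (e.g. Jackson) rather than string concatenation, to get escaping right.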

Thanks & Regards,
-Deep


On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Deep,
>
> Could you use the TextInputFormat which reads a file line by line? That way
> you can do the JSON parsing as part of a mapper which consumes the file
> lines.
>
> Cheers,
> Till
>
> On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <weizhong0...@gmail.com> wrote:
>
> > Hi Deep,
> >
> > (redirecting this to user mailing list as this is not a dev question)
> >
> > You can try to set the line delimiter and field delimiter of the
> > RowCsvInputFormat to a non-printing character (assuming there are no
> > non-printing characters in the csv files). It will read all the content
> > of a csv file into one Row, e.g.:
> >
> > final StreamExecutionEnvironment env =
> >    StreamExecutionEnvironment.getExecutionEnvironment();
> > String path = "test";
> > TypeInformation[] fieldTypes = new TypeInformation[]{
> >    BasicTypeInfo.STRING_TYPE_INFO};
> > RowCsvInputFormat csvFormat =
> >    new RowCsvInputFormat(new Path(path), fieldTypes);
> > csvFormat.setNestedFileEnumeration(true);
> > csvFormat.setDelimiter((char) 0);
> > csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> > DataStream<Row> lines = env.readFile(csvFormat, path,
> >    FileProcessingMode.PROCESS_ONCE, -1);
> > lines.map(value -> value).print();
> > env.execute();
> >
> >
> > Then you can convert the content of the csv files to json manually.
> >
> > Best,
> > Wei
> >
> >
> > On Dec 7, 2020, at 19:10, DEEP NARAYAN Singh <about.d...@gmail.com> wrote:
> >
> > Hi  Guys,
> >
> > Below is my code snippet, which reads all CSV files under the given
> > folder row by row, but my requirement is to read one CSV file at a time
> > and convert it to JSON, which would look like:
> > {"A":"1","B":"3","C":"4","D":9}
> >
> > Csv file data format:
> > -------------------------------
> > field_id,data,
> > A,1
> > B,3
> > C,4
> > D,9
> >
> > Code snippet:
> > --------------------------
> >
> > final StreamExecutionEnvironment env =
> >     StreamExecutionEnvironment.getExecutionEnvironment();
> > String path = "s3://messages/data/test/dev/2020-12-07/67241306/";
> > TypeInformation[] fieldTypes = new TypeInformation[]{
> >     BasicTypeInfo.STRING_TYPE_INFO,
> >     BasicTypeInfo.STRING_TYPE_INFO};
> > RowCsvInputFormat csvFormat = new RowCsvInputFormat(
> >     new Path(path), fieldTypes);
> > csvFormat.setSkipFirstLineAsHeader(true);
> > csvFormat.setNestedFileEnumeration(true);
> > DataStream<Row> lines = env.readFile(csvFormat, path,
> >     FileProcessingMode.PROCESS_ONCE, -1);
> > lines.map(value -> value).print();
> >
> >
> > Any help is highly appreciated.
> >
> > Thanks,
> > -Deep
> >
> >
> >
>
