[ 
https://issues.apache.org/jira/browse/FLINK-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16571238#comment-16571238
 ] 

Fabian Hueske commented on FLINK-10036:
---------------------------------------

Flink provides neither an RFC4180-compatible source nor an RFC4180-compatible sink.

I think before we change the output to be RFC4180 compatible, we should ensure 
that we can also read the data back.
Having RFC4180 compatible serialization and deserialization schemas 
(FLINK-9964) is a good first step.

The {{SerializationSchema}} can be used to write RFC4180 files. However, 
reading such files is trickier if it should be done in parallel.
Since it is not possible to determine the start of a record from an arbitrary 
position in a file (what looks like the beginning of a row might be inside a 
quoted field that contains a line break), RFC4180 files can only be read 
sequentially. The source implementation of FLINK-7050 is not based on 
{{DeserializationSchema}} but might be ported to that. Such a source should 
provide a configuration parameter to control whether files are read in 
parallel or not.
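
To illustrate why a reader cannot simply start at an arbitrary line break, here is a minimal, self-contained sketch. It is not Flink code: the class {{Rfc4180RecordSplit}} and its {{splitRecords}} method are hypothetical names made up for this example, and it assumes {{\n}} as the record separator for brevity (RFC4180 specifies CRLF).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink.
class Rfc4180RecordSplit {

    /** Splits CSV text into records, treating line breaks inside quoted fields as data. */
    static List<String> splitRecords(String csv) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < csv.length(); i++) {
            char c = csv.charAt(i);
            if (c == '"') {
                // An escaped quote ("") toggles the flag twice, so the state is unchanged.
                inQuotes = !inQuotes;
                current.append(c);
            } else if (c == '\n' && !inQuotes) {
                // Only an unquoted line break ends a record.
                records.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            records.add(current.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        // Two records; the second field of the first record contains a line break.
        String csv = "1,\"line one\nline two\"\n2,plain\n";

        // Sequential, quote-aware scan from the start of the file: 2 records.
        System.out.println(splitRecords(csv).size());

        // Naive split on every line break: 3 pieces, one record is torn apart.
        System.out.println(csv.split("\n").length);

        // A reader that starts right after the first line break begins inside a
        // quoted field without knowing it, so its quote state is inverted and it
        // merges the rest of the input into 1 bogus record.
        String fromSplitOffset = csv.substring(csv.indexOf('\n') + 1);
        System.out.println(splitRecords(fromSplitOffset).size());
    }
}
```

The third case is what a parallel reader would face when its input split begins in the middle of a file: without scanning from the beginning, it cannot tell whether it is inside or outside a quoted field.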

> Flink's CSV output format is not consistent with the standard.
> --------------------------------------------------------------
>
>                 Key: FLINK-10036
>                 URL: https://issues.apache.org/jira/browse/FLINK-10036
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Caizhi Weng
>            Priority: Minor
>
> h2. What's the problem
> Flink's CSV output format is not consistent with the standard 
> ([https://tools.ietf.org/html/rfc4180]).
> In a CSV file, if a field contains a comma, a double quote or a line break, 
> the field should be enclosed in double quotes (see section 2.6 in the 
> standard). Specifically, if a field contains a double quote, that quote 
> must be escaped by preceding it with another double quote (see section 2.7 
> in the standard).
> For example, to express these two fields in a CSV file:
> {noformat}
> Hello,World
> "Quoted" "String"
> {noformat}
> The CSV file should look like this:
> {noformat}
> "Hello,World","""Quoted"" ""String"""
> {noformat}
> But if we run the following Flink code to output these fields:
> {code:java}
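> import org.apache.flink.api.scala._
> import org.apache.flink.core.fs.FileSystem.WriteMode
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.sinks.CsvTableSink
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // One row with two string fields: one contains a comma, the other double quotes.
> val data = List(
>   ("Hello,World", "\"Quoted\" \"String\"")
> )
> val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
> val result = ds.select('a, 'b)
> // Write a single file with "," as the field delimiter.
> val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
> result.writeToSink(sink)
> env.execute()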
> {code}
> We get the following CSV:
> {noformat}
> Hello,World,"Quoted" "String"
> {noformat}
> which is not correct (there are actually 3 fields instead of 2 in this CSV 
> file, and the last field is not valid).
> h2. How am I going to fix it
> I'm going to fix the writeRecord method in CsvOutputFormat.java in the 
> flink-java module, and add some test cases to ensure that my fix is correct.
> h2. What's affected
> This fix will change the output of CsvTableSink, and will affect the test 
> cases whose results are written to a CSV file.
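
The quoting rules described in the issue above can be sketched as follows. This is only an illustration of the RFC4180 rules, not the actual change to {{CsvOutputFormat}}: the class {{Rfc4180Quoting}} and the methods {{quoteField}} and {{formatRecord}} are hypothetical names, and the sketch assumes that fields which need no quoting are written unchanged.

```java
// Hypothetical helper for illustration only; not part of Flink.
class Rfc4180Quoting {

    /** Encloses a field in double quotes if it contains the delimiter, a double quote, or a line break. */
    static String quoteField(String field, char delimiter) {
        boolean needsQuoting = field.indexOf(delimiter) >= 0
                || field.indexOf('"') >= 0
                || field.indexOf('\n') >= 0
                || field.indexOf('\r') >= 0;
        if (!needsQuoting) {
            return field;
        }
        // Escape each double quote by doubling it, then wrap the whole field in quotes.
        return "\"" + field.replace("\"", "\"\"") + "\"";
    }

    /** Joins the quoted fields of one record with the given delimiter. */
    static String formatRecord(char delimiter, String... fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(delimiter);
            }
            sb.append(quoteField(fields[i], delimiter));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The two fields from the issue description.
        System.out.println(formatRecord(',', "Hello,World", "\"Quoted\" \"String\""));
        // prints: "Hello,World","""Quoted"" ""String"""
    }
}
```

With these rules, the two example fields from the issue produce exactly the line the issue expects, instead of the three-field output currently written by {{CsvTableSink}}.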


