Caizhi Weng created FLINK-10036:
-----------------------------------

             Summary: Flink's CSV output format is not consistent with the 
standard.
                 Key: FLINK-10036
                 URL: https://issues.apache.org/jira/browse/FLINK-10036
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Caizhi Weng


h2. What's the problem

Flink's CSV output format is not consistent with the standard 
(https://tools.ietf.org/html/rfc4180).

In a CSV file, if a field contains commas, double quotes, or line breaks, the field 
must be enclosed in double quotes (see rule 6 in section 2 of the standard). 
Additionally, any double quote inside a field must be escaped by doubling it 
(see rule 7 in section 2 of the standard).

For example, to express these two fields in a CSV file:

{noformat}
Hello,World
"Quoted" "String"
{noformat}

The CSV file should look like this:

{noformat}
"Hello,World","""Quoted"" ""String"""
{noformat}
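The quoting and escaping rules above can be sketched as a small helper. This is a hypothetical illustration of RFC 4180 quoting, not Flink's actual code; the class and method names are made up:

```java
// Hypothetical sketch of RFC 4180 field quoting (not part of Flink).
public class Rfc4180 {
    public static String quoteField(String field) {
        // Quote only when the field contains a comma, a quote, or a line break.
        if (field.contains(",") || field.contains("\"")
                || field.contains("\n") || field.contains("\r")) {
            // Escape each embedded quote by doubling it, then wrap the field.
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;
    }

    public static void main(String[] args) {
        System.out.println(quoteField("Hello,World"));           // "Hello,World"
        System.out.println(quoteField("\"Quoted\" \"String\"")); // """Quoted"" ""String"""
    }
}
```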


But if we run the following Flink code to write these two fields:

{code:scala}
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val data = List(
  ("Hello,World", "\"Quoted\" \"String\"")
)
val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)

val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
ds.select('a, 'b).writeToSink(sink)

env.execute()
{code}

We get the following CSV:

{noformat}
Hello,World,"Quoted" "String"
{noformat}

which is incorrect: a compliant parser reads three fields instead of two, and the 
quoting in the last field is invalid.
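To see the discrepancy concretely, here is a minimal, lenient RFC 4180 line splitter. It is a hypothetical illustration (not part of Flink or any CSV library) that shows how a compliant reader splits the unescaped line into three fields, while the properly quoted line yields the intended two:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal RFC 4180 splitter for a single line (illustrative only).
public class CsvSplit {
    public static List<String> split(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    // A doubled quote inside a quoted field is a literal quote.
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        cur.append('"');
                        i++;
                    } else {
                        inQuotes = false; // closing quote
                    }
                } else {
                    cur.append(c);
                }
            } else if (c == '"') {
                inQuotes = true; // opening quote
            } else if (c == ',') {
                fields.add(cur.toString()); // unquoted comma ends the field
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        fields.add(cur.toString());
        return fields;
    }

    public static void main(String[] args) {
        // The unescaped line Flink currently emits splits into three fields.
        System.out.println(split("Hello,World,\"Quoted\" \"String\"").size()); // 3
        // The properly quoted line splits into the intended two fields.
        System.out.println(split("\"Hello,World\",\"\"\"Quoted\"\" \"\"String\"\"\"").size()); // 2
    }
}
```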

h2. How am I going to fix it

I'm going to fix the writeRecord method in CsvOutputFormat.java in the flink-java 
module, and add test cases to verify the fix.

Note that this fix changes the output of CsvTableSink and will affect about 50 
existing test cases in the project.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
