[ 
https://issues.apache.org/jira/browse/FLINK-35240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841035#comment-17841035
 ] 

Rob Young commented on FLINK-35240:
-----------------------------------

Hi, I've been looking into this.

1. 
[FLUSH_AFTER_WRITE_VALUE|https://github.com/FasterXML/jackson-databind/blob/52c74def0e487e1149bdc63783b1086ddcb095b7/src/main/java/com/fasterxml/jackson/databind/SerializationFeature.java#L167]
 is only applied to a subset of writeValue methods of ObjectMapper and 
ObjectWriter.
> Feature that determines whether JsonGenerator.flush() is called after 
> writeValue() method that takes JsonGenerator as an argument completes (i.e. 
> does NOT affect methods that use other destinations);
So it only applies when a JsonGenerator is passed to writeValue. The method 
we're using takes an OutputStream, so this feature has no effect on it.

2. Disabling AUTO_CLOSE_TARGET prevents the underlying stream being closed 
during writeValue, which is what we want.

3. There's another feature 
[FLUSH_PASSED_TO_STREAM|https://github.com/FasterXML/jackson-core/blob/02aba8a36aa4c62e02196ee1d64c027a2d03ecdd/src/main/java/com/fasterxml/jackson/core/JsonGenerator.java#L105]
 being applied by the Jackson CsvGenerator 
[here|https://github.com/FasterXML/jackson-dataformats-text/blob/3d3165e58b90618a5fbccf630f1604a383afe78c/csv/src/main/java/com/fasterxml/jackson/dataformat/csv/CsvGenerator.java#L505]
 which is enabled by default. This causes the underlying stream to be flushed 
on every writeValue, at the point where the CsvGenerator is closed. I think 
this is what's provoking the flush.

Experimentally disabling FLUSH_PASSED_TO_STREAM broke integration tests because 
Jackson internally wraps the stream in its own Writer class, which has its own 
buffering. So if you tell Jackson not to flush, the Jackson writer isn't 
flushed, and the CSV bytes are never written to the underlying stream.
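
The effect can be reproduced without Jackson. The following is only an analogy, 
assuming a java.io.BufferedWriter as a stand-in for Jackson's internal Writer; 
the class and method names are hypothetical and none of this is Jackson or 
Flink code. It shows that bytes held in a writer's buffer never reach the 
stream unless the writer is flushed.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo class; BufferedWriter stands in for Jackson's internal Writer. */
final class UnflushedWriterDemo {

    /** Writes one CSV line through a buffering writer and returns how many bytes reached the sink. */
    static int bytesReachingSink(boolean flushWriter) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        Writer writer = new BufferedWriter(new OutputStreamWriter(sink, StandardCharsets.UTF_8));
        try {
            writer.write("a,b,c\n");
            if (flushWriter) {
                // Flushing pushes the buffered characters down to the sink.
                writer.flush();
            }
            // The writer is deliberately not closed, mirroring a target stream
            // that has to stay open between records.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.size();
    }

    public static void main(String[] args) {
        System.out.println("without flush: " + bytesReachingSink(false) + " bytes");
        System.out.println("with flush:    " + bytesReachingSink(true) + " bytes");
    }
}
```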

One workaround I found is wrapping the stream in an implementation that ignores 
the flush call and passing that to Jackson. 
https://github.com/robobario/flink/commit/ae3fdb1ca9de748df791af232bba57d6d7289a79
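
A minimal sketch of the wrapping idea, not necessarily what the linked commit 
does. All class names here are hypothetical, and a per-record 
OutputStreamWriter stands in for the generator that Jackson creates, flushes 
and discards on each writeValue. The wrapper forwards writes unchanged and 
swallows flush(), so the writer's own buffer is still drained into the stream 
while the flush never reaches the underlying sink.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

/** Hypothetical wrapper: forwards writes to the delegate but ignores flush(). */
final class FlushIgnoringOutputStream extends FilterOutputStream {
    FlushIgnoringOutputStream(OutputStream delegate) {
        super(delegate);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Delegate in bulk; FilterOutputStream's default writes byte by byte.
        out.write(b, off, len);
    }

    @Override
    public void flush() {
        // Intentionally a no-op: the owner of the delegate decides when to flush.
    }
}

/** Hypothetical in-memory sink that counts how often flush() reaches it. */
final class FlushCountingSink extends ByteArrayOutputStream {
    int flushes;

    @Override
    public void flush() {
        flushes++;
    }
}

final class FlushIgnoringDemo {

    /** Writes the given number of CSV lines, flushing the writer after each one. */
    static FlushCountingSink writeRecords(int records, boolean ignoreFlush) {
        FlushCountingSink sink = new FlushCountingSink();
        OutputStream target = ignoreFlush ? new FlushIgnoringOutputStream(sink) : sink;
        try {
            for (int i = 0; i < records; i++) {
                // A fresh writer per record, as a stand-in for a per-call generator.
                Writer writer = new OutputStreamWriter(target, StandardCharsets.UTF_8);
                writer.write("id-" + i + ",value\n");
                writer.flush(); // drains the writer's buffer, then flushes the target
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink;
    }

    public static void main(String[] args) {
        FlushCountingSink direct = writeRecords(3, false);
        FlushCountingSink wrapped = writeRecords(3, true);
        System.out.println("direct:  " + direct.size() + " bytes, " + direct.flushes + " flushes");
        System.out.println("wrapped: " + wrapped.size() + " bytes, " + wrapped.flushes + " flushes");
    }
}
```

With a wrapper like this, whoever owns the underlying stream still has to 
flush it explicitly at the appropriate point, since flushes issued through the 
wrapper are dropped.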

> Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record
> ---------------------------------------------------------
>
>                 Key: FLINK-35240
>                 URL: https://issues.apache.org/jira/browse/FLINK-35240
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: Zhongqiang Gong
>            Priority: Minor
>         Attachments: image-2024-04-26-00-23-29-975.png
>
>
> *Reproduce:*
> * According to user email: 
> https://lists.apache.org/thread/9j5z8hv4vjkd54dkzqy1ryyvm0l5rxhc
> *  !image-2024-04-26-00-23-29-975.png! 
> *Analysis:*
> * `org.apache.flink.formats.csv.CsvBulkWriter#addElement` will flush per 
> record.
> *Solution:*
> * I think maybe we can disable `FLUSH_AFTER_WRITE_VALUE` to avoid flush when 
> a record added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
