robobario commented on code in PR #24730:
URL: https://github.com/apache/flink/pull/24730#discussion_r1582425139
##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java:
##########
@@ -51,13 +54,19 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
checkNotNull(mapper);
checkNotNull(schema);
+ // Prevent Jackson's writeValue() method calls from closing the stream.
+ mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+ mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);
+
this.converter = checkNotNull(converter);
this.stream = checkNotNull(stream);
this.converterContext = converterContext;
this.csvWriter = mapper.writer(schema);
-
- // Prevent Jackson's writeValue() method calls from closing the stream.
- mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+ try {
+ this.generator = csvWriter.createGenerator(stream,
JsonEncoding.UTF8);
Review Comment:
The Generator manages some resources with their own buffers so I think we
should `close` the generator during `finish()`. The underlying `CsvEncoder`
uses a char buffer that jackson [can
recycle](https://github.com/FasterXML/jackson-dataformats-text/blob/3d3165e58b90618a5fbccf630f1604a383afe78c/csv/src/main/java/com/fasterxml/jackson/dataformat/csv/impl/CsvEncoder.java#L1015)
using a threadlocal pool.
Disabling `AUTO_CLOSE_TARGET` will still prevent the closing of the
underlying stream.
##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java:
##########
@@ -98,11 +107,12 @@ static <T> CsvBulkWriter<T, T, Void> forPojo(Class<T>
pojoClass, FSDataOutputStr
@Override
public void addElement(T element) throws IOException {
final R r = converter.convert(element, converterContext);
- csvWriter.writeValue(stream, r);
+ csvWriter.writeValue(generator, r);
}
@Override
public void flush() throws IOException {
+ generator.flush();
stream.flush();
Review Comment:
The `generator.flush()` also flushes the stream, so the following
`stream.flush()` is redundant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]