Asen Milchev Kolev created AVRO-2511:
----------------------------------------
Summary: Avro Java DataFileWriter Flush() does not flush the buffer to disk
Key: AVRO-2511
URL: https://issues.apache.org/jira/browse/AVRO-2511
Project: Apache Avro
Issue Type: Bug
Components: java
Affects Versions: 1.9.0
Reporter: Asen Milchev Kolev
If you call flush() on a DataFileWriter that writes to an output stream instead
of a file, the buffer is not flushed to disk. Below is an example of how I am
using it; I never see the data flushed to disk. Is that by design, or is it a
bug? I need this in order to determine the file size and start a new file when
the maximum file size is reached.
{code:java}
........
try (DataFileWriter<GenericRecord> dataFileWriter =
         new DataFileWriter<>(new GenericDatumWriter<>());
     DataFileStream<GenericRecord> dataFileStreamReader =
         new DataFileStream<>(is, new GenericDatumReader<GenericRecord>())) {
    dataFileWriter.setFlushOnEveryBlock(true);
    FSDataOutputStream hdfsOutputStream = null;
    dataFileWriter.setCodec(codecFactory);
    Schema schema = dataFileStreamReader.getSchema();
    if (fileMode.equals(FileMode.APPEND)) {
        FileContext fc = FileContext.getFileContext(hdfsConfiguration);
        hdfsOutputStream = fileSystem.append(hdfsPath);
        dataFileWriter.appendTo(new AvroFSInput(fc, hdfsPath), hdfsOutputStream);
    } else {
        hdfsOutputStream = fileSystem.create(hdfsPath);
        fileManager.setCreationTime(hdfsPath);
        dataFileWriter.create(schema, hdfsOutputStream);
    }
    GenericRecord genericRecord = null;
    while (dataFileStreamReader.hasNext()) {
        if (fileManager.isLimitsReached()) {
            IOUtils.closeStream(dataFileWriter);
            fileSystem.rename(hdfsPath, fileManager.getFinalPath(hdfsPath));
            LOG.info("Avro write completed for {0}", hdfsPath.toString());
            hdfsPath = fileManager.getPath();
            storePaths.add(hdfsPath);
            hdfsOutputStream = fileSystem.create(hdfsPath);
            fileManager.setCreationTime(hdfsPath);
            dataFileWriter.setCodec(codecFactory);
            dataFileWriter.create(schema, hdfsOutputStream);
            LOG.info("Initiate Avro write to {0}", hdfsPath.toString());
        }
        genericRecord = dataFileStreamReader.next(genericRecord);
        dataFileWriter.append(genericRecord);
        dataFileWriter.flush(); // doesn't work at all when we are using streams and not directly files!
        fileManager.updateEntryCount(hdfsPath);
    }
{code}
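To make the file-size requirement concrete, here is a minimal JDK-only sketch of how the bytes that have actually reached the underlying stream could be counted. It does not use or reproduce Avro or HDFS behavior. `CountingOutputStream` and `RollingSizeDemo` are hypothetical names introduced for illustration, and `BufferedOutputStream` is only a stand-in for any writer that buffers internally. The count reflects bytes handed to the wrapped stream, which is not necessarily the same as bytes durable on disk.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical helper (not part of Avro or Hadoop): counts the bytes that
// are actually handed to the wrapped stream.
class CountingOutputStream extends FilterOutputStream {
    private long count;

    CountingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        count++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Delegate the whole range at once instead of byte by byte.
        out.write(b, off, len);
        count += len;
    }

    long getCount() {
        return count;
    }
}

class RollingSizeDemo {
    public static void main(String[] args) throws IOException {
        CountingOutputStream sink =
            new CountingOutputStream(new ByteArrayOutputStream());
        // Stand-in for a buffering writer; the 8192-byte buffer is an
        // arbitrary choice for this sketch.
        OutputStream writer = new BufferedOutputStream(sink, 8192);

        writer.write(new byte[100]);
        // The 100 bytes are still in the writer's buffer at this point.
        System.out.println("before flush: " + sink.getCount());

        writer.flush();
        // flush() pushes the buffered bytes down to the counting stream.
        System.out.println("after flush: " + sink.getCount());

        writer.close();
    }
}
```

With a wrapper like this placed between the writer and the real output stream, a size limit could be checked against `getCount()` after each flush, assuming the writer's flush() does forward its buffer to the stream it was given.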