[ https://issues.apache.org/jira/browse/AVRO-2511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Asen Milchev Kolev updated AVRO-2511:
-------------------------------------
    Description: 
If you try to use flush() with an output stream instead of a file, the buffer is
not flushed to disk. Here is an example of how I'm using it, and there is no way
to see the data flushed to disk! Is this by design, or is it a bug? I really need
this in order to determine the file size and create a new file when the maximum
file size is reached!
{code:java}
........
try (DataFileWriter<GenericRecord> dataFileWriter =
         new DataFileWriter<>(new GenericDatumWriter<>());
     DataFileStream<GenericRecord> dataFileStreamReader =
         new DataFileStream<>(is, new GenericDatumReader<GenericRecord>()))
{
    dataFileWriter.setFlushOnEveryBlock(true);
    FSDataOutputStream hdfsOutputStream = null;
    dataFileWriter.setCodec(codecFactory);
    Schema schema = dataFileStreamReader.getSchema();
    if (fileMode.equals(FileMode.APPEND))
    {
        FileContext fc = FileContext.getFileContext(hdfsConfiguration);
        hdfsOutputStream = fileSystem.append(hdfsPath);
        dataFileWriter.appendTo(new AvroFSInput(fc, hdfsPath), hdfsOutputStream);
    }
    else
    {
        hdfsOutputStream = fileSystem.create(hdfsPath);
        fileManager.setCreationTime(hdfsPath);
        dataFileWriter.create(schema, hdfsOutputStream);
    }

    GenericRecord genericRecord = null;
    while (dataFileStreamReader.hasNext())
    {
        if (fileManager.isLimitsReached())
        {
            IOUtils.closeStream(dataFileWriter);
            fileSystem.rename(hdfsPath, fileManager.getFinalPath(hdfsPath));
            LOG.info("Avro write completed for {0}", hdfsPath.toString());
            hdfsPath = fileManager.getPath();
            storePaths.add(hdfsPath);
            hdfsOutputStream = fileSystem.create(hdfsPath);
            fileManager.setCreationTime(hdfsPath);
            dataFileWriter.setCodec(codecFactory);
            dataFileWriter.create(schema, hdfsOutputStream);
            LOG.info("Initiate Avro write to {0}", hdfsPath.toString());
        }

        genericRecord = dataFileStreamReader.next(genericRecord);
        dataFileWriter.append(genericRecord);
        // Doesn't work at all when we are using streams
        // (i.e. dataFileWriter.create(schema, hdfsOutputStream);)
        // and not directly files (i.e. dataFileWriter.create(schema, new File("..."));)!
        dataFileWriter.flush();
        fileManager.updateEntryCount(hdfsPath);
    }
}
{code}
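
A minimal sketch of a possible workaround, not a confirmed fix: DataFileWriter.flush() only pushes Avro's internal buffer into the underlying OutputStream. With an HDFS FSDataOutputStream, the bytes can still sit in the client-side and pipeline buffers until hflush() or hsync() is called, so keeping a reference to the stream and syncing it after the Avro flush should make the data visible; getPos() then reports the bytes written so far. The flushAndCheckSize helper and its maxFileSize parameter are hypothetical, standing in for the fileManager.isLimitsReached() check above.

{code:java}
import java.io.IOException;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;

// Untested sketch: flush Avro's buffer into the stream, then ask HDFS to
// persist it. hflush() makes the bytes visible to new readers; hsync()
// additionally forces them to disk on the datanodes (slower).
static boolean flushAndCheckSize(DataFileWriter<GenericRecord> writer,
                                 FSDataOutputStream out,
                                 long maxFileSize) throws IOException {
    writer.flush();   // Avro's internal buffer -> the FSDataOutputStream
    out.hflush();     // flush the HDFS client buffers so the data leaves the JVM
    // out.hsync();   // or: also sync the data to disk on the datanodes
    return out.getPos() >= maxFileSize;  // hypothetical roll-over check
}
{code}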


> Avro Java DataFileWriter Flush() does not flush the buffer to disk
> ------------------------------------------------------------------
>
>                 Key: AVRO-2511
>                 URL: https://issues.apache.org/jira/browse/AVRO-2511
>             Project: Apache Avro
>          Issue Type: Bug
>          Components: java
>    Affects Versions: 1.9.0
>            Reporter: Asen Milchev Kolev
>            Priority: Major


