[ 
https://issues.apache.org/jira/browse/AVRO-2916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17731593#comment-17731593
 ] 

Christophe Le Saec commented on AVRO-2916:
------------------------------------------

For Azure, couldn't you use the "CloudBlockBlob.openOutputStream()" method to get 
a direct output stream instead of going through an intermediate ByteArrayOutputStream?
 
{code:java}
DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
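// Declare the stream first: resources close in reverse order, so the writer
// flushes its last block before the blob stream is closed.
try (OutputStream out = blob.openOutputStream();
    DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
  dataFileWriter.setCodec(CodecFactory.nullCodec());
  // "in" is assumed to be a SeekableInput over the existing file's header
  dataFileWriter.appendTo(in, out); // call only once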

  for(;;) {
    User userToAppend = ...
    dataFileWriter.append(userToAppend);
  }
}
{code}
        
 

> add DataFileWriter.appendTo(Header,OutputStream) to be able to append data to 
> non-local file
> --------------------------------------------------------------------------------------------
>
>                 Key: AVRO-2916
>                 URL: https://issues.apache.org/jira/browse/AVRO-2916
>             Project: Apache Avro
>          Issue Type: Improvement
>          Components: java
>    Affects Versions: 1.11.0
>            Reporter: Arnaud Nauwynck
>            Priority: Trivial
>              Labels: pull-request-available
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> It is not practical to append records to a remote data file (Azure blob, AWS, 
> ..) when you cannot use java.io.File and instead have to build the bytes to 
> append to the existing remote data in an in-memory byte array.
> The proposal is simply to add an equivalent method 
> "DataFileWriter.appendTo(Header, OutputStream)" as follows:
> {code:java}
>   /**
>    * Open a writer appending to an existing stream.
>    *
>    * @param header  the header from the existing data to append.
>    * @param out positioned at the end of the existing file.
>    */
>   public DataFileWriter<D> appendTo(Header header, OutputStream out) throws IOException {
>     assertNotOpen();
>     this.schema = header.schema;
>     this.sync = header.sync;
>     this.meta.putAll(header.meta);
>     byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
>     if (codecBytes != null) {
>       String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
>       this.codec = CodecFactory.fromString(strCodec).createInstance();
>     } else {
>       this.codec = CodecFactory.nullCodec().createInstance();
>     }
>     init(out);
>     return this;
>   }
> {code} 
> in addition to the similar existing method:
>  {code:java}
>   public DataFileWriter<D> appendTo(SeekableInput in, OutputStream out) throws IOException {
>     assertNotOpen();
>     DataFileReader<D> reader = new DataFileReader<>(in, new GenericDatumReader<>());
>     this.schema = reader.getSchema();
>     this.sync = reader.getHeader().sync;
>     this.meta.putAll(reader.getHeader().meta);
>     byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
>     if (codecBytes != null) {
>       String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
>       this.codec = CodecFactory.fromString(strCodec).createInstance();
>     } else {
>       this.codec = CodecFactory.nullCodec().createInstance();
>     }
>     init(out);
>     return this;
>   }
>  {code}
> Technically, we could call "DataFileWriter.appendTo(seekableInput, output)", 
> but passing a "seekableInput" that wraps the header fragment of an existing 
> file is both complex and inefficient.
>  {code:java}
> byte[] inArrayHeader = ... fetch once the header of a remote file...
> User userToAppend = ...
>  
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
> try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
>   dataFileWriter.setCodec(CodecFactory.nullCodec());
>   try (SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader)) {
>     // inefficient: will reparse header schema, sync, meta each time!
>     dataFileWriter.appendTo(in, out);
>   }
>
>   dataFileWriter.append(userToAppend);
> }
> byte[] serializedBytes = out.toByteArray();
> // then use serializedBytes to append to remote file (azure blob, aws..) 
> {code}
> Using the newly proposed helper method, you could compute the data block to 
> append to a remote data file more simply and efficiently:
> {code:java}
> // run once at startup
> Header header;
> {
>       SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader);
>       DataFileReader<Object> reader = new DataFileReader<>(in, new GenericDatumReader<>());
>       header = reader.getHeader();
>       reader.close();
> }
> // streaming code to append+flush rows to remote data
> for(;;) {
>   User userToAppend = ...
>   ByteArrayOutputStream out = new ByteArrayOutputStream();
>   DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
>   try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
>       dataFileWriter.setCodec(CodecFactory.nullCodec());
>       dataFileWriter.appendTo(header, out); // efficient: no reparse of schema, sync, meta
>
>       dataFileWriter.append(userToAppend);
>   }
>   byte[] serializedBytes = out.toByteArray();
>   // then use serializedBytes to append to remote file (azure blob, aws..) 
>   // ... remoteAzureFile.append(.. serializedBytes) .. remoteAzureFile.flush()
> }             
> {code}
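
For context on why the bytes computed in the loop above can simply be appended to the remote file: in the Avro object container format, everything after the header is a sequence of data blocks, and each block is a record count (long), the serialized size in bytes (long), the serialized records, and the file's 16-byte sync marker. Below is a minimal stdlib-only sketch of that framing, assuming the null codec. The names AvroBlockFraming, writeLong and frameBlock are hypothetical and are not part of the Avro API, and the sync marker is a placeholder (a real one comes from the existing file's header).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: hypothetical helper, not part of Apache Avro.
class AvroBlockFraming {

    /** Writes a long using Avro's zig-zag variable-length encoding. */
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zig-zag: small magnitudes map to small unsigned values
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            z >>>= 7;
        }
        out.write((int) z); // final byte, continuation bit clear
    }

    /** Frames already-serialized records as one block: count, byte size, payload, sync marker. */
    static byte[] frameBlock(long recordCount, byte[] payload, byte[] sync) {
        if (sync.length != 16) {
            throw new IllegalArgumentException("sync marker must be 16 bytes");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, recordCount);
        writeLong(out, payload.length);
        out.write(payload, 0, payload.length);
        out.write(sync, 0, sync.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Placeholder sync marker; a real one must match the existing file's header.
        byte[] sync = new byte[16];
        Arrays.fill(sync, (byte) 0x5A);

        // Hypothetical record payload: a single Avro string "hi" (long length + UTF-8 bytes).
        ByteArrayOutputStream record = new ByteArrayOutputStream();
        byte[] utf8 = "hi".getBytes(StandardCharsets.UTF_8);
        writeLong(record, utf8.length);
        record.write(utf8, 0, utf8.length);

        byte[] block = frameBlock(1, record.toByteArray(), sync);
        System.out.println("block length = " + block.length); // 1 + 1 + 3 + 16 = 21
    }
}
```

Because a block carries no reference to its position in the file and ends with the same sync marker as every other block, a reader sees the appended bytes as one more block. That is the property the proposed appendTo(header, out) relies on.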



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
