[ 
https://issues.apache.org/jira/browse/AVRO-2916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Nauwynck updated AVRO-2916:
----------------------------------
    Description: 
It is not practical to append records to a remote DataFile (Azure Blob, AWS, ...): such storage cannot be written through java.io.File, so new records have to be serialized into an in-memory byte array that is then appended to the existing remote data.


The proposal is simply to add an equivalent method 
"DataFileWriter.appendTo(Header, OutputStream)", as follows:


{code:java}

  /**
   * Open a writer appending to an existing stream.
   *
   * @param header the header of the existing data being appended to.
   * @param out positioned at the end of the existing file.
   */
  public DataFileWriter<D> appendTo(Header header, OutputStream out) throws IOException {
    assertNotOpen();
    this.schema = header.schema;
    this.sync = header.sync;
    this.meta.putAll(header.meta);
    byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
    if (codecBytes != null) {
      String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
      this.codec = CodecFactory.fromString(strCodec).createInstance();
    } else {
      this.codec = CodecFactory.nullCodec().createInstance();
    }

    init(out);

    return this;
  }
{code} 

in addition to the similar existing method:

 {code:java}
  public DataFileWriter<D> appendTo(SeekableInput in, OutputStream out) throws IOException {
    assertNotOpen();
    DataFileReader<D> reader = new DataFileReader<>(in, new GenericDatumReader<>());
    this.schema = reader.getSchema();
    this.sync = reader.getHeader().sync;
    this.meta.putAll(reader.getHeader().meta);
    byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
    if (codecBytes != null) {
      String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
      this.codec = CodecFactory.fromString(strCodec).createInstance();
    } else {
      this.codec = CodecFactory.nullCodec().createInstance();
    }

    init(out);

    return this;
  }
 {code}

Technically, we could already call "DataFileWriter.appendTo(seekableInput, output)", 
but this is both cumbersome and inefficient: the header fragment of the existing 
file has to be wrapped in a "seekableInput" and re-parsed on every append:

 {code:java}

byte[] inArrayHeader = ... fetch once the header of a remote file...

User userToAppend = ...
 
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
        dataFileWriter.setCodec(CodecFactory.nullCodec());

        try (SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader)) {
              dataFileWriter.appendTo(in, out);  // inefficient: re-parses header schema, sync, meta on each call!
        }
                                
        dataFileWriter.append(userToAppend);
}
byte[] serializedBytes = out.toByteArray();

// then use serializedBytes to append to remote file (azure blob, aws..) 
{code}
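
Note: the "inArrayHeader" bytes only need to cover the container-file header (magic bytes, metadata map and sync marker), so a single ranged read of the first few kilobytes of the remote file is usually enough. A minimal sketch, assuming a hypothetical remoteStorage.readRange(path, offset, length) helper (not an Avro or cloud-SDK API):

{code:java}
// Hypothetical sketch: remoteStorage.readRange(...) stands in for whatever
// ranged-read API the blob store offers; it is not an Avro method.
// The Avro header usually fits in the first few KB, but a very large schema
// may require reading more bytes.
byte[] inArrayHeader = remoteStorage.readRange("/data/users.avro", 0, 4096);
{code}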


Using the proposed helper method, you could compute the data block to append to 
a remote data file more simply and efficiently:


{code:java}
// run once at startup
Header header;
{
        SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader);
        DataFileReader<Object> reader = new DataFileReader<>(in, new GenericDatumReader<>());
        header = reader.getHeader();
        reader.close();
}

// streaming code to append+flush rows to remote data
for(;;) {
  User userToAppend = ...

  ByteArrayOutputStream out = new ByteArrayOutputStream();
  DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
  try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
        dataFileWriter.setCodec(CodecFactory.nullCodec());

        dataFileWriter.appendTo(header, out); // efficient: no re-parsing of schema, sync, meta

        dataFileWriter.append(userToAppend);
  }
  byte[] serializedBytes = out.toByteArray();

  // then use serializedBytes to append to remote file (azure blob, aws..) 
  ...  remoteAzureFile.append(.. serializedBytes) .. remoteAzureFile.flush()
}               
{code}
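
As a local sanity check (a minimal sketch, not part of the proposal), the produced block can be concatenated behind the existing file content and read back as a normal container file; "existingBytes" below is an assumed placeholder for the current remote content:

{code:java}
// Minimal sketch: verify locally that the appended block is readable.
// "existingBytes" is an assumption for this illustration (the current remote content);
// "serializedBytes" is the block produced by the loop above.
byte[] combined = new byte[existingBytes.length + serializedBytes.length];
System.arraycopy(existingBytes, 0, combined, 0, existingBytes.length);
System.arraycopy(serializedBytes, 0, combined, existingBytes.length, serializedBytes.length);

try (DataFileReader<User> check = new DataFileReader<>(
    new SeekableByteArrayInput(combined), new SpecificDatumReader<>(User.class))) {
  while (check.hasNext()) {
    System.out.println(check.next());
  }
}
{code}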



> add DataFileWriter.appendTo(Header,OutputStream) to be able to append data to 
> non-local file
> --------------------------------------------------------------------------------------------
>
>                 Key: AVRO-2916
>                 URL: https://issues.apache.org/jira/browse/AVRO-2916
>             Project: Apache Avro
>          Issue Type: Improvement
>          Components: java
>    Affects Versions: 1.11.0
>            Reporter: Arnaud Nauwynck
>            Priority: Trivial
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
