Arnaud Nauwynck created AVRO-2916:
-------------------------------------

             Summary: add DataFileWriter.appendTo(Header,OutputStream) to be able to append data to non-local file
                 Key: AVRO-2916
                 URL: https://issues.apache.org/jira/browse/AVRO-2916
             Project: Apache Avro
          Issue Type: Improvement
          Components: java
    Affects Versions: 1.11.0
            Reporter: Arnaud Nauwynck



Appending records to a remote data file (Azure Blob, AWS, ...) is currently 
impractical: there is no java.io.File to append to, so the new data must be 
serialized into an in-memory byte array and then appended to the existing 
remote data.


The proposal is simply to add an equivalent method, 
"DataFileWriter.appendTo(Header, OutputStream)", as follows:


{code:java}

  /**
   * Open a writer appending to an existing stream.
   *
   * @param header the header of the existing data to append to.
   * @param out    stream positioned at the end of the existing file.
   */
  public DataFileWriter<D> appendTo(Header header, OutputStream out) throws IOException {
    assertNotOpen();
    this.schema = header.schema;
    this.sync = header.sync;
    this.meta.putAll(header.meta);
    byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
    if (codecBytes != null) {
      String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
      this.codec = CodecFactory.fromString(strCodec).createInstance();
    } else {
      this.codec = CodecFactory.nullCodec().createInstance();
    }

    init(out);

    return this;
  }
{code} 

Technically, we can already call "DataFileWriter.appendTo(seekableInput, 
output)", but this is both complex and inefficient: the header fragment of the 
existing file has to be passed as a "seekableInput" and is reparsed on every 
call.

 {code:java}

byte[] inArrayHeader = ... fetch once the header of a remote file...

User userToAppend = ...
 
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
        dataFileWriter.setCodec(CodecFactory.nullCodec());

        try (SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader)) {
              // inefficient: reparses the header schema, sync, and meta each time!
              dataFileWriter.appendTo(in, out);
        }

        dataFileWriter.append(userToAppend);
}
byte[] serializedBytes = out.toByteArray();

// then use serializedBytes to append to remote file (azure blob, aws..) 
{code}


Using the proposed helper method, you could compute the data block to append 
to a remote data file more simply and efficiently:


{code:java}
// run once at startup
Header header;
{
        SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader);
        DataFileReader<Object> reader = new DataFileReader<>(in, new GenericDatumReader<>());
        header = reader.getHeader();
        reader.close();
}

// streaming code to append+flush rows to remote data
for(;;) {
  User userToAppend = ...

  ByteArrayOutputStream out = new ByteArrayOutputStream();
  DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
  try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
        dataFileWriter.setCodec(CodecFactory.nullCodec());

        // efficient: no reparsing of schema, sync, meta
        dataFileWriter.appendTo(header, out);
                                
        dataFileWriter.append(userToAppend);
  }
  byte[] serializedBytes = out.toByteArray();

  // then use serializedBytes to append to remote file (azure blob, aws..) 
  ...  remoteAzureFile.append(.. serializedBytes) .. remoteAzureFile.flush()
}               
{code}
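
For context, here is a rough sketch of why the header alone is enough to 
produce appendable bytes. Per the Avro object container file layout, each data 
block is a record count, a byte size, the serialized records, and the file's 
16-byte sync marker. The class and method names below (AvroBlockSketch, 
writeLong, frameBlock) are hypothetical and not part of the Avro API, and the 
sketch assumes the null codec and records that are already binary-encoded:

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical illustration only; the real framing is done inside DataFileWriter.
final class AvroBlockSketch {
    static final int SYNC_SIZE = 16;

    // Avro writes a long as zigzag followed by variable-length base-128 encoding.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Frames one data block: count, size, payload, then the sync marker
    // taken from the header of the file being appended to.
    static byte[] frameBlock(long recordCount, byte[] encodedRecords, byte[] sync) {
        if (sync.length != SYNC_SIZE) {
            throw new IllegalArgumentException("sync marker must be 16 bytes");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, recordCount);
        writeLong(out, encodedRecords.length);
        out.write(encodedRecords, 0, encodedRecords.length);
        out.write(sync, 0, sync.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] sync = new byte[SYNC_SIZE];
        Arrays.fill(sync, (byte) 0x5A);   // placeholder; a real marker comes from the header
        byte[] records = {0x02, 0x04};    // placeholder for two encoded records
        byte[] block = frameBlock(2, records, sync);
        System.out.println(block.length); // 1 + 1 + 2 + 16 = 20
    }
}
```

A block appended with a different sync marker would not match the one recorded 
in the existing header, which is why the marker has to come from that header 
rather than being regenerated.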





--
This message was sent by Atlassian Jira
(v8.3.4#803005)
