[
https://issues.apache.org/jira/browse/AVRO-2916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Arnaud Nauwynck updated AVRO-2916:
----------------------------------
Description:
It is not practical to append records to a remote DataFile (Azure blob, AWS,
...) when working not with a java.io.File but with an in-memory byte array that
is appended to existing remote data.
The proposal is simply to add an equivalent method
"DataFileWriter.appendTo(Header, OutputStream)" as follows:
{code:java}
/**
* Open a writer appending to an existing stream.
*
* @param header the header from the existing data to append.
* @param out positioned at the end of the existing file.
*/
public DataFileWriter<D> appendTo(Header header, OutputStream out) throws
IOException {
assertNotOpen();
this.schema = header.schema;
this.sync = header.sync;
this.meta.putAll(header.meta);
byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
if (codecBytes != null) {
String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
this.codec = CodecFactory.fromString(strCodec).createInstance();
} else {
this.codec = CodecFactory.nullCodec().createInstance();
}
init(out);
return this;
}
{code}
in addition to the similar existing method:
{code:java}
public DataFileWriter<D> appendTo(SeekableInput in, OutputStream out) throws
IOException {
assertNotOpen();
DataFileReader<D> reader = new DataFileReader<>(in, new
GenericDatumReader<>());
this.schema = reader.getSchema();
this.sync = reader.getHeader().sync;
this.meta.putAll(reader.getHeader().meta);
byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
if (codecBytes != null) {
String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
this.codec = CodecFactory.fromString(strCodec).createInstance();
} else {
this.codec = CodecFactory.nullCodec().createInstance();
}
init(out);
return this;
}
{code}
Technically, we could call "DataFileWriter.appendTo(seekableInput, output)",
but it is both complex and inefficient to pass the "seekableInput" fragment
of an existing local file header.
{code:java}
byte[] inArrayHeader = ... fetch once the header of a remote file...
User userToAppend = ...
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
dataFileWriter.setCodec(CodecFactory.nullCodec());
try (SeekableByteArrayInput in = new
SeekableByteArrayInput(inArrayHeader)) {
dataFileWriter.appendTo(in, out); // ... inefficient: will
reparse header schema,sync,meta each time!
}
dataFileWriter.append(userToAppend);
}
byte[] serializedBytes = out.toByteArray();
// then use serializedBytes to append to remote file (azure blob, aws..)
{code}
Using the new proposed helper method, you could more simply and efficiently
compute datablock to append to a remote data file:
{code:java}
// run once at startup
Header header;
{
SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader);
DataFileReader<Object> reader = new DataFileReader<>(in, new
GenericDatumReader<>());
header = reader.getHeader();
reader.close();
}
// streaming code to append+flush rows to remote data
for(;;) {
User userToAppend = ...
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<User> writer = new
SpecificDatumWriter<User>(User.getClassSchema());
try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
dataFileWriter.setCodec(CodecFactory.nullCodec());
dataFileWriter.appendTo(header, out); // efficient: no reparse
schema,sync,meta
dataFileWriter.append(userToAppend);
}
byte[] serializedBytes = out.toByteArray();
// then use serializedBytes to append to remote file (azure blob, aws..)
... remoteAzureFile.append(.. serializedBytes) .. remoteAzureFile.flush()
}
{code}
was:
It is not practical to append records to a remote DataFile (Azure blob, AWS,
...) when working not with a java.io.File but with an in-memory byte array that
is appended to existing remote data.
The proposal is simply to add an equivalent method
"DataFileWriter.appendTo(Header, OutputStream)" as follows:
{code:java}
/**
* Open a writer appending to an existing stream.
*
* @param header the header from the existing data to append.
* @param out positioned at the end of the existing file.
*/
public DataFileWriter<D> appendTo(Header header, OutputStream out) throws
IOException {
assertNotOpen();
this.schema = header.schema;
this.sync = header.sync;
this.meta.putAll(header.meta);
byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
if (codecBytes != null) {
String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
this.codec = CodecFactory.fromString(strCodec).createInstance();
} else {
this.codec = CodecFactory.nullCodec().createInstance();
}
init(out);
return this;
}
{code}
in addition to the similar existing method:
{code:java}
public DataFileWriter<D> appendTo(SeekableInput in, OutputStream out) throws
IOException {
assertNotOpen();
DataFileReader<D> reader = new DataFileReader<>(in, new
GenericDatumReader<>());
this.schema = reader.getSchema();
this.sync = reader.getHeader().sync;
this.meta.putAll(reader.getHeader().meta);
byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
if (codecBytes != null) {
String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
this.codec = CodecFactory.fromString(strCodec).createInstance();
} else {
this.codec = CodecFactory.nullCodec().createInstance();
}
init(out);
return this;
}
{code}
Technically, we could call "DataFileWriter.appendTo(seekableInput, output)",
but it is both complex and inefficient to pass the "seekableInput" fragment
of an existing local file header.
{code:java}
byte[] inArrayHeader = ... fetch once the header of a remote file...
User userToAppend = ...
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
dataFileWriter.setCodec(CodecFactory.nullCodec());
try (SeekableByteArrayInput in = new
SeekableByteArrayInput(inArrayHeader)) {
dataFileWriter.appendTo(in, out); // ... inefficient: will
reparse header schema,sync,meta each time!
}
dataFileWriter.append(userToAppend);
}
byte[] serializedBytes = out.toByteArray();
// then use serializedBytes to append to remote file (azure blob, aws..)
{code}
Using the new proposed helper method, you could more simply and efficiently
compute datablock to append to a remote data file:
{code:java}
// run once at startup
Header header;
{
SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader);
DataFileReader<Object> reader = new DataFileReader<>(in, new
GenericDatumReader<>());
header = reader.getHeader();
reader.close();
}
// streaming code to append+flush rows to remote data
for(;;) {
User userToAppend = ...
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<User> writer = new
SpecificDatumWriter<User>(User.getClassSchema());
try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
dataFileWriter.setCodec(CodecFactory.nullCodec());
dataFileWriter.appendTo(header, out); // efficient: no reparse
schema,sync,meta
dataFileWriter.append(userToAppend);
}
byte[] serializedBytes = out.toByteArray();
// then use serializedBytes to append to remote file (azure blob, aws..)
... remoteAzureFile.append(.. serializedBytes) .. remoteAzureFile.flush()
}
{code}
> add DataFileWriter.appendTo(Header,OutputStream) to be able to append data to
> non-local file
> --------------------------------------------------------------------------------------------
>
> Key: AVRO-2916
> URL: https://issues.apache.org/jira/browse/AVRO-2916
> Project: Apache Avro
> Issue Type: Improvement
> Components: java
> Affects Versions: 1.11.0
> Reporter: Arnaud Nauwynck
> Priority: Trivial
>
> It is not practical to append records to a remote DataFile (azure blob, aws,
> ..), not using java.io.File, but in-memory byte array to append to an
> existing remote data.
> The proposal is simply to add an equivalent method
> "DataFileWriter.appendTo(Header, OutputStream)" as follows:
> {code:java}
> /**
> * Open a writer appending to an existing stream.
> *
> * @param header the header from the existing data to append.
> * @param out positioned at the end of the existing file.
> */
> public DataFileWriter<D> appendTo(Header header, OutputStream out) throws
> IOException {
> assertNotOpen();
> this.schema = header.schema;
> this.sync = header.sync;
> this.meta.putAll(header.meta);
> byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
> if (codecBytes != null) {
> String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
> this.codec = CodecFactory.fromString(strCodec).createInstance();
> } else {
> this.codec = CodecFactory.nullCodec().createInstance();
> }
> init(out);
> return this;
> }
> {code}
> in addition to the similar existing method:
> {code:java}
> public DataFileWriter<D> appendTo(SeekableInput in, OutputStream out)
> throws IOException {
> assertNotOpen();
> DataFileReader<D> reader = new DataFileReader<>(in, new
> GenericDatumReader<>());
> this.schema = reader.getSchema();
> this.sync = reader.getHeader().sync;
> this.meta.putAll(reader.getHeader().meta);
> byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
> if (codecBytes != null) {
> String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
> this.codec = CodecFactory.fromString(strCodec).createInstance();
> } else {
> this.codec = CodecFactory.nullCodec().createInstance();
> }
> init(out);
> return this;
> }
> {code}
> Technically, we could call "DataFileWriter.appendTo(seekableInput, output)",
> but it is both complex and inefficient to pass the "seekableInput" fragment
> of an existing local file header.
> {code:java}
> byte[] inArrayHeader = ... fetch once the header of a remote file...
> User userToAppend = ...
>
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> DatumWriter<User> writer = new
> SpecificDatumWriter<User>(User.getClassSchema());
> try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
> dataFileWriter.setCodec(CodecFactory.nullCodec());
> try (SeekableByteArrayInput in = new
> SeekableByteArrayInput(inArrayHeader)) {
> dataFileWriter.appendTo(in, out); // ... inefficient: will
> reparse header schema,sync,meta each time!
> }
>
> dataFileWriter.append(userToAppend);
>
> }
> byte[] serializedBytes = out.toByteArray();
> // then use serializedBytes to append to remote file (azure blob, aws..)
> {code}
> Using the new proposed helper method, you could more simply and efficiently
> compute datablock to append to a remote data file:
> {code:java}
> // run once at startup
> Header header;
> {
> SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader);
> DataFileReader<Object> reader = new DataFileReader<>(in, new
> GenericDatumReader<>());
> header = reader.getHeader();
> reader.close();
> }
> // streaming code to append+flush rows to remote data
> for(;;) {
> User userToAppend = ...
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> DatumWriter<User> writer = new
> SpecificDatumWriter<User>(User.getClassSchema());
> try (DataFileWriter<User> dataFileWriter = new
> DataFileWriter<User>(writer)) {
> dataFileWriter.setCodec(CodecFactory.nullCodec());
> dataFileWriter.appendTo(header, out); // efficient: no reparse
> schema,sync,meta
>
> dataFileWriter.append(userToAppend);
> }
> byte[] serializedBytes = out.toByteArray();
> // then use serializedBytes to append to remote file (azure blob, aws..)
> ... remoteAzureFile.append(.. serializedBytes) .. remoteAzureFile.flush()
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)