[
https://issues.apache.org/jira/browse/AVRO-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730633#comment-17730633
]
Victor edited comment on AVRO-3768 at 6/9/23 6:55 AM:
------------------------------------------------------
Hi Christophe, thank you for looking into my use case :)
I think it is a bit simpler than that; let me share my exact need directly, as I
may have abstracted it a bit too much in the ticket description.
I have a system that receives Avro records via HTTP and deserializes them using
the DataFileStream available in the Avro library, in order to put them in a
database. This all works perfectly well.
Now I want to write a test verifying that it works as expected without
exhausting memory and that it is as fast as I need, so in my test I want to be
able to write something like this:
{code:java}
Schema recordSchema = SchemaBuilder.record("Row")
    .fields().name("value").type().intType().noDefault()
    .endRecord();
// This could be anything we want (e.g. a Stream, an Iterator, or maybe an
// Avro-provided interface); the only important thing is that it generates
// records one by one and not everything at once, because there are a lot of them.
Iterator<GenericData.Record> iterator = Stream.generate(() -> {
    var record = new GenericData.Record(recordSchema);
    record.put("value", ThreadLocalRandom.current().nextInt(100));
    return record;
}).limit(10_000_000).iterator();
httpClient.post(URL, new AvroInputStream(iterator));
{code}
Basically, I would like an AvroInputStream whose read(), when called, takes the
next record, serializes it into an in-memory buffer, and returns the bytes, and
so on until there are no more records to read.
Right now, I am forced to write this:
{code:java}
var pipedOutput = new PipedOutputStream();
var pipedInput = new PipedInputStream(pipedOutput);
new Thread(() -> {
    try (var w = new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(recordSchema))
            .create(recordSchema, pipedOutput)) {
        while (iterator.hasNext()) {
            w.append(iterator.next());
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}).start();
httpClient.post(URL, pipedInput);{code}
This is a bit more complex in terms of code, and it also uses more resources
than needed, because theoretically we don't really need a separate thread like
this.
I hope this makes sense :)
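To make it concrete, here is a minimal self-contained sketch of the kind of adapter I have in mind. All the names (IteratorInputStream, the serializer function) are made up, not an existing Avro API; the serializer is a plain Function here so the sketch runs on its own, but with Avro it would wrap a GenericDatumWriter writing each record into a ByteArrayOutputStream.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch (not an existing Avro class): an InputStream that pulls
// items from an Iterator and serializes each one into a small in-memory
// buffer only when the consumer asks for more bytes.
class IteratorInputStream<T> extends InputStream {
    private final Iterator<T> source;
    private final Function<T, byte[]> serializer;
    private byte[] buffer = new byte[0];
    private int pos = 0;

    IteratorInputStream(Iterator<T> source, Function<T, byte[]> serializer) {
        this.source = source;
        this.serializer = serializer;
    }

    @Override
    public int read() throws IOException {
        // Refill the buffer from the next record whenever it is exhausted;
        // this also skips records that serialize to zero bytes.
        while (pos >= buffer.length) {
            if (!source.hasNext()) {
                return -1; // no more records: end of stream
            }
            buffer = serializer.apply(source.next());
            pos = 0;
        }
        return buffer[pos++] & 0xFF;
    }
}

public class Demo {
    public static void main(String[] args) throws IOException {
        Iterator<String> records = List.of("one,", "two,", "three").iterator();
        InputStream in = new IteratorInputStream<>(
                records, s -> s.getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        // prints "one,two,three"
    }
}
```

One caveat: a plain stream of binary-encoded records is not the same as the Avro object container file format that DataFileStream reads, so a real implementation would also have to emit the file header and block framing, not just the record bytes.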
> Provide InputStream implementation that wraps an Iterator of Record
> -------------------------------------------------------------------
>
> Key: AVRO-3768
> URL: https://issues.apache.org/jira/browse/AVRO-3768
> Project: Apache Avro
> Issue Type: New Feature
> Components: java
> Reporter: Victor
> Priority: Major
>
> Hello,
> I have some code that generates Avro records (using GenericData.Record,
> precisely) one record at a time (I use an Iterator<GenericData.Record> in
> practice, but we could imagine anything similar, including an actual
> java.util.stream.Stream or even an Avro-provided interface), and I would like
> to serialize them to some external system (an HTTP request in my particular
> case). So basically the data is generated as it is pulled.
> Right now, the only option I found is to use a combination of
> java.io.PipedInputStream and PipedOutputStream, wrapping the latter inside a
> DataFileWriter and then feeding the records to the writer (in a separate
> thread) so that the PipedInputStream can be read by anything else.
> As you can see, this is a bit cumbersome, and a more straightforward approach
> would be welcome. I tried to implement this myself, but I admit I got lost in
> all the moving pieces and couldn't find a simple way of doing it without
> implementing a lot of low-level stuff. So maybe I'm missing something, or we
> could add something to Avro :)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)