martin-traverse opened a new issue, #794: URL: https://github.com/apache/arrow-java/issues/794
### Describe the enhancement requested

Part 4 in the Avro series, following on from #731. This will allow reading and writing whole files in the Avro container format as a series of batches. Each batch will correspond to one Avro file block and fill a single VSR. The VSR can be recycled between batches. Input and output can be to an Avro encoder / decoder (set up externally) or to Java's native byte channels (which are set up with a default binary encoder / decoder). To cater for async scenarios, the reader API should know how many bytes are required for a block before attempting to read it.

I'd like to propose the following API - hopefully this is going in the right direction. I've taken some inspiration from ArrowFileReader / Writer and the JSON Reader / Writer, but it's not identical (and they're not identical to each other). If there is a desire to line up on specific naming / conventions then I'm certainly happy to do that, in which case I'll need a steer on exactly how it should be. Otherwise, if anyone has radically different ideas of what it should look like, please do share!

```java
class AvroFileWriter {

    // Writer owns a channel / encoder and will close them
    // VSR and optional dictionaries are not owned and will not be closed
    // VSR can be recycled or supplied as a stream

    // Avro encoder configured externally
    public AvroFileWriter(
        Encoder encoder,
        VectorSchemaRoot firstBatch,
        DictionaryProvider dictionaries)

    // Sets up a default binary encoder for the channel
    public AvroFileWriter(
        WritableByteChannel channel,
        VectorSchemaRoot firstBatch,
        DictionaryProvider dictionaries)

    // Write the Avro header (throws if already written)
    void writeHeader()

    // Write the contents of the VSR as an Avro data block
    // Writes header if required, throws if batch written and not reset
    void writeBatch()

    // Reset to currentIndex = 0
    // Expect new data in the same VSR (i.e. VSR is recycled)
    void resetBatch()

    // Reset vectors in all the producers
    // Supports a stream of VSRs (i.e. VSR is not recycled)
    void resetBatch(VectorSchemaRoot batch)

    // Reset all vectors and supply new dictionaries
    void resetBatch(VectorSchemaRoot batch, DictionaryProvider dictionaries)

    // Closes encoder and / or channel
    // Does not close VSR or dictionary vectors
    void close()
}
```

Now writing data looks like this:

```java
void writeAvro(MyApp app) {

    var root = app.prepareVsr();
    var dictionaries = app.prepareDictionaries();

    try (var writer = new AvroFileWriter(app.openChannel(), root, dictionaries)) {

        writer.writeHeader();

        // Assume recycling, loadBatch() puts fresh data into root
        while (app.loadBatch()) {
            writer.resetBatch();
            writer.writeBatch();
        }
    }
}
```

And then for the reader:

```java
class AvroFileReader implements DictionaryProvider {

    // Reader owns a channel / decoder and will close them
    // Schema / VSR / dictionaries are created when the header is read
    // VSR / dictionaries are cleaned up on close
    // Dictionaries accessible through the DictionaryProvider iface

    // Avro decoder configured externally
    public AvroFileReader(Decoder decoder)

    // Sets up a default binary decoder for the channel
    // Avro is read sequentially, so a seekable channel is not needed
    public AvroFileReader(ReadableByteChannel channel)

    // Read the Avro header and set up schema / VSR / dictionaries
    void readHeader()

    // Schema and VSR available after readHeader()
    Schema getSchema()
    VectorSchemaRoot getVectorSchemaRoot()

    // Read the next Avro block and load it into the VSR
    // Return true if successful, false if EOS
    // Also false in non-blocking mode if more data is needed
    boolean readBatch()

    // Check the position and size of the next Avro data block
    // Provides a mechanism for non-blocking / reactive styles
    boolean hasNextBatch();
    long nextBatchPosition();
    long nextBatchSize();

    // Closes decoder and / or channel
    // Also closes VSR and dictionary vectors
    void close()
}
```

So reading looks like this:

```java
// Blocking style
void readAvro(MyApp app) {

    try (var reader = new AvroFileReader(app.openChannel())) {

        reader.readHeader();

        app.setSchema(reader.getSchema());
        app.setVsr(reader.getVectorSchemaRoot());
        app.setDictionaries(reader);

        while (reader.readBatch()) {
            app.saveBatch();
        }
    }
}

// Non-blocking stage to process one batch
CompletionStage<Boolean> readAvroAsync(AvroFileReader reader) {

    if (reader.hasNextBatch()) {

        var position = reader.nextBatchPosition();
        var size = reader.nextBatchSize();

        return app.ensureBytesAvailable(position, size)
            .thenApply(x -> {
                if (reader.readBatch()) {
                    app.saveBatch();
                }
                return reader;
            })
            .thenCompose(this::readAvroAsync);
    }
    else {
        return CompletableFuture.completedFuture(true);
    }
}
```

The non-blocking read is quite important for me, as I have a web service that receives bytes in a stream. There is a slight gotcha because we need the first 8 bytes of the next batch before we know its size, but we can implement hasNextBatch() without them and then probably expose the batch padding size as a constant.

Compression is probably worth thinking about now: each block is compressed individually, so the implementation needs to treat the contents of each block as a separate chunk that can be fed through a codec. My guess is this is fairly straightforward for codecs that are already available, so we might as well include it now rather than reworking later.

If this looks broadly right I'll make a start on top of #779.
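To make the gotcha about the leading bytes concrete: in the Avro container format, each file block starts with two longs (object count, then serialized size in bytes), each written as a zigzag-encoded varint, so `nextBatchSize()` cannot answer until those header bytes have arrived. A minimal sketch of decoding that prefix (the `AvroBlockHeader` class and `readLong` helper are hypothetical names for illustration, not part of the proposed API):

```java
import java.nio.ByteBuffer;

public class AvroBlockHeader {

    // Decode one Avro-encoded long: variable-length base-128 bytes
    // (low 7 bits per byte, high bit = continuation), then undo zigzag.
    // A block header is two of these: object count, then block size.
    static long readLong(ByteBuffer buf) {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = buf.get() & 0xff;
            n |= (long) (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1); // zigzag decode
    }
}
```

Since a varint long occupies at most 10 bytes, a non-blocking `hasNextBatch()` knows an upper bound on how many bytes it needs before the block size is decodable.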
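On compression: because each block body is an independent chunk, the codec hookup can be as simple as a per-block `byte[] -> byte[]` transform between the channel and the VSR producers/consumers. A sketch using the JDK's `Deflater`/`Inflater` in raw-deflate mode (Avro's "deflate" codec omits the zlib wrapper, hence the `nowrap` flag; the `BlockCodec` class name is just for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class BlockCodec {

    // Compress one block body as a raw deflate stream (no zlib header)
    static byte[] deflate(byte[] block) {
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, /* nowrap = */ true);
        deflater.setInput(block);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] tmp = new byte[4096];
        while (!deflater.finished()) {
            out.write(tmp, 0, deflater.deflate(tmp));
        }
        deflater.end();
        return out.toByteArray();
    }

    // Decompress one block body; nowrap mode needs an extra dummy
    // input byte per the Inflater javadoc
    static byte[] inflate(byte[] compressed) {
        Inflater inflater = new Inflater(/* nowrap = */ true);
        inflater.setInput(java.util.Arrays.copyOf(compressed, compressed.length + 1));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] tmp = new byte[4096];
        try {
            while (!inflater.finished()) {
                out.write(tmp, 0, inflater.inflate(tmp));
            }
        } catch (DataFormatException e) {
            throw new RuntimeException("Invalid deflate block", e);
        }
        inflater.end();
        return out.toByteArray();
    }
}
```

Snappy and zstandard would follow the same shape, just with different codec calls, which is why handling it per block now should avoid rework later.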