martin-traverse opened a new issue, #794:
URL: https://github.com/apache/arrow-java/issues/794

   ### Describe the enhancement requested
   
Part 4 in the Avro series, following on from #731. This will allow reading 
and writing whole files in the Avro container format as a series of batches. 
Each batch will correspond to one Avro file block and fill a single VSR. The 
VSR can be recycled between batches. Input and output can go through an Avro 
encoder / decoder (set up externally) or through Java's native byte channels 
(for which a default binary encoder / decoder is set up). To cater for async 
scenarios, the reader API should report how many bytes are required for a block 
before attempting to read it.
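
   For reference, this maps onto the Avro object container layout (from the 
Avro spec):

        // Avro object container file layout:
        //
        //   header:  magic "Obj\x01" | file metadata map | 16-byte sync marker
        //   block:   record count (varint long) | serialized size (varint long)
        //            | record data, possibly compressed | 16-byte sync marker
        //   blocks repeat until end of file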
   
   I'd like to propose the following API - hopefully this is going in the right 
direction. I've taken some inspiration from ArrowFileReader / Writer and the 
JSON Reader / Writer, but it's not identical (and they're not identical to each 
other). If there is a desire to line up on specific naming / conventions then 
I'm certainly happy to do that, in which case I'll need a steer on exactly how 
it should be. Otherwise, if anyone has radically different ideas of what it 
should look like, please do share!
   
       class AvroFileWriter {
   
           // Writer owns a channel / encoder and will close them
           // VSR and optional dictionaries are not owned and will not be closed
           // VSR can be recycled or supplied as a stream
   
           // Avro encoder configured externally
           public AvroFileWriter(
               Encoder encoder,
               VectorSchemaRoot firstBatch, 
               DictionaryProvider dictionaries)
   
            // Sets up a default binary encoder for the channel
           public AvroFileWriter(
               WritableByteChannel channel,
               VectorSchemaRoot firstBatch, 
               DictionaryProvider dictionaries)
   
           // Write the Avro header (throws if already written)
           void writeHeader()
   
           // Write the contents of the VSR as an Avro data block
           // Writes header if required, throws if batch written and not reset
           void writeBatch()
   
           // Reset to currentIndex = 0
           // Expect new data in the same VSR (i.e. VSR is recycled)
           void resetBatch()
   
            // Reset vectors in all the producers
           // Supports a stream of VSRs (i.e. VSR is not recycled)
           void resetBatch(VectorSchemaRoot batch)
   
           // Reset all vectors and supply new dictionaries
            void resetBatch(VectorSchemaRoot batch, DictionaryProvider dictionaries)
   
           // Closes encoder and / or channel
           // Does not close VSR or dictionary vectors
           void close()
   
       }
     
   Now writing data looks like this:
   
       void writeAvro(MyApp app) {
   
           var root = app.prepareVsr();
            var dictionaries = app.prepareDictionaries();
   
            try (var writer = new AvroFileWriter(app.openChannel(), root, dictionaries)) {
   
               writer.writeHeader();
   
               // Assume recycling, loadBatch() puts fresh data into root
               while (app.loadBatch()) {
                    writer.resetBatch();
                    writer.writeBatch();
               }
           }
       }
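
   The streaming case (a fresh VSR per batch, rather than recycling) would use 
the resetBatch(VectorSchemaRoot) overload. A sketch, where app.nextBatch() is a 
hypothetical helper that returns each VSR in turn (or null at the end):

        void writeAvroStream(MyApp app) {

            var firstBatch = app.nextBatch();
            var dictionaries = app.prepareDictionaries();

            try (var writer = new AvroFileWriter(app.openChannel(), firstBatch, dictionaries)) {

                // writeBatch() writes the header first if it has not been written yet
                writer.writeBatch();

                // Each subsequent batch arrives in a new VSR
                for (var batch = app.nextBatch(); batch != null; batch = app.nextBatch()) {
                    writer.resetBatch(batch);
                    writer.writeBatch();
                }
            }
        }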
   
   And then for the reader:
   
       class AvroFileReader implements DictionaryProvider {
   
            // Reader owns a channel / decoder and will close them
           // Schema / VSR / dictionaries are created when header is read
           // VSR / dictionaries are cleaned up on close
           // Dictionaries accessible through DictionaryProvider iface
   
            // Avro decoder configured externally
            public AvroFileReader(Decoder decoder)

            // Sets up a default binary decoder for the channel
            // Avro is read sequentially, so a seekable channel is not needed
            public AvroFileReader(ReadableByteChannel channel)
   
           // Read the Avro header and set up schema / VSR / dictionaries
           void readHeader()
   
           // Schema and VSR available after readHeader() 
           Schema getSchema()
           VectorSchemaRoot getVectorSchemaRoot()
   
           // Read the next Avro block and load it into the VSR
           // Return true if successful, false if EOS
           // Also false in non-blocking mode if need more data
           boolean readBatch()
   
           // Check for position and size of the next Avro data block
           // Provides a mechanism for non-blocking / reactive styles
           boolean hasNextBatch();
           long nextBatchPosition();
           long nextBatchSize();
   
            // Closes decoder and / or channel
           // Also closes VSR and dictionary vectors
           void close()
   
       }
   
   So reading looks like this:
   
       // Blocking style
       void readAvro(MyApp app) {
   
            try (var reader = new AvroFileReader(app.openChannel())) {
   
               reader.readHeader();
               
               app.setSchema(reader.getSchema());
               app.setVsr(reader.getVectorSchemaRoot());
               app.setDictionaries(reader);
   
                while (reader.readBatch()) {
                   app.saveBatch();
               }
           }
       }
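
   Since the reader implements DictionaryProvider, dictionary-encoded vectors 
can be decoded directly against it with the existing DictionaryEncoder utility. 
A small sketch (the vector name and dictionary id are illustrative):

        // Decode one dictionary-encoded vector, using the reader as the provider
        var encoded = reader.getVectorSchemaRoot().getVector("category");
        var dictionary = reader.lookup(1L);  // illustrative dictionary id

        try (var decoded = DictionaryEncoder.decode(encoded, dictionary)) {
            // Use the decoded vector before the next readBatch() recycles the VSR
        }

   And the non-blocking style looks like this: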
   
       // Non-blocking stage to process one batch
       CompletionStage<Boolean> readAvroAsync(AvroFileReader reader) {
   
           if (reader.hasNextBatch()) {
   
                var position = reader.nextBatchPosition();
                var size = reader.nextBatchSize();

                return app.ensureBytesAvailable(position, size)
                   .thenApply(x -> {
   
                       if (reader.readBatch()) {
                           app.saveBatch();
                       }
   
                       return reader;
                   })
                    .thenCompose(this::readAvroAsync);
           }
           else {
               return CompletableFuture.completedFuture(true);
           }
       }
   
   The non-blocking read is quite important for me, as I have a web service 
that receives bytes in a stream. There is a slight gotcha because we need the 
first 8 bytes of the next batch before we know its size, but we can implement 
hasNextBatch() without them and then probably expose the size of that block 
prefix as a constant, as sketched below.
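
   For illustration only - the constant and the app helper here are 
hypothetical, not part of the proposed API:

        // The block prefix must be buffered before nextBatchSize() can be
        // answered; hasNextBatch() itself does not need those bytes
        CompletionStage<Long> nextBatchSizeAsync(AvroFileReader reader) {

            // BLOCK_PREFIX_BYTES: hypothetical constant for the fixed number
            // of bytes needed before the next block's size is known
            return app.ensureBytesAvailable(
                    reader.nextBatchPosition(), AvroFileReader.BLOCK_PREFIX_BYTES)
                .thenApply(x -> reader.nextBatchSize());
        }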
   
   Compression is probably worth thinking about now - each block is compressed 
individually, so the implementation needs to treat the contents of each block 
as a separate chunk that can be fed through a codec (see the sketch below). My 
guess is that this is fairly straightforward for codecs that are already 
available, so we might as well include it now rather than reworking things 
later.
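
   A minimal sketch of the per-block step, using the JDK Deflater (Avro's 
"deflate" codec is raw deflate, i.e. no zlib header); the buffer size and the 
surrounding plumbing are assumptions:

        import java.io.ByteArrayOutputStream;
        import java.util.zip.Deflater;

        // Compress one serialized block as a standalone chunk - each Avro
        // block is compressed independently of the others
        static byte[] deflateBlock(byte[] blockBytes) {

            var deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, /* nowrap = */ true);
            deflater.setInput(blockBytes);
            deflater.finish();

            var out = new ByteArrayOutputStream();
            var buffer = new byte[8192];

            while (!deflater.finished()) {
                var n = deflater.deflate(buffer);
                out.write(buffer, 0, n);
            }

            deflater.end();
            return out.toByteArray();
        }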
   
   If this looks broadly right, I'll make a start on top of #779.

