Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154477089
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -62,27 +72,65 @@ public Writer write(VectorAccessible va) throws IOException {
     
         @SuppressWarnings("resource")
         public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
    +      checkNotNull(va);
           WritableBatch batch = WritableBatch.getBatchNoHVWrap(
               va.getRecordCount(), va, sv2 != null);
           return write(batch, sv2);
         }
     
         public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
    -      VectorAccessibleSerializable vas;
    -      if (sv2 == null) {
    -        vas = new VectorAccessibleSerializable(batch, allocator);
    -      } else {
    -        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
    -      }
    -      if (retain) {
    -        vas.writeToStreamAndRetain(stream);
    -      } else {
    -        vas.writeToStream(stream);
    +      return write(batch, sv2, false);
    +    }
    +
    +    public Writer write(WritableBatch batch, SelectionVector2 sv2, boolean retain) throws IOException {
    +      checkNotNull(batch);
    +      checkNotNull(channel);
    +      final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
    +
    +      final DrillBuf[] incomingBuffers = batch.getBuffers();
    +      final UserBitShared.RecordBatchDef batchDef = batch.getDef();
    +
    +      try {
    +        /* Write the metadata to the file */
    +        batchDef.writeDelimitedTo(output);
    +
    +
    +        /* If we have a selection vector, dump it to file first */
    +        if (sv2 != null) {
    +          final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
    +          channel.write(sv2.getBuffer(false).nioBuffer(0, dataLength));
    +        }
    +
    +        /* Dump the array of ByteBuf's associated with the value vectors */
    +        for (DrillBuf buf : incomingBuffers) {
    +          /* dump the buffer into the OutputStream */
    +          channel.write(buf.nioBuffer());
    +        }
    +
    +        timeNs += timerContext.stop();
    +      } catch (IOException e) {
    +        throw new RuntimeException(e);
    +      } finally {
    +        if (!retain) {
    +          batch.clear();
    +          if (sv2 != null) {
    +            sv2.clear();
    +          }
    +        }
           }
    -      timeNs += vas.getTimeNs();
           return this;
         }
     
    +    public long close() throws IOException {
    +      if (!channel.isOpen()) {
    +        return 0;
    +      }
    +      long writeSize = spillSet.getPosition(channel);
    +      spillSet.tallyWriteBytes(writeSize);
    --- End diff ---
    
    This seems awkward. Before, this class was a utility that wrapped an 
existing write mechanism. Here, the class essentially becomes the writer itself 
-- a full-fledged file writer that manages its own channel. That seems like a 
good evolution.
    
    But updating the spill set metrics does not seem to belong here. Instead, 
some caller will invoke this close() method, and that caller can update the 
spill set metrics based on the total bytes written to this stream. That is, 
this writer can do the writing, manage the channel, and accumulate its own 
count of bytes written; the caller decides what to do with that information 
(for example, add it to the spill set's metrics).
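
    For illustration, here is a minimal sketch of that division of labor. Only 
close(), the channel, and SpillSet.tallyWriteBytes() come from the diff; the 
constructor, the writeBytes field, and the ByteBuffer overload are hypothetical 
stand-ins for the batch/sv2 overloads above:

        import java.io.IOException;
        import java.nio.ByteBuffer;
        import java.nio.channels.WritableByteChannel;

        public class Writer {
          private final WritableByteChannel channel;
          private long writeBytes;  // bytes this writer has pushed to its channel

          public Writer(WritableByteChannel channel) {
            this.channel = channel;
          }

          // Every write adds what it emits to the running total.
          public Writer write(ByteBuffer buf) throws IOException {
            writeBytes += channel.write(buf);
            return this;
          }

          // close() closes the channel and reports the byte count; it knows
          // nothing about SpillSet.
          public long close() throws IOException {
            if (!channel.isOpen()) {
              return 0;
            }
            channel.close();
            return writeBytes;
          }
        }

    The spilling operator, which owns the SpillSet, then does the tallying:

        long writeSize = writer.close();
        spillSet.tallyWriteBytes(writeSize);  // the caller, not the writer, updates the metrics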

