Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154499154
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -62,27 +72,65 @@ public Writer write(VectorAccessible va) throws IOException {
     
         @SuppressWarnings("resource")
         public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
    +      checkNotNull(va);
           WritableBatch batch = WritableBatch.getBatchNoHVWrap(
               va.getRecordCount(), va, sv2 != null);
           return write(batch, sv2);
         }
     
         public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
    -      VectorAccessibleSerializable vas;
    -      if (sv2 == null) {
    -        vas = new VectorAccessibleSerializable(batch, allocator);
    -      } else {
    -        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
    -      }
    -      if (retain) {
    -        vas.writeToStreamAndRetain(stream);
    -      } else {
    -        vas.writeToStream(stream);
    +      return write(batch, sv2, false);
    +    }
    +
    +    public Writer write(WritableBatch batch, SelectionVector2 sv2, boolean retain) throws IOException {
    +      checkNotNull(batch);
    +      checkNotNull(channel);
    +      final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
    +
    +      final DrillBuf[] incomingBuffers = batch.getBuffers();
    +      final UserBitShared.RecordBatchDef batchDef = batch.getDef();
    +
    +      try {
    +        /* Write the metadata to the file */
    +        batchDef.writeDelimitedTo(output);
    +
    +
    +        /* If we have a selection vector, dump it to file first */
    +        if (sv2 != null) {
    +          final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
    +          channel.write(sv2.getBuffer(false).nioBuffer(0, dataLength));
    +        }
    +
    +        /* Dump the array of ByteBuf's associated with the value vectors */
    +        for (DrillBuf buf : incomingBuffers) {
    +          /* dump the buffer into the OutputStream */
    +          channel.write(buf.nioBuffer());
    --- End diff --
    
    I agree that we should write only the necessary payload and avoid spilling 
unused buffers. Note that `channel.write()` writes only the bytes between the 
`ByteBuffer`'s position and limit (`ByteBuffer.remaining()`), not the whole 
allocated buffer. Do you mean that there are bytes written to a buffer that 
should not be spilled? In that case, I'd suggest limiting the scope of this PR 
to using `WritableByteChannel`, which avoids the memory copy from off-heap 
during spill to local files, and handling the extra bytes in a separate JIRA/PR.
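
    To illustrate the `ByteBuffer.remaining()` point, here is a minimal JDK-only 
sketch. It does not use any Drill classes: `ChannelWriteSketch` and 
`bytesWritten` are hypothetical names, and the in-memory sink is an assumed 
stand-in for the spill file channel. It only shows that a 
`WritableByteChannel` writes the bytes between position and limit, not the 
buffer's full capacity.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

// Hypothetical demo class, not part of Drill.
final class ChannelWriteSketch {

  /** Writes buf to an in-memory channel and returns the number of bytes that arrived. */
  static int bytesWritten(ByteBuffer buf) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (WritableByteChannel channel = Channels.newChannel(sink)) {
      // write() may write fewer bytes than requested, so loop until drained.
      while (buf.hasRemaining()) {
        channel.write(buf);
      }
    }
    return sink.size();
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer buf = ByteBuffer.allocateDirect(1024); // capacity is 1024 bytes
    buf.put(new byte[100]);                           // only 100 bytes are filled
    buf.flip();                                       // position = 0, limit = 100

    System.out.println("capacity  = " + buf.capacity());
    System.out.println("remaining = " + buf.remaining());
    System.out.println("written   = " + bytesWritten(buf)); // prints "written   = 100"
  }
}
```

    If the buffers handed to `channel.write()` still carry bytes that should 
not be spilled, the limit would have to be narrowed before the write, which is 
the part I'd leave to the follow-up JIRA/PR.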

