DRILL-5385: Vector serializer fails to read saved SV2

Unit testing revealed that the VectorAccessorSerializable class claims
to serialize SV2s, but, in fact, does not. Actually, it writes them,
but does not read them, resulting in corrupted data on read.

Fortunately, no code appears to serialize sv2s at present. Still, it is
a bug and needs to be fixed.

First task is to add serialization code for the sv2.

That revealed that the recently-added code to save DrillBufs using a
shared buffer had a bug: it relied on the writer index to know how much
data is in the buffer. Turns out sv2 buffers don’t set this index. So,
new versions of the write function takes a write length.

Then, closer inspection of the read code revealed duplicated code. So,
DrillBuf allocation moved into a version of the read function that now
does reading and DrillBuf allocation.

Turns out that value vectors, but not SV2s, can be built from a
Drillbuf. Added a matching constructor to the SV2 class.

Finally, cleaned up the code a bit to make it easier to follow. Also
allowed test code to access the handy timer already present in the code.

closes #800


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/35bccd0e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/35bccd0e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/35bccd0e

Branch: refs/heads/master
Commit: 35bccd0e6dcfdde6f9b4ae92734c82a0bd4242ec
Parents: e1bc44c
Author: Paul Rogers <prog...@maprtech.com>
Authored: Sat Mar 25 19:51:43 2017 -0700
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Fri May 5 15:43:13 2017 +0300

----------------------------------------------------------------------
 .../drill/exec/cache/CachedVectorContainer.java |  3 -
 .../cache/VectorAccessibleSerializable.java     | 97 ++++++++++----------
 .../exec/record/selection/SelectionVector2.java |  6 ++
 .../apache/drill/exec/memory/BaseAllocator.java | 56 ++++++++---
 .../drill/exec/memory/BufferAllocator.java      | 36 +++++++-
 5 files changed, 127 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/35bccd0e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
index ff6c14b..99d08e6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
@@ -61,10 +61,8 @@ public class CachedVectorContainer extends 
LoopedAbstractDrillSerializable {
     } catch (IOException e) {
       throw new IllegalStateException(e);
     }
-
   }
 
-
   @Override
   public void read(DataInput input) throws IOException {
     int len = input.readInt();
@@ -95,5 +93,4 @@ public class CachedVectorContainer extends 
LoopedAbstractDrillSerializable {
   public byte[] getData() {
     return data;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/35bccd0e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 89876af..9d0182f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,6 +28,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
@@ -42,11 +43,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
- * A wrapper around a VectorAccessible. Will serialize a VectorAccessible and 
write to an OutputStream, or can read
- * from an InputStream and construct a new VectorContainer.
+ * A wrapper around a VectorAccessible. Will serialize a VectorAccessible and
+ * write to an OutputStream, or can read from an InputStream and construct a 
new
+ * VectorContainer.
  */
 public class VectorAccessibleSerializable extends AbstractStreamSerializable {
-//  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
   static final MetricRegistry metrics = DrillMetrics.getRegistry();
   static final String WRITER_TIMER = 
MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
 
@@ -56,6 +57,7 @@ public class VectorAccessibleSerializable extends 
AbstractStreamSerializable {
   private int recordCount = -1;
   private BatchSchema.SelectionVectorMode svMode = 
BatchSchema.SelectionVectorMode.NONE;
   private SelectionVector2 sv2;
+  private long timeNs;
 
   private boolean retain = false;
 
@@ -69,8 +71,9 @@ public class VectorAccessibleSerializable extends 
AbstractStreamSerializable {
   }
 
   /**
-   * Creates a wrapper around batch and sv2 for writing to a stream. sv2 will 
never be released by this class, and ownership
-   * is maintained by caller.
+   * Creates a wrapper around batch and sv2 for writing to a stream. sv2 will
+   * never be released by this class, and ownership is maintained by caller.
+   *
    * @param batch
    * @param sv2
    * @param allocator
@@ -85,40 +88,48 @@ public class VectorAccessibleSerializable extends 
AbstractStreamSerializable {
   }
 
   /**
-   * Reads from an InputStream and parses a RecordBatchDef. From this, we 
construct a SelectionVector2 if it exits
-   * and construct the vectors and add them to a vector container
-   * @param input the InputStream to read from
+   * Reads from an InputStream and parses a RecordBatchDef. From this, we
+   * construct a SelectionVector2 if it exits and construct the vectors and add
+   * them to a vector container
+   *
+   * @param input
+   *          the InputStream to read from
    * @throws IOException
    */
-  @SuppressWarnings("resource")
   @Override
   public void readFromStream(InputStream input) throws IOException {
-    final VectorContainer container = new VectorContainer();
     final UserBitShared.RecordBatchDef batchDef = 
UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
     recordCount = batchDef.getRecordCount();
     if (batchDef.hasCarriesTwoByteSelectionVector() && 
batchDef.getCarriesTwoByteSelectionVector()) {
+      readSv2(input);
+    }
+    readVectors(input, batchDef);
+  }
 
-      if (sv2 == null) {
-        sv2 = new SelectionVector2(allocator);
-      }
-      sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE);
-      sv2.getBuffer().setBytes(0, input, recordCount * 
SelectionVector2.RECORD_SIZE);
-      svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+  private void readSv2(InputStream input) throws IOException {
+    if (sv2 != null) {
+      sv2.clear();
     }
+    final int dataLength = recordCount * SelectionVector2.RECORD_SIZE;
+    svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    @SuppressWarnings("resource")
+    DrillBuf buf = allocator.read(dataLength, input);
+    sv2 = new SelectionVector2(allocator, buf, recordCount);
+    buf.release(); // SV2 now owns the buffer
+  }
+
+  @SuppressWarnings("resource")
+  private void readVectors(InputStream input, RecordBatchDef batchDef) throws 
IOException {
+    final VectorContainer container = new VectorContainer();
     final List<ValueVector> vectorList = Lists.newArrayList();
     final List<SerializedField> fieldList = batchDef.getFieldList();
     for (SerializedField metaData : fieldList) {
       final int dataLength = metaData.getBufferLength();
       final MaterializedField field = MaterializedField.create(metaData);
-      final DrillBuf buf = allocator.buffer(dataLength);
-      final ValueVector vector;
-      try {
-        allocator.read(buf, input, dataLength);
-        vector = TypeHelper.getNewVector(field, allocator);
-        vector.load(metaData, buf);
-      } finally {
-        buf.release();
-      }
+      final DrillBuf buf = allocator.read(dataLength, input);
+      final ValueVector vector = TypeHelper.getNewVector(field, allocator);
+      vector.load(metaData, buf);
+      buf.release(); // Vector now owns the buffer
       vectorList.add(vector);
     }
     container.addCollection(vectorList);
@@ -146,36 +157,24 @@ public class VectorAccessibleSerializable extends 
AbstractStreamSerializable {
     final DrillBuf[] incomingBuffers = batch.getBuffers();
     final UserBitShared.RecordBatchDef batchDef = batch.getDef();
 
-    /* DrillBuf associated with the selection vector */
-    DrillBuf svBuf = null;
-    Integer svCount =  null;
-
-    if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE) {
-      svCount = sv2.getCount();
-      svBuf = sv2.getBuffer(); //this calls retain() internally
-    }
-
     try {
       /* Write the metadata to the file */
       batchDef.writeDelimitedTo(output);
 
       /* If we have a selection vector, dump it to file first */
-      if (svBuf != null) {
-        allocator.write(svBuf, output);
-        sv2.setBuffer(svBuf);
-        svBuf.release(); // sv2 now owns the buffer
-        sv2.setRecordCount(svCount);
+      if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE) {
+        recordCount = sv2.getCount();
+        final int dataLength = recordCount * SelectionVector2.RECORD_SIZE;
+        allocator.write(sv2.getBuffer(false), dataLength, output);
       }
 
       /* Dump the array of ByteBuf's associated with the value vectors */
       for (DrillBuf buf : incomingBuffers) {
-                /* dump the buffer into the OutputStream */
+        /* dump the buffer into the OutputStream */
         allocator.write(buf, output);
       }
 
-      output.flush();
-
-      timerContext.stop();
+      timeNs += timerContext.stop();
     } catch (IOException e) {
       throw new RuntimeException(e);
     } finally {
@@ -192,11 +191,9 @@ public class VectorAccessibleSerializable extends 
AbstractStreamSerializable {
     }
   }
 
-  public VectorContainer get() {
-    return va;
-  }
+  public VectorContainer get() { return va; }
 
-  public SelectionVector2 getSv2() {
-    return sv2;
-  }
+  public SelectionVector2 getSv2() { return sv2; }
+
+  public long getTimeNs() { return timeNs; }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/35bccd0e/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 1a31625..a38a7fe 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -39,6 +39,12 @@ public class SelectionVector2 implements AutoCloseable {
     this.allocator = allocator;
   }
 
+  public SelectionVector2(BufferAllocator allocator, DrillBuf buf, int count) {
+    this.allocator = allocator;
+    buffer = buf;
+    recordCount = count;
+  }
+
   public int getCount() {
     return recordCount;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/35bccd0e/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index ba47998..d872d67 100644
--- 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -68,6 +68,17 @@ public abstract class BaseAllocator extends Accountant 
implements BufferAllocato
   private final IdentityHashMap<Reservation, Object> reservations;
   private final HistoricalLog historicalLog;
 
+  /**
+   * Disk I/O buffer used for all reads and writes of DrillBufs.
+   * The buffer is allocated when first needed, then reused by all
+   * subsequent I/O operations for the same operator. Since very few
+   * operators do I/O, the number of allocated buffers should be
+   * low. Better would be to hold the buffer at the fragment level
+   * since all operators within a fragment run within a single thread.
+   */
+
+  private byte ioBuffer[];
+
   protected BaseAllocator(
       final BaseAllocator parentAllocator,
       final String name,
@@ -350,6 +361,9 @@ public abstract class BaseAllocator extends Accountant 
implements BufferAllocato
         return;
       }
 
+      if (ioBuffer != null) {
+        ioBuffer = null;
+      }
       if (DEBUG) {
         if (!isClosed()) {
           final Object object;
@@ -513,12 +527,8 @@ public abstract class BaseAllocator extends Accountant 
implements BufferAllocato
 
     if (DEBUG) {
       historicalLog.recordEvent("closed");
-      logger.debug(String.format(
-          "closed allocator[%s].",
-          name));
+      logger.debug(String.format("closed allocator[%s].", name));
     }
-
-
   }
 
   @Override
@@ -793,20 +803,20 @@ public abstract class BaseAllocator extends Accountant 
implements BufferAllocato
     return DEBUG;
   }
 
-  /**
-   * Disk I/O buffer used for all reads and writes of DrillBufs.
-   */
-
-  private byte ioBuffer[];
-
   public byte[] getIOBuffer() {
     if (ioBuffer == null) {
+      // Length chosen to the smallest size that maximizes
+      // disk I/O performance. Smaller sizes slow I/O. Larger
+      // sizes provide no increase in performance.
+      // Revisit from time to time.
+
       ioBuffer = new byte[32*1024];
     }
     return ioBuffer;
   }
 
-  public void read(DrillBuf buf, InputStream in, int length) throws 
IOException {
+  @Override
+  public void read(DrillBuf buf, int length, InputStream in) throws 
IOException {
     buf.clear();
 
     byte[] buffer = getIOBuffer();
@@ -817,11 +827,27 @@ public abstract class BaseAllocator extends Accountant 
implements BufferAllocato
     }
   }
 
+  public DrillBuf read(int length, InputStream in) throws IOException {
+    DrillBuf buf = buffer(length);
+    try {
+      read(buf, length, in);
+      return buf;
+    } catch (IOException e) {
+      buf.release();
+      throw e;
+    }
+  }
+
+  @Override
   public void write(DrillBuf buf, OutputStream out) throws IOException {
+    write(buf, buf.readableBytes(), out);
+  }
+
+  @Override
+  public void write(DrillBuf buf, int length, OutputStream out) throws 
IOException {
     byte[] buffer = getIOBuffer();
-    int bufLength = buf.readableBytes();
-    for (int posn = 0; posn < bufLength; posn += buffer.length) {
-      int len = Math.min(buffer.length, bufLength - posn);
+    for (int posn = 0; posn < length; posn += buffer.length) {
+      int len = Math.min(buffer.length, length - posn);
       buf.getBytes(posn, buffer, 0, len);
       out.write(buffer, 0, len);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/35bccd0e/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 3c5f57d..bdf3073 100644
--- 
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -160,6 +160,9 @@ public interface BufferAllocator extends AutoCloseable {
    * Write the contents of a DrillBuf to a stream. Use this method, rather
    * than calling the DrillBuf.getBytes() method, because this method
    * avoids repeated heap allocation for the intermediate heap buffer.
+   * Uses the reader and writer indexes to determine
+   * the number of bytes to write. Useful only for bufs created using
+   * those indexes.
    *
    * @param buf the Drillbuf to write
    * @param output the output stream
@@ -169,15 +172,42 @@ public interface BufferAllocator extends AutoCloseable {
   public void write(DrillBuf buf, OutputStream out) throws IOException;
 
   /**
+   * Write the contents of a DrillBuf to a stream. Use this method, rather
+   * than calling the DrillBuf.getBytes() method, because this method
+   * avoids repeated heap allocation for the intermediate heap buffer.
+   * Writes the specified number of bytes starting from the head of the
+   * given Drillbuf.
+   *
+   * @param buf the Drillbuf to write
+   * @param length the number of bytes to read. Must be less than or
+   * equal to number of bytes allocated in the buffer.
+   * @param out the output stream
+   * @throws IOException if a write error occurs
+   */
+
+  public void write(DrillBuf buf, int length, OutputStream out) throws 
IOException;
+
+  /**
    * Read the contents of a DrillBuf from a stream. Use this method, rather
    * than calling the DrillBuf.writeBytes() method, because this method
    * avoids repeated heap allocation for the intermediate heap buffer.
+   * The buffer must have already been allocated.
    *
    * @param buf the buffer to read with space already allocated
-   * @param input input stream from which to read data
-   * @param bufLength number of bytes to read
+   * @param length number of bytes to read
+   * @param in input stream from which to read data
+   * @throws IOException if a read error occurs
+   */
+
+  public void read(DrillBuf buf, int length, InputStream in) throws 
IOException;
+
+  /**
+   * Reads the specified number of bytes into a new Drillbuf.
+   * @param length number of bytes to read
+   * @param in input stream from which to read data
+   * @return the buffer holding the data read from the stream
    * @throws IOException if a read error occurs
    */
 
-  public void read(DrillBuf buf, InputStream in, int length) throws 
IOException;
+  public DrillBuf read(int length, InputStream in) throws IOException;
 }

Reply via email to