Repository: incubator-drill
Updated Branches:
  refs/heads/master 8490d7433 -> 3db1d5a32


Allow disabling of memory leak query termination using 
-Ddrill.exec.debug.error_on_leak=false


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/65b36e83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/65b36e83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/65b36e83

Branch: refs/heads/master
Commit: 65b36e83168507e9bd2ee62320deef08f6fb585c
Parents: 8490d74
Author: Jacques Nadeau <[email protected]>
Authored: Tue Jun 3 17:02:38 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Tue Jun 3 17:04:41 2014 -0700

----------------------------------------------------------------------
 .../templates/StringOutputRecordWriter.java     |  9 ++-
 .../org/apache/drill/exec/ExecConstants.java    |  3 +-
 .../drill/exec/cache/local/LocalCache.java      |  2 +-
 .../apache/drill/exec/client/DrillClient.java   |  2 +-
 .../exec/client/PrintingResultsListener.java    |  6 +-
 .../drill/exec/client/QuerySubmitter.java       |  2 +-
 .../org/apache/drill/exec/memory/Accountor.java | 19 ++++--
 .../drill/exec/memory/AtomicRemainder.java      | 15 +++--
 .../drill/exec/memory/TopLevelAllocator.java    | 65 ++++++++++++--------
 .../drill/exec/physical/impl/ScanBatch.java     | 27 ++++----
 .../drill/exec/server/BootStrapContext.java     |  8 +--
 .../exec/store/easy/text/TextFormatPlugin.java  |  2 +-
 .../exec/store/text/DrillTextRecordWriter.java  | 18 ++++--
 .../src/main/resources/drill-module.conf        |  3 +-
 .../apache/drill/jdbc/DrillConnectionImpl.java  |  7 ++-
 15 files changed, 118 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java 
b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
index 506cace..7357246 100644
--- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.vector.*;
 
@@ -45,7 +46,11 @@ import java.util.Map;
 public abstract class StringOutputRecordWriter implements RecordWriter {
 
   private ValueVector[] columnVectors;
-
+  private final BufferAllocator allocator;
+  protected StringOutputRecordWriter(BufferAllocator allocator){
+    this.allocator = allocator;
+  }
+  
   public void updateSchema(BatchSchema schema) throws IOException {
     columnVectors = new ValueVector[schema.getFieldCount()];
 
@@ -57,7 +62,7 @@ public abstract class StringOutputRecordWriter implements 
RecordWriter {
     startNewSchema(columnNames);
 
     for (int i=0; i<columnVectors.length; i++) {
-      columnVectors[i] = TypeHelper.getNewVector(schema.getColumn(i), new 
TopLevelAllocator());
+      columnVectors[i] = TypeHelper.getNewVector(schema.getColumn(i), 
allocator);
       AllocationHelper.allocate(columnVectors[i], 1, 
TypeHelper.getSize(schema.getColumn(i).getType()));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index e66e93c..1ece198 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -74,6 +74,5 @@ public interface ExecConstants {
   public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new 
LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024);
   public static final String HTTP_ENABLE = "drill.exec.http.enabled";
   public static final String HTTP_PORT = "drill.exec.http.port";
-
-
+  public static final String ERROR_ON_MEMORY_LEAK = 
"drill.exec.debug.error_on_leak";
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
index 1b44c6b..2f41c26 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
@@ -61,7 +61,7 @@ public class LocalCache implements DistributedCache {
   private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps;
   private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps;
   private volatile ConcurrentMap<String, Counter> counters;
-  private static final BufferAllocator allocator = new TopLevelAllocator();
+  private static final BufferAllocator allocator = new 
TopLevelAllocator(DrillConfig.create());
 
   private static final ObjectMapper mapper = DrillConfig.create().getMapper();
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 4755d32..9cd2cdd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -94,7 +94,7 @@ public class DrillClient implements Closeable, 
ConnectionThrottle{
   public DrillClient(DrillConfig config, ClusterCoordinator coordinator, 
BufferAllocator allocator){
     this.ownsZkConnection = coordinator == null;
     this.ownsAllocator = allocator == null;
-    this.allocator = allocator == null ? new TopLevelAllocator(Long.MAX_VALUE) 
: allocator;
+    this.allocator = allocator == null ? new TopLevelAllocator(config) : 
allocator;
     this.config = config;
     this.clusterCoordinator = coordinator;
     this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index 0dfc45a..4a18149 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.client;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -38,10 +39,11 @@ public class PrintingResultsListener implements 
UserResultsListener {
   RecordBatchLoader loader;
   Format format;
   int    columnWidth;
-  BufferAllocator allocator = new TopLevelAllocator();
+  BufferAllocator allocator;
   volatile Exception exception;
 
-  public PrintingResultsListener(Format format, int columnWidth) {
+  public PrintingResultsListener(DrillConfig config, Format format, int 
columnWidth) {
+    this.allocator = new TopLevelAllocator(config);
     loader = new RecordBatchLoader(allocator);
     this.format = format;
     this.columnWidth = columnWidth;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index 99e0c80..4153a24 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -187,7 +187,7 @@ public class QuerySubmitter {
     }
     Stopwatch watch = new Stopwatch();
     for (String query : queries) {
-      listener = new PrintingResultsListener(outputFormat, width);
+      listener = new PrintingResultsListener(client.getConfig(), outputFormat, 
width);
       watch.start();
       client.runQuery(queryType, query, listener);
       int rows = listener.await();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 624042e..257f6fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -40,12 +40,14 @@ public class Accountor {
   private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = 
Maps.newConcurrentMap();
   private final FragmentHandle handle;
   private Accountor parent;
+  private final boolean errorOnLeak;
 
-  public Accountor(FragmentHandle handle, Accountor parent, long max, long 
preAllocated) {
+  public Accountor(boolean errorOnLeak, FragmentHandle handle, Accountor 
parent, long max, long preAllocated) {
     // TODO: fix preallocation stuff
+    this.errorOnLeak = errorOnLeak;
     AtomicRemainder parentRemainder = parent != null ? parent.remainder : null;
     this.parent = parent;
-    this.remainder = new AtomicRemainder(parentRemainder, max, preAllocated);
+    this.remainder = new AtomicRemainder(errorOnLeak, parentRemainder, max, 
preAllocated);
     this.total = max;
     this.handle = handle;
     if (ENABLE_ACCOUNTING) {
@@ -103,7 +105,7 @@ public class Accountor {
       }
     }
   }
-  
+
   public void release(AccountingByteBuf buf, long size) {
     remainder.returnAllocation(size);
     if (ENABLE_ACCOUNTING) {
@@ -112,7 +114,7 @@ public class Accountor {
   }
 
   public void close() {
-     
+
     if (ENABLE_ACCOUNTING && !buffers.isEmpty()) {
       StringBuffer sb = new StringBuffer();
       sb.append("Attempted to close accountor with ");
@@ -148,13 +150,18 @@ public class Accountor {
         sb.append("at stack location:\n");
         entry.addToString(sb);
       }
+      IllegalStateException e = new IllegalStateException(sb.toString());
+      if(errorOnLeak){
+        throw e;
+      }else{
+        logger.warn("Memory leaked.", e);
+      }
 
-      throw new IllegalStateException(sb.toString());
 
     }
 
     remainder.close();
-    
+
   }
 
   public class DebugStackTrace {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
index 74849c2..1ae1e4c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
@@ -39,8 +39,10 @@ public class AtomicRemainder {
   private final long initShared;
   private final long initPrivate;
   private boolean closed = false;
+  private final boolean errorOnLeak;
 
-  public AtomicRemainder(AtomicRemainder parent, long max, long pre) {
+  public AtomicRemainder(boolean errorOnLeak, AtomicRemainder parent, long 
max, long pre) {
+    this.errorOnLeak = errorOnLeak;
     this.parent = parent;
     this.availableShared = new AtomicLong(max - pre);
     this.availablePrivate = new AtomicLong(pre);
@@ -160,11 +162,16 @@ public class AtomicRemainder {
       logger.warn("Tried to close remainder, but it has already been closed", 
new Exception());
       return;
     }
-    if (availablePrivate.get() != initPrivate || availableShared.get() != 
initShared)
-      throw new IllegalStateException(
+    if (availablePrivate.get() != initPrivate || availableShared.get() != 
initShared){
+      IllegalStateException e = new IllegalStateException(
           String
               .format(ERROR, initPrivate, availablePrivate.get(), initPrivate 
- availablePrivate.get(), initShared, availableShared.get(), initShared - 
availableShared.get()));
-    
+      if(errorOnLeak){
+        throw e;
+      }else{
+        logger.warn("Memory leaked during query.", e);
+      }
+    }
     if(parent != null) parent.returnAllocation(initPrivate);
     closed = true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 836f593..6c4d44f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -39,18 +39,28 @@ public class TopLevelAllocator implements BufferAllocator {
   private final Set<ChildAllocator> children;
   private final PooledByteBufAllocatorL innerAllocator = 
PooledByteBufAllocatorL.DEFAULT;
   private final Accountor acct;
+  private final boolean errorOnLeak;
 
+  @Deprecated
   public TopLevelAllocator() {
     this(DrillConfig.getMaxDirectMemory());
   }
 
-  public TopLevelAllocator(DrillConfig config) {
-    this(Math.min(DrillConfig.getMaxDirectMemory(), 
config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)));
-  }
-  
+  @Deprecated
   public TopLevelAllocator(long maximumAllocation) {
-    this.acct = new Accountor(null, null, maximumAllocation, 0);
-    this.children = ENABLE_ACCOUNTING ? new HashSet<ChildAllocator>() : null; 
+    this(maximumAllocation, true);
+  }
+
+  private TopLevelAllocator(long maximumAllocation, boolean errorOnLeak){
+    this.errorOnLeak = errorOnLeak;
+    this.acct = new Accountor(errorOnLeak, null, null, maximumAllocation, 0);
+    this.children = ENABLE_ACCOUNTING ? new HashSet<ChildAllocator>() : null;
+  }
+
+  public TopLevelAllocator(DrillConfig config) {
+    this(Math.min(DrillConfig.getMaxDirectMemory(), 
config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)),
+        config.getBoolean(ExecConstants.ERROR_ON_MEMORY_LEAK)
+        );
   }
 
   public AccountingByteBuf buffer(int min, int max) {
@@ -60,7 +70,7 @@ public class TopLevelAllocator implements BufferAllocator {
     acct.reserved(min, wrapped);
     return wrapped;
   }
-  
+
   @Override
   public AccountingByteBuf buffer(int size) {
     return buffer(size, size);
@@ -98,7 +108,7 @@ public class TopLevelAllocator implements BufferAllocator {
     acct.close();
   }
 
-  
+
   private class ChildAllocator implements BufferAllocator{
 
     private Accountor childAcct;
@@ -108,23 +118,23 @@ public class TopLevelAllocator implements BufferAllocator 
{
 
     public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, 
long max, long pre) throws OutOfMemoryException{
       assert max >= pre;
-      childAcct = new Accountor(handle, parentAccountor, max, pre);
+      childAcct = new Accountor(errorOnLeak, handle, parentAccountor, max, 
pre);
       this.handle = handle;
     }
-    
+
     @Override
     public AccountingByteBuf buffer(int size, int max) {
       if(!childAcct.reserve(size)){
         logger.warn("Unable to allocate buffer of size {} due to memory limit. 
Current allocation: {}", size, getAllocatedMemory());
         return null;
       };
-      
+
       ByteBuf buffer = innerAllocator.directBuffer(size, max);
       AccountingByteBuf wrapped = new AccountingByteBuf(childAcct, 
(PooledUnsafeDirectByteBufL) buffer);
       childAcct.reserved(buffer.capacity(), wrapped);
       return wrapped;
     }
-    
+
     public AccountingByteBuf buffer(int size) {
       return buffer(size, size);
     }
@@ -146,7 +156,7 @@ public class TopLevelAllocator implements BufferAllocator {
     }
 
     public PreAllocator getNewPreAllocator(){
-      return new PreAlloc(this.childAcct); 
+      return new PreAlloc(this.childAcct);
     }
 
     @Override
@@ -161,9 +171,16 @@ public class TopLevelAllocator implements BufferAllocator {
               sb.append(elements[i]);
               sb.append("\n");
             }
-            throw new IllegalStateException(String.format(
+
+
+            IllegalStateException e = new IllegalStateException(String.format(
                     "Failure while trying to close child allocator: Child 
level allocators not closed. Fragment %d:%d. Stack trace: \n %s",
                     handle.getMajorFragmentId(), handle.getMinorFragmentId(), 
sb.toString()));
+            if(errorOnLeak){
+              throw e;
+            }else{
+              logger.warn("Memory leak.", e);
+            }
           }
         }
       }
@@ -179,34 +196,34 @@ public class TopLevelAllocator implements BufferAllocator 
{
     public long getAllocatedMemory() {
       return childAcct.getAllocation();
     }
-    
+
   }
-  
+
   public PreAllocator getNewPreAllocator(){
-    return new PreAlloc(this.acct); 
+    return new PreAlloc(this.acct);
   }
-  
+
   public class PreAlloc implements PreAllocator{
     int bytes = 0;
     final Accountor acct;
     private PreAlloc(Accountor acct){
       this.acct = acct;
     }
-    
+
     /**
-     * 
+     *
      */
     public boolean preAllocate(int bytes){
-      
+
       if(!acct.reserve(bytes)){
         return false;
       }
       this.bytes += bytes;
       return true;
-   
+
     }
-    
-    
+
+
     public AccountingByteBuf getAllocation(){
       AccountingByteBuf b = new AccountingByteBuf(acct, 
(PooledUnsafeDirectByteBufL) innerAllocator.buffer(bytes));
       acct.reserved(bytes, b);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 7febb10..2914b67 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import io.netty.buffer.Unpooled;
+
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -30,6 +32,7 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -47,7 +50,6 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarCharVector;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -160,15 +162,8 @@ public class ScanBatch implements RecordBatch {
     try {
       partitionVectors = Lists.newArrayList();
       for (int i : selectedPartitionColumns) {
-        MaterializedField field;
-        ValueVector v;
-        if (partitionValues.length > i) {
-          field = 
MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + 
i), Types.required(MinorType.VARCHAR));
-          v = mutator.addField(field, VarCharVector.class);
-        } else {
-          field = 
MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + 
i), Types.optional(MinorType.VARCHAR));
-          v = mutator.addField(field, NullableVarCharVector.class);
-        }
+        MaterializedField field = 
MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + 
i), Types.optional(MinorType.VARCHAR));
+        ValueVector v = mutator.addField(field, NullableVarCharVector.class);
         partitionVectors.add(v);
       }
     } catch(SchemaChangeException e) {
@@ -179,12 +174,18 @@ public class ScanBatch implements RecordBatch {
   private void populatePartitionVectors() {
     for (int i : selectedPartitionColumns) {
       if (partitionValues.length > i) {
-        VarCharVector v = (VarCharVector) partitionVectors.get(i);
+        NullableVarCharVector v = (NullableVarCharVector) 
partitionVectors.get(i);
         String val = partitionValues[i];
-        byte[] bytes = val.getBytes();
         AllocationHelper.allocate(v, recordCount, val.length());
+        NullableVarCharHolder h = new NullableVarCharHolder();
+        byte[] bytes = val.getBytes();
+        h.buffer = Unpooled.buffer(bytes.length);
+        h.buffer.writeBytes(bytes);
+        h.start = 0;
+        h.isSet = 1;
+        h.end = bytes.length;
         for (int j = 0; j < recordCount; j++) {
-          v.getMutator().setSafe(j, bytes);
+          v.getMutator().setSafe(j, h);
         }
         v.getMutator().setValueCount(recordCount);
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 016d328..4261885 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -31,20 +31,20 @@ import com.codahale.metrics.MetricRegistry;
 
 public class BootStrapContext implements Closeable{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BootStrapContext.class);
-  
+
   private final DrillConfig config;
   private final NioEventLoopGroup loop;
   private final NioEventLoopGroup loop2;
   private final MetricRegistry metrics;
   private final BufferAllocator allocator;
-  
+
   public BootStrapContext(DrillConfig config) {
     super();
     this.config = config;
     this.loop = new 
NioEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), new 
NamedThreadFactory("BitServer-"));
     this.loop2 = new 
NioEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), new 
NamedThreadFactory("BitClient-"));
     this.metrics = new MetricRegistry();
-    this.allocator = new TopLevelAllocator();
+    this.allocator = new TopLevelAllocator(config);
   }
 
   public DrillConfig getConfig() {
@@ -71,5 +71,5 @@ public class BootStrapContext implements Closeable{
     loop.shutdownGracefully();
     allocator.close();
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index cd28d30..15d2e37 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -90,7 +90,7 @@ public class TextFormatPlugin extends 
EasyFormatPlugin<TextFormatPlugin.TextForm
 
     options.put("extension", 
((TextFormatConfig)getConfig()).getExtensions().get(0));
 
-    RecordWriter recordWriter = new DrillTextRecordWriter();
+    RecordWriter recordWriter = new 
DrillTextRecordWriter(context.getAllocator());
     recordWriter.init(options);
 
     return recordWriter;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index b6840f8..55f2b72 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -17,18 +17,20 @@
  */
 package org.apache.drill.exec.store.text;
 
-import com.google.common.base.Joiner;
-import org.apache.drill.exec.store.StringOutputRecordWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.store.StringOutputRecordWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+
 public class DrillTextRecordWriter extends StringOutputRecordWriter {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DrillTextRecordWriter.class);
 
@@ -47,6 +49,10 @@ public class DrillTextRecordWriter extends 
StringOutputRecordWriter {
   private boolean fRecordStarted = false; // true once the startRecord() is 
called until endRecord() is called
   private StringBuilder currentRecord; // contains the current record 
separated by field delimiter
 
+  public DrillTextRecordWriter(BufferAllocator allocator){
+    super(allocator);
+  }
+
   @Override
   public void init(Map<String, String> writerOptions) throws IOException {
     this.location = writerOptions.get("location");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index f8396bb..982f43f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -134,5 +134,6 @@ drill.exec: {
       max: 20000000000,
       initial: 20000000
     }
-  }
+  },
+  debug.error_on_leak: true
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
----------------------------------------------------------------------
diff --git 
a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java 
b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
index 337477e..224d59f 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
@@ -60,11 +60,12 @@ abstract class DrillConnectionImpl extends 
AvaticaConnection implements org.apac
     super(driver, factory, url, info);
     this.config = new DrillConnectionConfig(info);
 
-  this.allocator = new TopLevelAllocator();
+
 
     try{
       if(config.isLocal()){
         DrillConfig dConfig = DrillConfig.create();
+        this.allocator = new TopLevelAllocator(dConfig);
         RemoteServiceSet set = GlobalServiceSetReference.SETS.get();
         if(set == null){
           // we're embedded, start a local drill bit.
@@ -83,7 +84,9 @@ abstract class DrillConnectionImpl extends AvaticaConnection 
implements org.apac
         this.client = new DrillClient(dConfig, set.getCoordinator());
         this.client.connect(null, info);
       }else{
-        this.client = new DrillClient(DrillConfig.createClient());
+        DrillConfig dConfig = DrillConfig.createClient();
+        this.allocator = new TopLevelAllocator(dConfig);
+        this.client = new DrillClient();
         this.client.connect(config.getZookeeperConnectionString(), info);
       }
     }catch(RpcException e){

Reply via email to