Repository: incubator-drill Updated Branches: refs/heads/master 8490d7433 -> 3db1d5a32
Allow disabling of memory leak query termination using -Ddrill.exec.debug.error_on_leak=false Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/65b36e83 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/65b36e83 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/65b36e83 Branch: refs/heads/master Commit: 65b36e83168507e9bd2ee62320deef08f6fb585c Parents: 8490d74 Author: Jacques Nadeau <[email protected]> Authored: Tue Jun 3 17:02:38 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Tue Jun 3 17:04:41 2014 -0700 ---------------------------------------------------------------------- .../templates/StringOutputRecordWriter.java | 9 ++- .../org/apache/drill/exec/ExecConstants.java | 3 +- .../drill/exec/cache/local/LocalCache.java | 2 +- .../apache/drill/exec/client/DrillClient.java | 2 +- .../exec/client/PrintingResultsListener.java | 6 +- .../drill/exec/client/QuerySubmitter.java | 2 +- .../org/apache/drill/exec/memory/Accountor.java | 19 ++++-- .../drill/exec/memory/AtomicRemainder.java | 15 +++-- .../drill/exec/memory/TopLevelAllocator.java | 65 ++++++++++++-------- .../drill/exec/physical/impl/ScanBatch.java | 27 ++++---- .../drill/exec/server/BootStrapContext.java | 8 +-- .../exec/store/easy/text/TextFormatPlugin.java | 2 +- .../exec/store/text/DrillTextRecordWriter.java | 18 ++++-- .../src/main/resources/drill-module.conf | 3 +- .../apache/drill/jdbc/DrillConnectionImpl.java | 7 ++- 15 files changed, 118 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java index 506cace..7357246 100644 --- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.holders.*; import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.vector.*; @@ -45,7 +46,11 @@ import java.util.Map; public abstract class StringOutputRecordWriter implements RecordWriter { private ValueVector[] columnVectors; - + private final BufferAllocator allocator; + protected StringOutputRecordWriter(BufferAllocator allocator){ + this.allocator = allocator; + } + public void updateSchema(BatchSchema schema) throws IOException { columnVectors = new ValueVector[schema.getFieldCount()]; @@ -57,7 +62,7 @@ public abstract class StringOutputRecordWriter implements RecordWriter { startNewSchema(columnNames); for (int i=0; i<columnVectors.length; i++) { - columnVectors[i] = TypeHelper.getNewVector(schema.getColumn(i), new TopLevelAllocator()); + columnVectors[i] = TypeHelper.getNewVector(schema.getColumn(i), allocator); AllocationHelper.allocate(columnVectors[i], 1, TypeHelper.getSize(schema.getColumn(i).getType())); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index e66e93c..1ece198 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -74,6 +74,5 @@ public interface ExecConstants { public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024); public static final String HTTP_ENABLE = "drill.exec.http.enabled"; public static final String HTTP_PORT = "drill.exec.http.port"; - - + public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak"; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java index 1b44c6b..2f41c26 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java @@ -61,7 +61,7 @@ public class LocalCache implements DistributedCache { private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps; private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps; private volatile ConcurrentMap<String, Counter> counters; - private static final BufferAllocator allocator = new TopLevelAllocator(); + private static final BufferAllocator allocator = new TopLevelAllocator(DrillConfig.create()); private static final ObjectMapper mapper = DrillConfig.create().getMapper(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 4755d32..9cd2cdd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -94,7 +94,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator){ this.ownsZkConnection = coordinator == null; this.ownsAllocator = allocator == null; - this.allocator = allocator == null ? new TopLevelAllocator(Long.MAX_VALUE) : allocator; + this.allocator = allocator == null ? new TopLevelAllocator(config) : allocator; this.config = config; this.clusterCoordinator = coordinator; this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java index 0dfc45a..4a18149 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.client; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.client.QuerySubmitter.Format; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; @@ -38,10 +39,11 @@ public class PrintingResultsListener implements UserResultsListener { RecordBatchLoader loader; Format format; int columnWidth; - BufferAllocator allocator = new TopLevelAllocator(); + BufferAllocator allocator; volatile Exception exception; - public PrintingResultsListener(Format format, int columnWidth) { + public PrintingResultsListener(DrillConfig config, Format format, int columnWidth) { + this.allocator = new TopLevelAllocator(config); loader = new RecordBatchLoader(allocator); this.format = format; this.columnWidth = columnWidth; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java index 99e0c80..4153a24 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java @@ -187,7 +187,7 @@ public class QuerySubmitter { } Stopwatch watch = new Stopwatch(); for (String query : queries) { - listener = new PrintingResultsListener(outputFormat, width); + listener = new PrintingResultsListener(client.getConfig(), outputFormat, width); watch.start(); client.runQuery(queryType, query, listener); int rows = listener.await(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java index 624042e..257f6fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java @@ -40,12 +40,14 @@ public class Accountor { private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = Maps.newConcurrentMap(); private final FragmentHandle handle; private Accountor parent; + private final boolean errorOnLeak; - public Accountor(FragmentHandle handle, Accountor parent, long max, long preAllocated) { + public Accountor(boolean errorOnLeak, FragmentHandle handle, Accountor parent, long max, long preAllocated) { // TODO: fix preallocation stuff + this.errorOnLeak = errorOnLeak; AtomicRemainder parentRemainder = parent != null ? parent.remainder : null; this.parent = parent; - this.remainder = new AtomicRemainder(parentRemainder, max, preAllocated); + this.remainder = new AtomicRemainder(errorOnLeak, parentRemainder, max, preAllocated); this.total = max; this.handle = handle; if (ENABLE_ACCOUNTING) { @@ -103,7 +105,7 @@ public class Accountor { } } } - + public void release(AccountingByteBuf buf, long size) { remainder.returnAllocation(size); if (ENABLE_ACCOUNTING) { @@ -112,7 +114,7 @@ public class Accountor { } public void close() { - + if (ENABLE_ACCOUNTING && !buffers.isEmpty()) { StringBuffer sb = new StringBuffer(); sb.append("Attempted to close accountor with "); @@ -148,13 +150,18 @@ public class Accountor { sb.append("at stack location:\n"); entry.addToString(sb); } + IllegalStateException e = new IllegalStateException(sb.toString()); + if(errorOnLeak){ + throw e; + }else{ + logger.warn("Memory leaked.", e); + } - throw new IllegalStateException(sb.toString()); } remainder.close(); - + } public class DebugStackTrace { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java index 74849c2..1ae1e4c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java @@ -39,8 +39,10 @@ public class AtomicRemainder { private final long initShared; private final long initPrivate; private boolean closed = false; + private final boolean errorOnLeak; - public AtomicRemainder(AtomicRemainder parent, long max, long pre) { + public AtomicRemainder(boolean errorOnLeak, AtomicRemainder parent, long max, long pre) { + this.errorOnLeak = errorOnLeak; this.parent = parent; this.availableShared = new AtomicLong(max - pre); this.availablePrivate = new AtomicLong(pre); @@ -160,11 +162,16 @@ public class AtomicRemainder { logger.warn("Tried to close remainder, but it has already been closed", new Exception()); return; } - if (availablePrivate.get() != initPrivate || availableShared.get() != initShared) - throw new IllegalStateException( + if (availablePrivate.get() != initPrivate || availableShared.get() != initShared){ + IllegalStateException e = new IllegalStateException( String .format(ERROR, initPrivate, availablePrivate.get(), initPrivate - availablePrivate.get(), initShared, availableShared.get(), initShared - availableShared.get())); - + if(errorOnLeak){ + throw e; + }else{ + logger.warn("Memory leaked during query.", e); + } + } if(parent != null) parent.returnAllocation(initPrivate); closed = true; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java index 836f593..6c4d44f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java @@ -39,18 +39,28 @@ public class TopLevelAllocator implements BufferAllocator { private final Set<ChildAllocator> children; private final PooledByteBufAllocatorL innerAllocator = PooledByteBufAllocatorL.DEFAULT; private final Accountor acct; + private final boolean errorOnLeak; + @Deprecated public TopLevelAllocator() { this(DrillConfig.getMaxDirectMemory()); } - public TopLevelAllocator(DrillConfig config) { - this(Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC))); - } - + @Deprecated public TopLevelAllocator(long maximumAllocation) { - this.acct = new Accountor(null, null, maximumAllocation, 0); - this.children = ENABLE_ACCOUNTING ? new HashSet<ChildAllocator>() : null; + this(maximumAllocation, true); + } + + private TopLevelAllocator(long maximumAllocation, boolean errorOnLeak){ + this.errorOnLeak = errorOnLeak; + this.acct = new Accountor(errorOnLeak, null, null, maximumAllocation, 0); + this.children = ENABLE_ACCOUNTING ? new HashSet<ChildAllocator>() : null; + } + + public TopLevelAllocator(DrillConfig config) { + this(Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)), + config.getBoolean(ExecConstants.ERROR_ON_MEMORY_LEAK) + ); } public AccountingByteBuf buffer(int min, int max) { @@ -60,7 +70,7 @@ public class TopLevelAllocator implements BufferAllocator { acct.reserved(min, wrapped); return wrapped; } - + @Override public AccountingByteBuf buffer(int size) { return buffer(size, size); @@ -98,7 +108,7 @@ public class TopLevelAllocator implements BufferAllocator { acct.close(); } - + private class ChildAllocator implements BufferAllocator{ private Accountor childAcct; @@ -108,23 +118,23 @@ public class TopLevelAllocator implements BufferAllocator { public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre) throws OutOfMemoryException{ assert max >= pre; - childAcct = new Accountor(handle, parentAccountor, max, pre); + childAcct = new Accountor(errorOnLeak, handle, parentAccountor, max, pre); this.handle = handle; } - + @Override public AccountingByteBuf buffer(int size, int max) { if(!childAcct.reserve(size)){ logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory()); return null; }; - + ByteBuf buffer = innerAllocator.directBuffer(size, max); AccountingByteBuf wrapped = new AccountingByteBuf(childAcct, (PooledUnsafeDirectByteBufL) buffer); childAcct.reserved(buffer.capacity(), wrapped); return wrapped; } - + public AccountingByteBuf buffer(int size) { return buffer(size, size); } @@ -146,7 +156,7 @@ public class TopLevelAllocator implements BufferAllocator { } public PreAllocator getNewPreAllocator(){ - return new PreAlloc(this.childAcct); + return new PreAlloc(this.childAcct); } @Override @@ -161,9 +171,16 @@ public class TopLevelAllocator implements BufferAllocator { sb.append(elements[i]); sb.append("\n"); } - throw new IllegalStateException(String.format( + + + IllegalStateException e = new IllegalStateException(String.format( "Failure while trying to close child allocator: Child level allocators not closed. Fragment %d:%d. Stack trace: \n %s", handle.getMajorFragmentId(), handle.getMinorFragmentId(), sb.toString())); + if(errorOnLeak){ + throw e; + }else{ + logger.warn("Memory leak.", e); + } } } } @@ -179,34 +196,34 @@ public class TopLevelAllocator implements BufferAllocator { public long getAllocatedMemory() { return childAcct.getAllocation(); } - + } - + public PreAllocator getNewPreAllocator(){ - return new PreAlloc(this.acct); + return new PreAlloc(this.acct); } - + public class PreAlloc implements PreAllocator{ int bytes = 0; final Accountor acct; private PreAlloc(Accountor acct){ this.acct = acct; } - + /** - * + * */ public boolean preAllocate(int bytes){ - + if(!acct.reserve(bytes)){ return false; } this.bytes += bytes; return true; - + } - - + + public AccountingByteBuf getAllocation(){ AccountingByteBuf b = new AccountingByteBuf(acct, (PooledUnsafeDirectByteBufL) innerAllocator.buffer(bytes)); acct.reserved(bytes, b); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 7febb10..2914b67 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.impl; +import io.netty.buffer.Unpooled; + import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -30,6 +32,7 @@ import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.holders.NullableVarCharHolder; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.base.PhysicalOperator; @@ -47,7 +50,6 @@ import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.VarCharVector; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -160,15 +162,8 @@ public class ScanBatch implements RecordBatch { try { partitionVectors = Lists.newArrayList(); for (int i : selectedPartitionColumns) { - MaterializedField field; - ValueVector v; - if (partitionValues.length > i) { - field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.required(MinorType.VARCHAR)); - v = mutator.addField(field, VarCharVector.class); - } else { - field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR)); - v = mutator.addField(field, NullableVarCharVector.class); - } + MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR)); + ValueVector v = mutator.addField(field, NullableVarCharVector.class); partitionVectors.add(v); } } catch(SchemaChangeException e) { @@ -179,12 +174,18 @@ public class ScanBatch implements RecordBatch { private void populatePartitionVectors() { for (int i : selectedPartitionColumns) { if (partitionValues.length > i) { - VarCharVector v = (VarCharVector) partitionVectors.get(i); + NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(i); String val = partitionValues[i]; - byte[] bytes = val.getBytes(); AllocationHelper.allocate(v, recordCount, val.length()); + NullableVarCharHolder h = new NullableVarCharHolder(); + byte[] bytes = val.getBytes(); + h.buffer = Unpooled.buffer(bytes.length); + h.buffer.writeBytes(bytes); + h.start = 0; + h.isSet = 1; + h.end = bytes.length; for (int j = 0; j < recordCount; j++) { - v.getMutator().setSafe(j, bytes); + v.getMutator().setSafe(j, h); } v.getMutator().setValueCount(recordCount); } else { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java index 016d328..4261885 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java @@ -31,20 +31,20 @@ import com.codahale.metrics.MetricRegistry; public class BootStrapContext implements Closeable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BootStrapContext.class); - + private final DrillConfig config; private final NioEventLoopGroup loop; private final NioEventLoopGroup loop2; private final MetricRegistry metrics; private final BufferAllocator allocator; - + public BootStrapContext(DrillConfig config) { super(); this.config = config; this.loop = new NioEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), new NamedThreadFactory("BitServer-")); this.loop2 = new NioEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), new NamedThreadFactory("BitClient-")); this.metrics = new MetricRegistry(); - this.allocator = new TopLevelAllocator(); + this.allocator = new TopLevelAllocator(config); } public DrillConfig getConfig() { @@ -71,5 +71,5 @@ public class BootStrapContext implements Closeable{ loop.shutdownGracefully(); allocator.close(); } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index cd28d30..15d2e37 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -90,7 +90,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm options.put("extension", ((TextFormatConfig)getConfig()).getExtensions().get(0)); - RecordWriter recordWriter = new DrillTextRecordWriter(); + RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator()); recordWriter.init(options); return recordWriter; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java index b6840f8..55f2b72 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java @@ -17,18 +17,20 @@ */ package org.apache.drill.exec.store.text; -import com.google.common.base.Joiner; -import org.apache.drill.exec.store.StringOutputRecordWriter; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - import java.io.DataOutputStream; import java.io.IOException; import java.io.PrintStream; import java.util.List; import java.util.Map; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.store.StringOutputRecordWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Joiner; + public class DrillTextRecordWriter extends StringOutputRecordWriter { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordWriter.class); @@ -47,6 +49,10 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter { private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called private StringBuilder currentRecord; // contains the current record separated by field delimiter + public DrillTextRecordWriter(BufferAllocator allocator){ + super(allocator); + } + @Override public void init(Map<String, String> writerOptions) throws IOException { this.location = writerOptions.get("location"); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index f8396bb..982f43f 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -134,5 +134,6 @@ drill.exec: { max: 20000000000, initial: 20000000 } - } + }, + debug.error_on_leak: true } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65b36e83/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java index 337477e..224d59f 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java @@ -60,11 +60,12 @@ abstract class DrillConnectionImpl extends AvaticaConnection implements org.apac super(driver, factory, url, info); this.config = new DrillConnectionConfig(info); - this.allocator = new TopLevelAllocator(); + try{ if(config.isLocal()){ DrillConfig dConfig = DrillConfig.create(); + this.allocator = new TopLevelAllocator(dConfig); RemoteServiceSet set = GlobalServiceSetReference.SETS.get(); if(set == null){ // we're embedded, start a local drill bit. @@ -83,7 +84,9 @@ abstract class DrillConnectionImpl extends AvaticaConnection implements org.apac this.client = new DrillClient(dConfig, set.getCoordinator()); this.client.connect(null, info); }else{ - this.client = new DrillClient(DrillConfig.createClient()); + DrillConfig dConfig = DrillConfig.createClient(); + this.allocator = new TopLevelAllocator(dConfig); + this.client = new DrillClient(); this.client.connect(config.getZookeeperConnectionString(), info); } }catch(RpcException e){
