Repository: drill Updated Branches: refs/heads/master 0a27a033a -> 48d8a59d1
DRILL-3445: BufferAllocator.buffer() implementations should throw an OutOfMemoryRuntimeException Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/48d8a59d Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/48d8a59d Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/48d8a59d Branch: refs/heads/master Commit: 48d8a59d1b97988c006f85daad0ae2fcb3a9cd06 Parents: 1c9093e Author: adeneche <[email protected]> Authored: Wed Jul 1 12:10:07 2015 -0700 Committer: Hanifi Gunes <[email protected]> Committed: Mon Jul 6 16:28:29 2015 -0700 ---------------------------------------------------------------------- .../codegen/templates/FixedValueVectors.java | 10 +----- .../templates/VariableLengthVectors.java | 28 ++++----------- .../cache/VectorAccessibleSerializable.java | 6 +--- .../drill/exec/memory/BufferAllocator.java | 2 ++ .../drill/exec/memory/TopLevelAllocator.java | 23 ++++++------- .../physical/impl/filter/FilterTemplate2.java | 2 +- .../physical/impl/xsort/ExternalSortBatch.java | 4 +-- .../exec/record/selection/SelectionVector2.java | 16 ++++++--- .../drill/exec/rpc/ProtobufLengthDecoder.java | 7 ++-- .../ParquetDirectByteBufferAllocator.java | 3 -- .../org/apache/drill/exec/vector/BitVector.java | 12 ++----- .../parquet/hadoop/ColumnChunkIncReadStore.java | 3 ++ .../apache/drill/TestAllocationException.java | 36 ++++++-------------- .../drill/exec/memory/TestAllocators.java | 34 ++++++------------ 14 files changed, 67 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java index e8a4d5f..cadcfd9 100644 --- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java @@ -125,11 +125,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F final int curSize = (int)size; clear(); - final DrillBuf newBuf = allocator.buffer(curSize); - if (newBuf == null) { - throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes", size)); - } - data = newBuf; + data = allocator.buffer(curSize); data.readerIndex(0); allocationSizeInBytes = curSize; } @@ -147,10 +143,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F logger.debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]", field, allocationSizeInBytes, newAllocationSize); final DrillBuf newBuf = allocator.buffer((int)newAllocationSize); - if (newBuf == null) { - throw new OutOfMemoryRuntimeException(String.format("Failure while reallocating buffer to %d bytes", newAllocationSize)); - } - newBuf.setBytes(0, data, 0, data.capacity()); newBuf.setZero(newBuf.capacity() / 2, newBuf.capacity() / 2); newBuf.writerIndex(data.writerIndex()); http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java index 2c2e6b6..fc08ac6 100644 --- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java @@ -300,34 +300,24 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V * buffers for multiple vectors. If one of the allocations failed we need to * clear all the memory that we allocated */ - boolean success = false; try { final int requestedSize = (int)curAllocationSize; - DrillBuf newBuf = allocator.buffer(requestedSize); - if (newBuf == null) { - return false; - } - this.data = newBuf; - success = offsetVector.allocateNewSafe(); - } finally { - if (!success) { - clear(); - } + data = allocator.buffer(requestedSize); + offsetVector.allocateNew(); + } catch (OutOfMemoryRuntimeException e) { + clear(); + return false; } data.readerIndex(0); offsetVector.zeroVector(); - return success; + return true; } public void allocateNew(int totalBytes, int valueCount) { clear(); assert totalBytes >= 0; try { - final DrillBuf newBuf = allocator.buffer(totalBytes); - if (newBuf == null) { - throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes", totalBytes)); - } - data = newBuf; + data = allocator.buffer(totalBytes); offsetVector.allocateNew(valueCount + 1); } catch (DrillRuntimeException e) { clear(); @@ -345,10 +335,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V } final DrillBuf newBuf = allocator.buffer((int)newAllocationSize); - if(newBuf == null) { - throw new OutOfMemoryRuntimeException( - String.format("Failure while reallocating buffer of %d bytes", newAllocationSize)); - } newBuf.setBytes(0, data, 0, data.capacity()); data.release(); data = newBuf; http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java index 016cd92..a5d2ce8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java @@ -26,7 +26,6 @@ import java.util.List; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.SerializedField; @@ -47,7 +46,7 @@ import com.google.common.collect.Lists; * from an InputStream and construct a new VectorContainer. */ public class VectorAccessibleSerializable extends AbstractStreamSerializable { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class); static final MetricRegistry metrics = DrillMetrics.getInstance(); static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime"); @@ -114,9 +113,6 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { int dataLength = metaData.getBufferLength(); MaterializedField field = MaterializedField.create(metaData); DrillBuf buf = allocator.buffer(dataLength); - if (buf == null) { - throw new IOException(new OutOfMemoryException()); - } final ValueVector vector; try { buf.writeBytes(input, dataLength); http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java index 811cceb..b01534e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java @@ -39,6 +39,7 @@ public interface BufferAllocator extends Closeable { * @param size * The size in bytes. * @return A new ByteBuf. + * @throws OutOfMemoryRuntimeException if buffer cannot be allocated */ public abstract DrillBuf buffer(int size); @@ -49,6 +50,7 @@ public interface BufferAllocator extends Closeable { * @param minSize The minimum size in bytes. * @param maxSize The maximum size in bytes. * @return A new ByteBuf. + * @throws OutOfMemoryRuntimeException if buffer cannot be allocated */ public abstract DrillBuf buffer(int minSize, int maxSize); http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java index b4386a4..05849ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java @@ -93,7 +93,7 @@ public class TopLevelAllocator implements BufferAllocator { return empty; } if(!acct.reserve(min)) { - return null; + throw new OutOfMemoryRuntimeException(createErrorMsg(this, min)); } try { @@ -104,7 +104,7 @@ public class TopLevelAllocator implements BufferAllocator { } catch (OutOfMemoryError e) { if ("Direct buffer memory".equals(e.getMessage())) { acct.release(min); - return null; + throw new OutOfMemoryRuntimeException(createErrorMsg(this, min), e); } else { throw e; } @@ -233,25 +233,18 @@ public class TopLevelAllocator implements BufferAllocator { return acct.transferIn(b, b.capacity()); } + @Override public DrillBuf buffer(int size, int max) { if (ENABLE_ACCOUNTING) { - try { - injector.injectUnchecked(fragmentContext, CHILD_BUFFER_INJECTION_SITE); - } catch (NullPointerException e) { - // This is an unusual way to use exception injection. If we inject a NullPointerException into this site - // it will actually cause this method to return null, simulating a "normal" failure to allocate memory - // this can be useful to check if the caller will properly handle nulls - return null; - } + injector.injectUnchecked(fragmentContext, CHILD_BUFFER_INJECTION_SITE); } if (size == 0) { return empty; } if(!childAcct.reserve(size)) { - logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory(), new Exception()); - return null; + throw new OutOfMemoryRuntimeException(createErrorMsg(this, size)); } try { @@ -262,7 +255,7 @@ public class TopLevelAllocator implements BufferAllocator { } catch (OutOfMemoryError e) { if ("Direct buffer memory".equals(e.getMessage())) { childAcct.release(size); - return null; + throw new OutOfMemoryRuntimeException(createErrorMsg(this, size), e); } else { throw e; } @@ -401,4 +394,8 @@ public class TopLevelAllocator implements BufferAllocator { } } + private static String createErrorMsg(final BufferAllocator allocator, final int size) { + return String.format("Unable to allocate buffer of size %d due to memory limit. Current allocation: %d", + size, allocator.getAllocatedMemory()); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java index cd2fbe9..11d01d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java @@ -64,7 +64,7 @@ public abstract class FilterTemplate2 implements Filterer{ if (recordCount == 0) { return; } - if (! outgoingSelectionVector.allocateNew(recordCount)) { + if (! outgoingSelectionVector.allocateNewSafe(recordCount)) { throw new OutOfMemoryRuntimeException("Unable to allocate filter batch"); } switch(svMode){ http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 6da5582..4bb1572 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -533,7 +533,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { private SelectionVector2 newSV2() throws OutOfMemoryException, InterruptedException { SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator()); - if (!sv2.allocateNew(incoming.getRecordCount())) { + if (!sv2.allocateNewSafe(incoming.getRecordCount())) { try { spilledBatchGroups.addFirst(mergeAndSpill(batchGroups)); } catch (SchemaChangeException e) { @@ -550,7 +550,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } } waitTime *= 2; - if (sv2.allocateNew(incoming.getRecordCount())) { + if (sv2.allocateNewSafe(incoming.getRecordCount())) { break; } if (waitTime >= 32) { http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java index 7a7c012..ba8640a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java @@ -23,13 +23,14 @@ import java.io.Closeable; import java.io.IOException; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; import org.apache.drill.exec.record.DeadBuf; /** * A selection vector that fronts, at most, a */ public class SelectionVector2 implements Closeable{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class); private final BufferAllocator allocator; private int recordCount; @@ -90,15 +91,20 @@ public class SelectionVector2 implements Closeable{ buffer.setChar(index, value); } - public boolean allocateNew(int size){ - clear(); - buffer = allocator.buffer(size * RECORD_SIZE); - if (buffer == null) { + public boolean allocateNewSafe(int size) { + try { + allocateNew(size); + } catch (OutOfMemoryRuntimeException e) { return false; } return true; } + public void allocateNew(int size) { + clear(); + buffer = allocator.buffer(size * RECORD_SIZE); + } + @Override public SelectionVector2 clone(){ SelectionVector2 newSV = new SelectionVector2(allocator); http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java index 4e03f11..3e2adaf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java @@ -27,6 +27,7 @@ import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; import com.google.protobuf.CodedInputStream; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; /** * Modified version of {@link io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder} that avoids bytebuf copy. @@ -81,8 +82,10 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder { } else { // need to make buffer copy, otherwise netty will try to refill this buffer if we move the readerIndex forward... // TODO: Can we avoid this copy? - ByteBuf outBuf = allocator.buffer(length); - if (outBuf == null) { + ByteBuf outBuf; + try { + outBuf = allocator.buffer(length); + } catch (OutOfMemoryRuntimeException e) { logger.warn("Failure allocating buffer on incoming stream due to memory limits. Current Allocation: {}.", allocator.getAllocatedMemory()); in.resetReaderIndex(); outOfMemoryHandler.handle(); http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java index cf30db6..10c8fd1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java @@ -49,9 +49,6 @@ public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator { @Override public ByteBuffer allocate(int sz) { ByteBuf bb = allocator.buffer(sz); - if (bb == null) { - throw new OutOfMemoryRuntimeException(); - } ByteBuffer b = bb.nioBuffer(0, sz); allocatedBuffers.put(System.identityHashCode(b), bb); logger.debug("ParquetDirectByteBufferAllocator: Allocated "+sz+" bytes. Allocated ByteBuffer id: "+System.identityHashCode(b)); http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java index 1d48043..054ef82 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java @@ -121,11 +121,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe final int curSize = (int)size; clear(); - final DrillBuf newBuf = allocator.buffer(curSize); - if (newBuf == null) { - throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of d% bytes.", curSize)); - } - data = newBuf; + data = allocator.buffer(curSize); zeroVector(); allocationSizeInBytes = curSize; } @@ -141,15 +137,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe final int curSize = (int)newAllocationSize; final DrillBuf newBuf = allocator.buffer(curSize); - if (newBuf == null) { - throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes.", newAllocationSize)); - } - newBuf.setZero(0, newBuf.capacity()); newBuf.setBytes(0, data, 0, data.capacity()); data.release(); data = newBuf; - allocationSizeInBytes = curSize; + allocationSizeInBytes = curSize; } /** http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java index 6337d4c..d8bf2fd 100644 --- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java +++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; import org.apache.drill.exec.store.parquet.DirectCodecFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -177,6 +178,8 @@ public class ColumnChunkIncReadStore implements PageReadStore { } in.close(); return null; + } catch (OutOfMemoryRuntimeException e) { + throw e; // throw as it is } catch (Exception e) { throw new DrillRuntimeException("Error reading page." + "\nFile path: " + path.toUri().getPath() + http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java b/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java index 051ad4e..10fd5da 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java @@ -20,8 +20,8 @@ package org.apache.drill; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; import org.apache.drill.exec.memory.TopLevelAllocator; -import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.UserBitShared.DrillPBError; +import org.apache.drill.exec.testing.Controls; import org.apache.drill.exec.testing.ControlsInjectionUtil; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -36,26 +36,17 @@ public class TestAllocationException extends BaseTestQuery { private static final String SINGLE_MODE = "ALTER SESSION SET `planner.disable_exchanges` = true"; - private void testWithException(final String fileName) throws Exception { - testWithException(fileName, OutOfMemoryRuntimeException.class); - } - - private void testWithException(final String fileName, Class<? extends Throwable> exceptionClass) throws Exception{ + private void testWithException(final String fileName) throws Exception{ test(SINGLE_MODE); - CoordinationProtos.DrillbitEndpoint endpoint = bits[0].getContext().getEndpoint(); - - String controlsString = "{\"injections\":[{" - + "\"address\":\"" + endpoint.getAddress() + "\"," - + "\"port\":\"" + endpoint.getUserPort() + "\"," - + "\"type\":\"exception\"," - + "\"siteClass\":\"" + TopLevelAllocator.class.getName() + "\"," - + "\"desc\":\"" + TopLevelAllocator.CHILD_BUFFER_INJECTION_SITE + "\"," - + "\"nSkip\":200," - + "\"nFire\":1," - + "\"exceptionClass\":\"" + exceptionClass.getName() + "\"" - + "}]}"; - ControlsInjectionUtil.setControls(client, controlsString); + final String controls = Controls.newBuilder() + .addException(TopLevelAllocator.class, + TopLevelAllocator.CHILD_BUFFER_INJECTION_SITE, + OutOfMemoryRuntimeException.class, + 200, + 1 + ).build(); + ControlsInjectionUtil.setControls(client, controls); String query = getFile(fileName); @@ -71,12 +62,7 @@ public class TestAllocationException extends BaseTestQuery { } @Test - public void testWithNull() throws Exception{ - testWithException("queries/tpch/01.sql"); - } - - @Test public void testWithOOM() throws Exception{ - testWithException("queries/tpch/03.sql", NullPointerException.class); + testWithException("queries/tpch/01.sql"); } } http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java index 74ce225..0b8314c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java @@ -19,7 +19,8 @@ package org.apache.drill.exec.memory; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + import io.netty.buffer.DrillBuf; import java.util.Iterator; @@ -143,36 +144,23 @@ public class TestAllocators { ((AutoCloseable) oContext22).close(); // Fragment 3 asks for more and fails - boolean outOfMem=false; try { - DrillBuf b31b = oContext31.getAllocator().buffer(4400000); - if(b31b!=null) { - b31b.release(); - }else{ - outOfMem=true; - } - }catch(Exception e){ - outOfMem=true; + oContext31.getAllocator().buffer(4400000); + fail("Fragment 3 should fail to allocate buffer"); + } catch (OutOfMemoryRuntimeException e) { + // expected } - assertEquals(true, (boolean)outOfMem); // Operator is Exempt from Fragment limits. Fragment 3 asks for more and succeeds - outOfMem=false; OperatorContext oContext32 = fragmentContext3.newOperatorContext(physicalOperator6, false); - DrillBuf b32=null; try { - b32=oContext32.getAllocator().buffer(4400000); - }catch(Exception e){ - outOfMem=true; - }finally{ - if(b32!=null) { - b32.release(); - }else{ - outOfMem=true; - } + DrillBuf b32 = oContext32.getAllocator().buffer(4400000); + b32.release(); + } catch (OutOfMemoryRuntimeException e) { + fail("Fragment 3 failed to allocate buffer"); + } finally { closeOp(oContext32); } - assertEquals(false, (boolean)outOfMem); b11.release(); closeOp(oContext11);
