Repository: tajo Updated Branches: refs/heads/branch-0.10.1 4a02456d3 -> 5e1fa93b5
TAJO-1568: Apply UnpooledByteBufAllocator when a tajo.test.enabled is set to enable. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5e1fa93b Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5e1fa93b Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5e1fa93b Branch: refs/heads/branch-0.10.1 Commit: 5e1fa93b53cc4b575996a4aceaeb781567dc47d6 Parents: 4a02456 Author: Jinho Kim <[email protected]> Authored: Mon Apr 20 11:13:14 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Mon Apr 20 11:13:14 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/util/NumberUtil.java | 26 ++++---- .../java/org/apache/tajo/QueryTestCaseBase.java | 4 +- .../org/apache/tajo/storage/BufferPool.java | 67 +++++++++++++++++--- 4 files changed, 79 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/5e1fa93b/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index fd133d5..806da48 100644 --- a/CHANGES +++ b/CHANGES @@ -83,6 +83,9 @@ Release 0.10.1 - unreleased TASKS + TAJO-1568: Apply UnpooledByteBufAllocator when a tajo.test.enabled + is set to enable. (jinho) + TAJO-1567: Update old license in some pom.xml files. (Contributed by Dongjoon Hyun, Committed by jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/5e1fa93b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java index 9e16cec..0d70cc2 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java @@ -604,14 +604,14 @@ public class NumberUtil { * @throws NumberFormatException if the argument could not be parsed as a double */ public static double parseDouble(ByteBuf bytes, int start, int length) { - if (!PlatformDependent.hasUnsafe()) { - return parseDouble(bytes.array(), start, length); - } - if (bytes == null) { throw new NumberFormatException("String is null"); } + if (!bytes.hasMemoryAddress()) { + return parseDouble(bytes.array(), start, length); + } + if (length == 0 || bytes.writerIndex() < start + length) { throw new NumberFormatException("Empty string or Invalid buffer!"); } @@ -815,13 +815,14 @@ public class NumberUtil { * @throws NumberFormatException if the argument could not be parsed as an int quantity. */ public static int parseInt(ByteBuf bytes, int start, int length, int radix) { - if (!PlatformDependent.hasUnsafe()) { - return parseInt(bytes.array(), start, length); - } - if (bytes == null) { throw new NumberFormatException("String is null"); } + + if (!bytes.hasMemoryAddress()) { + return parseInt(bytes.array(), start, length); + } + if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) { throw new NumberFormatException("Invalid radix: " + radix); } @@ -942,13 +943,14 @@ public class NumberUtil { * @throws NumberFormatException if the argument could not be parsed as an long quantity. */ public static long parseLong(ByteBuf bytes, int start, int length, int radix) { - if (!PlatformDependent.hasUnsafe()) { - return parseInt(bytes.array(), start, length); - } - if (bytes == null) { throw new NumberFormatException("String is null"); } + + if (!bytes.hasMemoryAddress()) { + return parseInt(bytes.array(), start, length); + } + if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) { throw new NumberFormatException("Invalid radix: " + radix); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5e1fa93b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 15fbdae..ddfa7a6 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -212,7 +212,9 @@ public class QueryTestCaseBase { @Before public void printTestName() { /* protect a travis stalled build */ - System.out.println("Run: " + name.getMethodName()); + System.out.println("Run: " + name.getMethodName() + + " Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) + / (1024 * 1024)) + "MBytes"); } public QueryTestCaseBase() { http://git-wip-us.apache.org/repos/asf/tajo/blob/5e1fa93b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java index 85c79fa..d611ee3 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -19,24 +19,75 @@ package org.apache.tajo.storage; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.ResourceLeakDetector; import io.netty.util.internal.PlatformDependent; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.util.CommonTestingUtil; + +import java.lang.reflect.Field; /* this class is PooledBuffer holder */ public class BufferPool { - private static final PooledByteBufAllocator allocator; + public static final String ALLOW_CACHE = "tajo.storage.buffer.thread-local.cache"; + private static final ByteBufAllocator ALLOCATOR; private BufferPool() { } static { - //TODO we need determine the default params - allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); + /* TODO Enable thread cache + * Create a pooled ByteBuf allocator but disables the thread-local cache. + * Because the TaskRunner thread is newly created + * */ + + if (System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + /* Disable pooling buffers for memory usage */ + ALLOCATOR = UnpooledByteBufAllocator.DEFAULT; + + /* if you are finding memory leak, please enable this line */ + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + } else { + TajoConf tajoConf = new TajoConf(); + ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false), 0); + } + } + + /** + * borrowed from Spark + */ + public static PooledByteBufAllocator createPooledByteBufAllocator( + boolean allowDirectBufs, + boolean allowCache, + int numCores) { + if (numCores == 0) { + numCores = Runtime.getRuntime().availableProcessors(); + } + return new PooledByteBufAllocator( + allowDirectBufs && PlatformDependent.directBufferPreferred(), + Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores), + Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0), + getPrivateStaticField("DEFAULT_PAGE_SIZE"), + getPrivateStaticField("DEFAULT_MAX_ORDER"), + allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0, + allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0, + allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0 + ); + } - /* if you are finding memory leak, please enable this line */ - //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + /** Used to get defaults from Netty's private static fields. */ + private static int getPrivateStaticField(String name) { + try { + Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name); + f.setAccessible(true); + return f.getInt(null); + } catch (Exception e) { + throw new RuntimeException(e); + } } public static long maxDirectMemory() { @@ -44,8 +95,8 @@ public class BufferPool { } - public synchronized static ByteBuf directBuffer(int size) { - return allocator.directBuffer(size); + public static ByteBuf directBuffer(int size) { + return ALLOCATOR.directBuffer(size); } /** @@ -55,7 +106,7 @@ public class BufferPool { * @return allocated ByteBuf from pool */ public static ByteBuf directBuffer(int size, int max) { - return allocator.directBuffer(size, max); + return ALLOCATOR.directBuffer(size, max); } @InterfaceStability.Unstable
