Repository: tajo Updated Branches: refs/heads/master 374929d86 -> 0a1a41d25
TAJO-1568: Apply UnpooledByteBufAllocator when a tajo.test.enabled is set to enable. (jinho) Closes #540 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0a1a41d2 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0a1a41d2 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0a1a41d2 Branch: refs/heads/master Commit: 0a1a41d258b083810601fb5790d423a5763f5051 Parents: 374929d Author: Jinho Kim <[email protected]> Authored: Mon Apr 20 11:17:16 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Mon Apr 20 11:17:16 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/util/NumberUtil.java | 26 ++++---- .../java/org/apache/tajo/QueryTestCaseBase.java | 8 +-- .../org/apache/tajo/storage/BufferPool.java | 65 +++++++++++++++++--- 4 files changed, 78 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0a1a41d2/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index cbf416d..694e2ae 100644 --- a/CHANGES +++ b/CHANGES @@ -190,6 +190,9 @@ Release 0.11.0 - unreleased TASKS + TAJO-1568: Apply UnpooledByteBufAllocator when a tajo.test.enabled + is set to enable. (jinho) + TAJO-1567: Update old license in some pom.xml files. (Contributed by Dongjoon Hyun, Committed by jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/0a1a41d2/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java index 9e16cec..0d70cc2 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java @@ -604,14 +604,14 @@ public class NumberUtil { * @throws NumberFormatException if the argument could not be parsed as a double */ public static double parseDouble(ByteBuf bytes, int start, int length) { - if (!PlatformDependent.hasUnsafe()) { - return parseDouble(bytes.array(), start, length); - } - if (bytes == null) { throw new NumberFormatException("String is null"); } + if (!bytes.hasMemoryAddress()) { + return parseDouble(bytes.array(), start, length); + } + if (length == 0 || bytes.writerIndex() < start + length) { throw new NumberFormatException("Empty string or Invalid buffer!"); } @@ -815,13 +815,14 @@ public class NumberUtil { * @throws NumberFormatException if the argument could not be parsed as an int quantity. */ public static int parseInt(ByteBuf bytes, int start, int length, int radix) { - if (!PlatformDependent.hasUnsafe()) { - return parseInt(bytes.array(), start, length); - } - if (bytes == null) { throw new NumberFormatException("String is null"); } + + if (!bytes.hasMemoryAddress()) { + return parseInt(bytes.array(), start, length); + } + if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) { throw new NumberFormatException("Invalid radix: " + radix); } @@ -942,13 +943,14 @@ public class NumberUtil { * @throws NumberFormatException if the argument could not be parsed as an long quantity. */ public static long parseLong(ByteBuf bytes, int start, int length, int radix) { - if (!PlatformDependent.hasUnsafe()) { - return parseInt(bytes.array(), start, length); - } - if (bytes == null) { throw new NumberFormatException("String is null"); } + + if (!bytes.hasMemoryAddress()) { + return parseInt(bytes.array(), start, length); + } + if (radix < Character.MIN_RADIX || radix > Character.MAX_RADIX) { throw new NumberFormatException("Invalid radix: " + radix); } http://git-wip-us.apache.org/repos/asf/tajo/blob/0a1a41d2/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index ede667e..22c3640 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -21,12 +21,9 @@ package org.apache.tajo; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; import org.apache.tajo.algebra.*; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.CatalogService; @@ -60,7 +57,6 @@ import org.junit.runner.Description; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.lang.annotation.Annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; @@ -226,7 +222,9 @@ public class QueryTestCaseBase { @Before public void printTestName() { /* protect a travis stalled build */ - System.out.println("Run: " + name.getMethodName()); + System.out.println("Run: " + name.getMethodName() + + " Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) + / (1024 * 1024)) + "MBytes"); } public QueryTestCaseBase() { http://git-wip-us.apache.org/repos/asf/tajo/blob/0a1a41d2/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java index e4f9072..d611ee3 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -19,24 +19,75 @@ package org.apache.tajo.storage; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.ResourceLeakDetector; import io.netty.util.internal.PlatformDependent; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.util.CommonTestingUtil; + +import java.lang.reflect.Field; /* this class is PooledBuffer holder */ public class BufferPool { - private static final PooledByteBufAllocator allocator; + public static final String ALLOW_CACHE = "tajo.storage.buffer.thread-local.cache"; + private static final ByteBufAllocator ALLOCATOR; private BufferPool() { } static { - //TODO we need determine the default params - allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); + /* TODO Enable thread cache + * Create a pooled ByteBuf allocator but disables the thread-local cache. + * Because the TaskRunner thread is newly created + * */ + + if (System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + /* Disable pooling buffers for memory usage */ + ALLOCATOR = UnpooledByteBufAllocator.DEFAULT; + + /* if you are finding memory leak, please enable this line */ + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + } else { + TajoConf tajoConf = new TajoConf(); + ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false), 0); + } + } + + /** + * borrowed from Spark + */ + public static PooledByteBufAllocator createPooledByteBufAllocator( + boolean allowDirectBufs, + boolean allowCache, + int numCores) { + if (numCores == 0) { + numCores = Runtime.getRuntime().availableProcessors(); + } + return new PooledByteBufAllocator( + allowDirectBufs && PlatformDependent.directBufferPreferred(), + Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores), + Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0), + getPrivateStaticField("DEFAULT_PAGE_SIZE"), + getPrivateStaticField("DEFAULT_MAX_ORDER"), + allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0, + allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0, + allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0 + ); + } - /* if you are finding memory leak, please enable this line */ - //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + /** Used to get defaults from Netty's private static fields. */ + private static int getPrivateStaticField(String name) { + try { + Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name); + f.setAccessible(true); + return f.getInt(null); + } catch (Exception e) { + throw new RuntimeException(e); + } } public static long maxDirectMemory() { @@ -45,7 +96,7 @@ public class BufferPool { public static ByteBuf directBuffer(int size) { - return allocator.directBuffer(size); + return ALLOCATOR.directBuffer(size); } /** @@ -55,7 +106,7 @@ public class BufferPool { * @return allocated ByteBuf from pool */ public static ByteBuf directBuffer(int size, int max) { - return allocator.directBuffer(size, max); + return ALLOCATOR.directBuffer(size, max); } @InterfaceStability.Unstable
