[ https://issues.apache.org/jira/browse/DRILL-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034200#comment-15034200 ]
ASF GitHub Bot commented on DRILL-4134:
---------------------------------------
Github user julienledem commented on a diff in the pull request:
https://github.com/apache/drill/pull/283#discussion_r46312830
--- Diff: exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java ---
@@ -23,193 +23,246 @@
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
-public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
- static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(PooledByteBufAllocatorL.class);
-
+public class PooledByteBufAllocatorL {
private static final org.slf4j.Logger memoryLogger =
org.slf4j.LoggerFactory.getLogger("drill.allocator");
+
private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+
private static final String METRIC_PREFIX = "drill.allocator.";
+
private final MetricRegistry registry;
private final AtomicLong hugeBufferSize = new AtomicLong(0);
private final AtomicLong hugeBufferCount = new AtomicLong(0);
private final AtomicLong normalBufferSize = new AtomicLong(0);
private final AtomicLong normalBufferCount = new AtomicLong(0);
- private final PoolArena<ByteBuffer>[] directArenas;
- private final MemoryStatusThread statusThread;
- private final Histogram largeBuffersHist;
- private final Histogram normalBuffersHist;
+ public final InnerAllocator allocator;
+ public final UnsafeDirectLittleEndian empty;
public PooledByteBufAllocatorL(MetricRegistry registry) {
- super(true);
this.registry = registry;
+ allocator = new InnerAllocator();
+ empty = new UnsafeDirectLittleEndian(new
DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
+ }
+
+ public UnsafeDirectLittleEndian allocate(int size) {
try {
- Field f =
PooledByteBufAllocator.class.getDeclaredField("directArenas");
- f.setAccessible(true);
- this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
- } catch (Exception e) {
- throw new RuntimeException("Failure while initializing allocator.
Unable to retrieve direct arenas field.", e);
+ return allocator.directBuffer(size, size);
+ } catch (OutOfMemoryError e) {
+ throw new OutOfMemoryException("Failure allocating buffer.", e);
}
- if (memoryLogger.isTraceEnabled()) {
- statusThread = new MemoryStatusThread();
- statusThread.start();
- } else {
- statusThread = null;
- }
- removeOldMetrics();
+ }
- registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return normalBufferSize.get();
- }
- });
+ public int getChunkSize() {
+ return allocator.chunkSize;
+ }
- registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return normalBufferCount.get();
- }
- });
+ private class InnerAllocator extends PooledByteBufAllocator {
- registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return hugeBufferSize.get();
- }
- });
- registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return hugeBufferCount.get();
- }
- });
+ private final PoolArena<ByteBuffer>[] directArenas;
+ private final MemoryStatusThread statusThread;
+ private final Histogram largeBuffersHist;
+ private final Histogram normalBuffersHist;
+ private final int chunkSize;
- largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
- normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
+ public InnerAllocator() {
+ super(true);
- }
+ try {
+ Field f =
PooledByteBufAllocator.class.getDeclaredField("directArenas");
+ f.setAccessible(true);
+ this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
+ } catch (Exception e) {
+ throw new RuntimeException("Failure while initializing allocator.
Unable to retrieve direct arenas field.", e);
+ }
- private synchronized void removeOldMetrics() {
- registry.removeMatching(new MetricFilter() {
- @Override
- public boolean matches(String name, Metric metric) {
- return name.startsWith("drill.allocator.");
+ this.chunkSize = directArenas[0].chunkSize;
+
+ if (memoryLogger.isTraceEnabled()) {
+ statusThread = new MemoryStatusThread();
+ statusThread.start();
+ } else {
+ statusThread = null;
}
+ removeOldMetrics();
- });
- }
+ registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return normalBufferSize.get();
+ }
+ });
- @Override
- protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
- throw new UnsupportedOperationException("Drill doesn't support using
heap buffers.");
- }
+ registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return normalBufferCount.get();
+ }
+ });
+
+ registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return hugeBufferSize.get();
+ }
+ });
- @Override
- protected UnsafeDirectLittleEndian newDirectBuffer(int initialCapacity,
int maxCapacity) {
- PoolThreadCache cache = threadCache.get();
- PoolArena<ByteBuffer> directArena = cache.directArena;
+ registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return hugeBufferCount.get();
+ }
+ });
- if (directArena != null) {
+ largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
+ normalBuffersHist = registry.histogram(METRIC_PREFIX +
"normal.hist");
- if (initialCapacity > directArena.chunkSize) {
- // This is beyond chunk size so we'll allocate separately.
- ByteBuf buf =
UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+ }
- hugeBufferCount.incrementAndGet();
- hugeBufferSize.addAndGet(buf.capacity());
- largeBuffersHist.update(buf.capacity());
- // logger.debug("Allocating huge buffer of size {}",
initialCapacity, new Exception());
- return new UnsafeDirectLittleEndian(new LargeBuffer(buf,
hugeBufferSize, hugeBufferCount));
- } else {
- // within chunk, use arena.
- ByteBuf buf = directArena.allocate(cache, initialCapacity,
maxCapacity);
- if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
- fail();
+ private synchronized void removeOldMetrics() {
+ registry.removeMatching(new MetricFilter() {
+ @Override
+ public boolean matches(String name, Metric metric) {
+ return name.startsWith("drill.allocator.");
}
- normalBuffersHist.update(buf.capacity());
- if (ASSERT_ENABLED) {
- normalBufferSize.addAndGet(buf.capacity());
- normalBufferCount.incrementAndGet();
+ });
+ }
+
+ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity,
int maxCapacity) {
+ PoolThreadCache cache = threadCache.get();
+ PoolArena<ByteBuffer> directArena = cache.directArena;
+
+ if (directArena != null) {
+
+ if (initialCapacity > directArena.chunkSize) {
+ // This is beyond chunk size so we'll allocate separately.
+ ByteBuf buf =
UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+
+ hugeBufferCount.incrementAndGet();
+ hugeBufferSize.addAndGet(buf.capacity());
+ largeBuffersHist.update(buf.capacity());
+ // logger.debug("Allocating huge buffer of size {}",
initialCapacity, new Exception());
+ return new UnsafeDirectLittleEndian(new LargeBuffer(buf,
hugeBufferSize, hugeBufferCount));
+
+ } else {
+ // within chunk, use arena.
+ ByteBuf buf = directArena.allocate(cache, initialCapacity,
maxCapacity);
+ if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
+ fail();
+ }
+
+ normalBuffersHist.update(buf.capacity());
+ if (ASSERT_ENABLED) {
+ normalBufferSize.addAndGet(buf.capacity());
+ normalBufferCount.incrementAndGet();
+ }
+
+ return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf)
buf, normalBufferCount,
+ normalBufferSize);
}
- return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf)
buf, normalBufferCount, normalBufferSize);
+ } else {
+ throw fail();
}
-
- } else {
- throw fail();
}
- }
-
- private UnsupportedOperationException fail() {
- return new UnsupportedOperationException(
- "Drill requries that the JVM used supports access sun.misc.Unsafe.
This platform didn't provide that functionality.");
- }
+ private UnsupportedOperationException fail() {
+ return new UnsupportedOperationException(
+ "Drill requries that the JVM used supports access
sun.misc.Unsafe. This platform didn't provide that functionality.");
+ }
- @Override
- public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int
maxCapacity) {
+ public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int
maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
- newDirectBuffer(initialCapacity, maxCapacity);
+ newDirectBuffer(initialCapacity, maxCapacity);
}
validate(initialCapacity, maxCapacity);
- return newDirectBuffer(initialCapacity, maxCapacity);
- }
+ return newDirectBufferL(initialCapacity, maxCapacity);
+ }
- @Override
- public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
- throw new UnsupportedOperationException("Drill doesn't support using
heap buffers.");
- }
+ @Override
+ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+ throw new UnsupportedOperationException("Drill doesn't support using
heap buffers.");
+ }
- private static void validate(int initialCapacity, int maxCapacity) {
- if (initialCapacity < 0) {
+ private void validate(int initialCapacity, int maxCapacity) {
+ if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " +
initialCapacity + " (expectd: 0+)");
- }
- if (initialCapacity > maxCapacity) {
+ }
+ if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
- "initialCapacity: %d (expected: not greater than
maxCapacity(%d)",
- initialCapacity, maxCapacity));
+ "initialCapacity: %d (expected: not greater than
maxCapacity(%d)",
+ initialCapacity, maxCapacity));
+ }
}
- }
- private class MemoryStatusThread extends Thread {
+ private class MemoryStatusThread extends Thread {
- public MemoryStatusThread() {
- super("memory-status-logger");
- this.setDaemon(true);
- this.setName("allocation.logger");
- }
+ public MemoryStatusThread() {
+ super("memory-status-logger");
+ this.setDaemon(true);
+ this.setName("allocation.logger");
+ }
- @Override
- public void run() {
- while (true) {
- memoryLogger.trace("Memory Usage: \n{}",
PooledByteBufAllocatorL.this.toString());
- try {
- Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
- } catch (InterruptedException e) {
- return;
- }
+ @Override
+ public void run() {
+ while (true) {
+ memoryLogger.trace("Memory Usage: \n{}",
PooledByteBufAllocatorL.this.toString());
+ try {
+ Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
}
+
}
- }
+ public void checkAndReset() {
+ if (hugeBufferCount.get() != 0 || normalBufferCount.get() != 0) {
+ StringBuilder buf = new StringBuilder();
+ buf.append("Large buffers outstanding: ");
--- End diff --
Could this message be built with String.format instead of appending to a StringBuilder?
> Incorporate remaining patches from DRILL-1942 Allocator refactor
> ----------------------------------------------------------------
>
> Key: DRILL-4134
> URL: https://issues.apache.org/jira/browse/DRILL-4134
> Project: Apache Drill
> Issue Type: Sub-task
> Components: Execution - Flow
> Reporter: Jacques Nadeau
> Assignee: Jacques Nadeau
> Fix For: 1.4.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)