[
https://issues.apache.org/jira/browse/DRILL-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036659#comment-15036659
]
ASF GitHub Bot commented on DRILL-4134:
---------------------------------------
Github user adeneche commented on a diff in the pull request:
https://github.com/apache/drill/pull/283#discussion_r46477393
--- Diff:
exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
---
@@ -0,0 +1,689 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.drill.common.HistoricalLog;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.AllocatorManager.BufferLedger;
+import org.apache.drill.exec.ops.BufferManager;
+import org.apache.drill.exec.util.AssertionUtil;
+
+import com.google.common.base.Preconditions;
+
+public abstract class BaseAllocator extends Accountant implements
BufferAllocator {
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
+
+ public static final String DEBUG_ALLOCATOR =
"drill.memory.debug.allocator";
+
+ private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+ private static final int CHUNK_SIZE =
AllocatorManager.INNER_ALLOCATOR.getChunkSize();
+
+ public static final int DEBUG_LOG_LENGTH = 6;
+ public static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
+ || Boolean.parseBoolean(System.getProperty(DEBUG_ALLOCATOR,
"false"));
+ private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
+
+ private final BaseAllocator parentAllocator;
+ private final ByteBufAllocator thisAsByteBufAllocator;
+ private final IdentityHashMap<BaseAllocator, Object> childAllocators;
+ private final DrillBuf empty;
+
+ private volatile boolean isClosed = false; // the allocator has been
closed
+
+ // Package exposed for sharing between AllocatorManger and BaseAllocator
objects
+ final long id = ID_GENERATOR.incrementAndGet(); // unique ID assigned to
each allocator
+ final String name;
+ final RootAllocator root;
+
+ // members used purely for debugging
+ private final IdentityHashMap<BufferLedger, Object> childLedgers;
+ private final IdentityHashMap<Reservation, Object> reservations;
+ private final HistoricalLog historicalLog;
+
+ protected BaseAllocator(
+ final BaseAllocator parentAllocator,
+ final String name,
+ final long initReservation,
+ final long maxAllocation) throws OutOfMemoryException {
+ super(parentAllocator, initReservation, maxAllocation);
+
+ if (parentAllocator != null) {
+ this.root = parentAllocator.root;
+ empty = parentAllocator.empty;
+ } else if (this instanceof RootAllocator) {
+ this.root = (RootAllocator) this;
+ empty = createEmpty();
+ } else {
+ throw new IllegalStateException("An parent allocator must either
carry a root or be the root.");
+ }
+
+ this.parentAllocator = parentAllocator;
+ this.name = name;
+
+ // TODO: DRILL-4131
+ // this.thisAsByteBufAllocator = new DrillByteBufAllocator(this);
+ this.thisAsByteBufAllocator =
AllocatorManager.INNER_ALLOCATOR.allocator;
+
+ if (DEBUG) {
+ childAllocators = new IdentityHashMap<>();
+ reservations = new IdentityHashMap<>();
+ childLedgers = new IdentityHashMap<>();
+ historicalLog = new HistoricalLog(DEBUG_LOG_LENGTH, "allocator[%d]",
id);
+ hist("created by \"%s\", owned = %d", name.toString(),
this.getAllocatedMemory());
+ } else {
+ childAllocators = null;
+ reservations = null;
+ historicalLog = null;
+ childLedgers = null;
+ }
+
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public DrillBuf getEmpty() {
+ return empty;
+ }
+
+ /**
+ * For debug/verification purposes only. Allows an AllocatorManager to
tell the allocator that we have a new ledger
+ * associated with this allocator.
+ */
+ void associateLedger(BufferLedger ledger) {
+ if (DEBUG) {
+ synchronized (DEBUG_LOCK) {
+ childLedgers.put(ledger, null);
+ }
+ }
+ }
+
+ /**
+ * For debug/verification purposes only. Allows an AllocatorManager to
tell the allocator that we are removing a
+ * ledger associated with this allocator
+ */
+ void dissociateLedger(BufferLedger ledger) {
+ if (DEBUG) {
+ synchronized (DEBUG_LOCK) {
+ if (!childLedgers.containsKey(ledger)) {
+ throw new IllegalStateException("Trying to remove a child ledger
that doesn't exist.");
+ }
+ childLedgers.remove(ledger);
+ }
+ }
+ }
+
+ /**
+ * Track when a ChildAllocator of this BaseAllocator is closed. Used for
debugging purposes.
+ *
+ * @param childAllocator
+ * The child allocator that has been closed.
+ */
+ private void childClosed(final BaseAllocator childAllocator) {
+ if (DEBUG) {
+ Preconditions.checkArgument(childAllocator != null, "child allocator
can't be null");
+
+ synchronized (DEBUG_LOCK) {
+ final Object object = childAllocators.remove(childAllocator);
+ if (object == null) {
+ childAllocator.historicalLog.logHistory(logger);
+ throw new IllegalStateException("Child allocator[" +
childAllocator.id
+ + "] not found in parent allocator[" + id + "]'s
childAllocators");
+ }
+ }
+ }
+ }
+
+ private static String createErrorMsg(final BufferAllocator allocator,
final int rounded, final int requested) {
+ if (rounded != requested) {
+ return String.format(
+ "Unable to allocate buffer of size %d (rounded from %d) due to
memory limit. Current allocation: %d",
+ rounded, requested, allocator.getAllocatedMemory());
+ } else {
+ return String.format("Unable to allocate buffer of size %d due to
memory limit. Current allocation: %d",
+ rounded, allocator.getAllocatedMemory());
+ }
+ }
+
+ @Override
+ public DrillBuf buffer(final int initialRequestSize) {
+ return buffer(initialRequestSize, null);
+ }
+
+ private DrillBuf createEmpty(){
+ return new DrillBuf(new AtomicInteger(), null,
AllocatorManager.INNER_ALLOCATOR.empty, null, null, 0, 0, true);
+ }
+
+ @Override
+ public DrillBuf buffer(final int initialRequestSize, BufferManager
manager) {
+
+ Preconditions.checkArgument(initialRequestSize >= 0, "the minimimum
requested size must be non-negative");
+ Preconditions.checkArgument(initialRequestSize >= 0, "the maximum
requested size must be non-negative");
+
+ if (initialRequestSize == 0) {
+ return empty;
+ }
+
+ // round to next largest power of two if we're within a chunk since
that is how our allocator operates
+ final int actualRequestSize = initialRequestSize < CHUNK_SIZE ?
+ nextPowerOfTwo(initialRequestSize)
+ : initialRequestSize;
+ AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
+ if (!outcome.isOk()) {
+ throw new OutOfMemoryException(createErrorMsg(this,
actualRequestSize, initialRequestSize));
+ }
+
+ boolean success = true;
+ try {
+ DrillBuf buffer = bufferWithoutReservation(actualRequestSize,
manager);
+ success = true;
+ return buffer;
+ } finally {
+ if (!success) {
+ releaseBytes(actualRequestSize);
+ }
+ }
+
+ }
+
+ /**
+ * Used by usual allocation as well as for allocating a pre-reserved
buffer. Skips the typical accounting associated
+ * with creating a new buffer.
+ */
+ private DrillBuf bufferWithoutReservation(final int size, BufferManager
bufferManager) throws OutOfMemoryException {
+ AllocatorManager manager = new AllocatorManager(this, size);
+ BufferLedger ledger = manager.associate(this);
+ DrillBuf buffer = ledger.newDrillBuf(0, size, bufferManager, true);
+
+ // make sure that our allocation is equal to what we expected.
+ Preconditions.checkArgument(buffer.capacity() == size,
+ "Allocated capacity %d was not equal to requested capacity %d.",
buffer.capacity(), size);
+
+ return buffer;
+ }
+
+ @Override
+ public ByteBufAllocator getAsByteBufAllocator() {
+ return thisAsByteBufAllocator;
+ }
+
+ /**
+ * Return a unique Id for an allocator. Id's may be recycled after a
long period of time.
+ *
+ * <p>
+ * Primary use for this is for debugging output.
+ * </p>
+ *
+ * @return the allocator's id
+ */
+ long getId() {
+ return id;
+ }
+
+ @Override
+ public BufferAllocator newChildAllocator(
+ final String name,
+ final long initReservation,
+ final long maxAllocation) {
+ final ChildAllocator childAllocator = new ChildAllocator(this, name,
initReservation, maxAllocation);
+
+ if (DEBUG) {
+ childAllocators.put(childAllocator, childAllocator);
+ historicalLog.recordEvent("allocator[%d] created new child
allocator[%d]",
+ id, childAllocator.id);
+ }
+
+ return childAllocator;
+ }
+
+ private class Reservation extends AllocationReservation {
+ private final HistoricalLog historicalLog;
+
+ public Reservation() {
+ if (DEBUG) {
+ historicalLog = new HistoricalLog("Reservation[allocator[%d],
%d]", id, System.identityHashCode(this));
+ historicalLog.recordEvent("created");
+ synchronized (DEBUG_LOCK) {
+ reservations.put(this, this);
+ }
+ } else {
+ historicalLog = null;
+ }
+ }
+
+ @Override
+ public void close() {
+ if (DEBUG) {
+ if (!isClosed()) {
+ final Object object;
+ synchronized (DEBUG_LOCK) {
+ object = reservations.remove(this);
+ }
+ if (object == null) {
+ final StringBuilder sb = new StringBuilder();
+ print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
+ logger.debug(sb.toString());
+ throw new IllegalStateException(
+ String.format("Didn't find closing reservation[%d]",
System.identityHashCode(this)));
+ }
+
+ historicalLog.recordEvent("closed");
+ }
+ }
+
+ super.close();
+ }
+
+ @Override
+ protected boolean reserve(int nBytes) {
+ final AllocationOutcome outcome =
BaseAllocator.this.allocateBytes(nBytes);
+
+ if (DEBUG) {
+ historicalLog.recordEvent("reserve(%d) => %s", nBytes,
Boolean.toString(outcome.isOk()));
+ }
+
+ return outcome.isOk();
+ }
+
+ @Override
+ protected DrillBuf allocate(int nBytes) {
+ boolean success = false;
+
+ /*
+ * The reservation already added the requested bytes to the
allocators owned and allocated bytes via reserve().
+ * This ensures that they can't go away. But when we ask for the
buffer here, that will add to the allocated bytes
+ * as well, so we need to return the same number back to avoid
double-counting them.
+ */
+ try {
+ final DrillBuf drillBuf =
BaseAllocator.this.bufferWithoutReservation(nBytes, null);
+
+ if (DEBUG) {
+ historicalLog.recordEvent("allocate() => %s",
+ drillBuf == null ? "null" : String.format("DrillBuf[%d]",
drillBuf.getId()));
--- End diff --
The `drillBuf == null` check is unnecessary: `bufferWithoutReservation()`
cannot return `null`, because it already dereferences the new buffer
(`buffer.capacity()`) before returning it.
> Incorporate remaining patches from DRILL-1942 Allocator refactor
> ----------------------------------------------------------------
>
> Key: DRILL-4134
> URL: https://issues.apache.org/jira/browse/DRILL-4134
> Project: Apache Drill
> Issue Type: Sub-task
> Components: Execution - Flow
> Reporter: Jacques Nadeau
> Assignee: Jacques Nadeau
> Fix For: 1.4.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)