[
https://issues.apache.org/jira/browse/DRILL-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036655#comment-15036655
]
ASF GitHub Bot commented on DRILL-4134:
---------------------------------------
Github user adeneche commented on a diff in the pull request:
https://github.com/apache/drill/pull/283#discussion_r46477028
--- Diff:
exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java
---
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import static org.apache.drill.exec.memory.BaseAllocator.indent;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.common.HistoricalLog;
+import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ops.BufferManager;
+
+import com.carrotsearch.hppc.LongObjectOpenHashMap;
+import com.google.common.base.Preconditions;
+
+/**
+ * Manages the relationship between one or more allocators and a
particular UDLE. Ensures that one allocator owns the
+ * memory that multiple allocators may be referencing. Manages a
BufferLedger between each of its associated allocators.
+ * This class is also responsible for managing when memory is allocated
and returned to the Netty-based
+ * PooledByteBufAllocatorL.
+ *
+ * The only reason that this isn't package private is we're forced to put
DrillBuf in Netty's package which need access
+ * to these objects or methods.
+ *
+ * Threading: AllocatorManager manages thread-safety internally.
Operations within the context of a single BufferLedger
+ * are lockless in nature and can be leveraged by multiple threads.
Operations that cross the context of two ledgers
+ * will acquire a lock on the AllocatorManager instance. Important note,
there is one AllocatorManager per
+ * UnsafeDirectLittleEndian buffer allocation. As such, there will be
thousands of these in a typical query. The
+ * contention of acquiring a lock on AllocatorManager should be very low.
+ *
+ */
+public class AllocatorManager {
+ // private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(AllocatorManager.class);
+
+ private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
+ static final PooledByteBufAllocatorL INNER_ALLOCATOR = new
PooledByteBufAllocatorL(DrillMetrics.getInstance());
+
+ private final RootAllocator root;
+ private volatile BufferLedger owningLedger;
+ private final int size;
+ private final UnsafeDirectLittleEndian underlying;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final LongObjectOpenHashMap<BufferLedger> map = new
LongObjectOpenHashMap<>();
+ private final AutoCloseableLock readLock = new
AutoCloseableLock(lock.readLock());
+ private final AutoCloseableLock writeLock = new
AutoCloseableLock(lock.writeLock());
+ private final IdentityHashMap<DrillBuf, Object> buffers =
+ BaseAllocator.DEBUG ? new IdentityHashMap<DrillBuf, Object>() : null;
+
+ AllocatorManager(BaseAllocator accountingAllocator, int size) {
+ Preconditions.checkNotNull(accountingAllocator);
+ this.root = accountingAllocator.root;
+ this.underlying = INNER_ALLOCATOR.allocate(size);
+ this.owningLedger = associate(accountingAllocator);
+ this.size = underlying.capacity();
+ }
+
+ /**
+ * Associate the existing underlying buffer with a new allocator.
+ *
+ * @param allocator
+ * The target allocator to associate this buffer with.
+ * @return The Ledger (new or existing) that associates the underlying
buffer to this new ledger.
+ */
+ public BufferLedger associate(BaseAllocator allocator) {
+ if (root != allocator.root) {
+ throw new IllegalStateException(
+ "A buffer can only be associated between two allocators that
share the same root.");
+ }
+
+ final long allocatorId = allocator.getId();
+ try (AutoCloseableLock read = readLock.open()) {
+
+ final BufferLedger ledger = map.get(allocatorId);
+ if (ledger != null) {
+ return ledger;
+ }
+
+ }
+ try (AutoCloseableLock write = writeLock.open()) {
+ final BufferLedger ledger = new BufferLedger(allocator, new
ReleaseListener(allocatorId));
+ map.put(allocatorId, ledger);
+ allocator.associateLedger(ledger);
+ return ledger;
+ }
+ }
+
+
+ /**
+ * The way that a particular BufferLedger communicates back to the
AllocatorManager that it now longer needs to hold a
+ * reference to particular piece of memory.
+ */
+ private class ReleaseListener {
+
+ private final long allocatorId;
+
+ public ReleaseListener(long allocatorId) {
+ this.allocatorId = allocatorId;
+ }
+
+ public void release() {
+ try (AutoCloseableLock write = writeLock.open()) {
+ final BufferLedger oldLedger = map.remove(allocatorId);
+ oldLedger.allocator.dissociateLedger(oldLedger);
+
+ if (oldLedger == owningLedger) {
+ if (map.isEmpty()) {
+ // no one else owns, lets release.
+ oldLedger.allocator.releaseBytes(size);
+ underlying.release();
+ } else {
+ // we need to change the owning allocator. we've been removed
so we'll get whatever is top of list
+ BufferLedger newLedger = map.iterator().next().value;
+
+ // we'll forcefully transfer the ownership and not worry about
whether we exceeded the limit
+ // since this consumer can do anything with this.
+ oldLedger.transferBalance(newLedger);
+ owningLedger = newLedger;
+ }
+ }
+
+
+ }
+ }
+ }
+
+ /**
+ * Simple wrapper class that allows Locks to be released via an
try-with-resources block.
+ */
+ private class AutoCloseableLock implements AutoCloseable {
+
+ private final Lock lock;
+
+ public AutoCloseableLock(Lock lock) {
+ this.lock = lock;
+ }
+
+ public AutoCloseableLock open() {
+ lock.lock();
+ return this;
+ }
+
+ @Override
+ public void close() {
+ lock.unlock();
+ }
+
+ }
+
+ /**
+ * The reference manager that binds an allocator manager to a particular
BaseAllocator. Also responsible for creating
+ * a set of DrillBufs that share a common fate and set of reference
counts.
+ *
+ * As with AllocatorManager, the only reason this is public is due to
DrillBuf being in io.netty.buffer package.
+ */
+ public class BufferLedger {
+ private final long id = LEDGER_ID_GENERATOR.incrementAndGet(); //
unique ID assigned to each ledger
+ private final AtomicInteger bufRefCnt = new AtomicInteger(0); // start
at zero so we can manage request for retain
+ //
correctly
+ private final BaseAllocator allocator;
+ private final ReleaseListener listener;
+ private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new
HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
+ "BufferLedger[%d]", 1)
+ : null;
+
+ private BufferLedger(BaseAllocator allocator, ReleaseListener
listener) {
+ this.allocator = allocator;
+ this.listener = listener;
+ }
+
+ /**
+ * Transfer any balance the current ledger has to the target ledger.
In the case that the current ledger holds no
+ * memory, no transfer is made to the new ledger.
+ *
+ * @param target
+ * The ledger to transfer ownership account to.
+ * @return Whether transfer fit within target ledgers limits.
+ */
+ public boolean transferBalance(BufferLedger target) {
+ Preconditions.checkNotNull(target);
+ Preconditions.checkArgument(allocator.root == target.allocator.root,
+ "You can only transfer between two allocators that share the
same root.");
+
+ // since two balance transfers out from the allocator manager could
cause incorrect accounting, we need to ensure
+ // that this won't happen by synchronizing on the allocator manager
instance.
+ synchronized (AllocatorManager.this) {
+ if (this != owningLedger || target == this) {
+ return true;
+ }
+
+ if (BaseAllocator.DEBUG) {
+ this.historicalLog.recordEvent("transferBalance(%s)",
target.allocator.name);
+ target.historicalLog.recordEvent("incoming(from %s)",
owningLedger.allocator.name);
+ }
+
+ boolean overlimit = target.allocator.forceAllocate(size);
+ allocator.releaseBytes(size);
+ owningLedger = target;
+ return overlimit;
+ }
+
+ }
+
+ /**
+ * Print the current ledger state to a the provided StringBuilder.
+ *
+ * @param sb
+ * The StringBuilder to populate.
+ * @param indent
+ * The level of indentation to position the data.
+ * @param verbosity
+ * The level of verbosity to print.
+ */
+ public void print(StringBuilder sb, int indent, Verbosity verbosity) {
+ indent(sb, indent)
+ .append("ledger (allocator: ")
+ .append(allocator.name)
+ .append("), isOwning: ")
+ .append(owningLedger == this)
+ .append(", size: ")
+ .append(size)
+ .append(", references: ")
+ .append(bufRefCnt.get())
+ .append('\n');
+
+ if (BaseAllocator.DEBUG) {
+ // This doesn't seem as useful as the individual buffer logs
below. Removing from default presentation.
+ // if (verbosity.includeHistoricalLog) {
+ // historicalLog.buildHistory(sb, indent + 2,
verbosity.includeStackTraces);
+ // }
+ synchronized (buffers) {
+ indent(sb, indent + 1).append("BufferLedger[" + id + "] holds
").append(buffers.size())
+ .append(" buffers. \n");
+ for (DrillBuf buf : buffers.keySet()) {
+ buf.print(sb, indent + 2, verbosity);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Release this ledger. This means that all reference counts
associated with this ledger are no longer used. This
+ * will inform the AllocatorManager to make a decision about how to
manage any memory owned by this particular
+ * BufferLedger
+ */
+ public void release() {
+ listener.release();
+ }
+
+ /**
+ * Returns the ledger associated with a particular BufferAllocator. If
the BufferAllocator doesn't currently have a
+ * ledger associated with this AllocatorManager, a new one is created.
This is placed on BufferLedger rather than
+ * AllocatorManager direclty because DrillBufs don't have access to
AllocatorManager and they are the ones
+ * responsible for exposing the ability to associate mutliple
allocators with a particular piece of underlying
+ * memory.
+ *
+ * @param allocator
+ * @return
+ */
+ public BufferLedger getLedgerForAllocator(BufferAllocator allocator) {
+ return associate((BaseAllocator) allocator);
+ }
+
+ /**
+ * Create a new DrillBuf associated with this AllocatorManager and
memory. Does not impact reference count.
+ * Typically used for slicing.
+ * @param offset
+ * The offset in bytes to start this new DrillBuf.
+ * @param length
+ * The length in bytes that this DrillBuf will provide access
to.
+ * @return A new DrillBuf that shares references with all DrillBufs
associated with this BufferLedger
+ */
+ public DrillBuf newDrillBuf(int offset, int length) {
+ return newDrillBuf(offset, length, null, false);
+ }
+
+ /**
+ * Create a new DrillBuf associated with this AllocatorManager and
memory.
+ * @param offset
+ * The offset in bytes to start this new DrillBuf.
+ * @param length
+ * The length in bytes that this DrillBuf will provide access
to.
+ * @param manager
+ * An optional BufferManager argument that can be used to
manage expansion of this DrillBuf
+ * @param retain
+ * Whether or not the newly created buffer should get an
additional reference count added to it.
+ * @return A new DrillBuf that shares references with all DrillBufs
associated with this BufferLedger
+ * @return
--- End diff --
extra `@return`
> Incorporate remaining patches from DRILL-1942 Allocator refactor
> ----------------------------------------------------------------
>
> Key: DRILL-4134
> URL: https://issues.apache.org/jira/browse/DRILL-4134
> Project: Apache Drill
> Issue Type: Sub-task
> Components: Execution - Flow
> Reporter: Jacques Nadeau
> Assignee: Jacques Nadeau
> Fix For: 1.4.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)