[
https://issues.apache.org/jira/browse/DRILL-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15038653#comment-15038653
]
ASF GitHub Bot commented on DRILL-4134:
---------------------------------------
Github user adeneche commented on a diff in the pull request:
https://github.com/apache/drill/pull/283#discussion_r46617075
--- Diff:
exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocatorManager.java
---
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import static org.apache.drill.exec.memory.BaseAllocator.indent;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.common.HistoricalLog;
+import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ops.BufferManager;
+
+import com.carrotsearch.hppc.LongObjectOpenHashMap;
+import com.google.common.base.Preconditions;
+
+/**
+ * Manages the relationship between one or more allocators and a
particular UDLE. Ensures that one allocator owns the
+ * memory that multiple allocators may be referencing. Manages a
BufferLedger between each of its associated allocators.
+ * This class is also responsible for managing when memory is allocated
and returned to the Netty-based
+ * PooledByteBufAllocatorL.
+ *
+ * The only reason that this isn't package private is we're forced to put
DrillBuf in Netty's package which needs access
+ * to these objects or methods.
+ *
+ * Threading: AllocatorManager manages thread-safety internally.
Operations within the context of a single BufferLedger
+ * are lockless in nature and can be leveraged by multiple threads.
Operations that cross the context of two ledgers
+ * will acquire a lock on the AllocatorManager instance. Important note,
there is one AllocatorManager per
+ * UnsafeDirectLittleEndian buffer allocation. As such, there will be
thousands of these in a typical query. The
+ * contention of acquiring a lock on AllocatorManager should be very low.
+ *
+ */
+public class AllocatorManager {
+ // private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(AllocatorManager.class);
+
+ private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0);
+ static final PooledByteBufAllocatorL INNER_ALLOCATOR = new
PooledByteBufAllocatorL(DrillMetrics.getInstance());
+
+ private final RootAllocator root;
+ private volatile BufferLedger owningLedger;
+ private final int size;
+ private final UnsafeDirectLittleEndian underlying;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final LongObjectOpenHashMap<BufferLedger> map = new
LongObjectOpenHashMap<>();
+ private final AutoCloseableLock readLock = new
AutoCloseableLock(lock.readLock());
+ private final AutoCloseableLock writeLock = new
AutoCloseableLock(lock.writeLock());
+ private final IdentityHashMap<DrillBuf, Object> buffers =
+ BaseAllocator.DEBUG ? new IdentityHashMap<DrillBuf, Object>() : null;
+
+ AllocatorManager(BaseAllocator accountingAllocator, int size) {
+ Preconditions.checkNotNull(accountingAllocator);
+ this.root = accountingAllocator.root;
+ this.underlying = INNER_ALLOCATOR.allocate(size);
+ this.owningLedger = associate(accountingAllocator);
+ this.size = underlying.capacity();
+ }
+
+ /**
+ * Associate the existing underlying buffer with a new allocator.
+ *
+ * @param allocator
+ * The target allocator to associate this buffer with.
+ * @return The Ledger (new or existing) that associates the underlying
buffer to this new allocator.
+ */
+ public BufferLedger associate(BaseAllocator allocator) {
+ if (root != allocator.root) {
+ throw new IllegalStateException(
+ "A buffer can only be associated between two allocators that
share the same root.");
+ }
+
+ final long allocatorId = allocator.getId();
+ try (AutoCloseableLock read = readLock.open()) {
+
+ final BufferLedger ledger = map.get(allocatorId);
+ if (ledger != null) {
+ return ledger;
+ }
+
+ }
+ try (AutoCloseableLock write = writeLock.open()) {
+ final BufferLedger ledger = new BufferLedger(allocator, new
ReleaseListener(allocatorId));
+ map.put(allocatorId, ledger);
+ allocator.associateLedger(ledger);
+ return ledger;
+ }
+ }
+
+
+ /**
+ * The way that a particular BufferLedger communicates back to the
AllocatorManager that it no longer needs to hold a
+ * reference to a particular piece of memory.
+ */
+ private class ReleaseListener {
+
+ private final long allocatorId;
+
+ public ReleaseListener(long allocatorId) {
+ this.allocatorId = allocatorId;
+ }
+
+ public void release() {
+ try (AutoCloseableLock write = writeLock.open()) {
+ final BufferLedger oldLedger = map.remove(allocatorId);
+ oldLedger.allocator.dissociateLedger(oldLedger);
+
+ if (oldLedger == owningLedger) {
+ if (map.isEmpty()) {
+ // no one else owns, lets release.
+ oldLedger.allocator.releaseBytes(size);
+ underlying.release();
+ } else {
+ // we need to change the owning allocator. we've been removed
so we'll get whatever is top of list
+ BufferLedger newLedger = map.iterator().next().value;
+
+ // we'll forcefully transfer the ownership and not worry about
whether we exceeded the limit
+ // since this consumer can do anything with this.
+ oldLedger.transferBalance(newLedger);
+ owningLedger = newLedger;
--- End diff --
`BufferLedger.transferBalance()` already changes `owningLedger` to
`newLedger`, so this assignment is redundant.
> Incorporate remaining patches from DRILL-1942 Allocator refactor
> ----------------------------------------------------------------
>
> Key: DRILL-4134
> URL: https://issues.apache.org/jira/browse/DRILL-4134
> Project: Apache Drill
> Issue Type: Sub-task
> Components: Execution - Flow
> Reporter: Jacques Nadeau
> Assignee: Jacques Nadeau
> Fix For: 1.4.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)