Repository: cassandra Updated Branches: refs/heads/trunk a562394ce -> f31f68957
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java deleted file mode 100644 index 4396caf..0000000 --- a/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.utils.memory; - -import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.concurrent.OpOrder; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The SlabAllocator is a bump-the-pointer allocator that allocates - * large (2MB by default) regions and then doles them out to threads that request - * slices into the array. 
- * <p/> - * The purpose of this class is to combat heap fragmentation in long lived - * objects: by ensuring that all allocations with similar lifetimes - * only to large regions of contiguous memory, we ensure that large blocks - * get freed up at the same time. - * <p/> - * Otherwise, variable length byte arrays allocated end up - * interleaved throughout the heap, and the old generation gets progressively - * more fragmented until a stop-the-world compacting collection occurs. - */ -public class HeapSlabAllocator extends PoolAllocator -{ - private static final Logger logger = LoggerFactory.getLogger(HeapSlabAllocator.class); - - private final static int REGION_SIZE = 1024 * 1024; - private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region - - // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more - private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>(); - - private final AtomicReference<Region> currentRegion = new AtomicReference<Region>(); - private final AtomicInteger regionCount = new AtomicInteger(0); - private AtomicLong unslabbed = new AtomicLong(0); - - HeapSlabAllocator(Pool pool) - { - super(pool); - } - - public ByteBuffer allocate(int size) - { - return allocate(size, null); - } - - public ByteBuffer allocate(int size, OpOrder.Group opGroup) - { - assert size >= 0; - if (size == 0) - return ByteBufferUtil.EMPTY_BYTE_BUFFER; - - markAllocated(size, opGroup); - // satisfy large allocations directly from JVM since they don't cause fragmentation - // as badly, and fill up our regions quickly - if (size > MAX_CLONED_SIZE) - { - unslabbed.addAndGet(size); - return ByteBuffer.allocate(size); - } - - while (true) - { - Region region = getRegion(); - - // Try to allocate from this region - ByteBuffer cloned = region.allocate(size); - if (cloned != null) - return cloned; - - // not enough space! 
- currentRegion.compareAndSet(region, null); - } - } - - public void free(ByteBuffer name) - { - // have to assume we cannot free the memory here, and just reclaim it all when we flush - } - - /** - * Get the current region, or, if there is no current region, allocate a new one - */ - private Region getRegion() - { - while (true) - { - // Try to get the region - Region region = currentRegion.get(); - if (region != null) - return region; - - // No current region, so we want to allocate one. We race - // against other allocators to CAS in a Region, and if we fail we stash the region for re-use - region = RACE_ALLOCATED.poll(); - if (region == null) - region = new Region(REGION_SIZE); - if (currentRegion.compareAndSet(null, region)) - { - regionCount.incrementAndGet(); - logger.trace("{} regions now allocated in {}", regionCount, this); - return region; - } - - // someone else won race - that's fine, we'll try to grab theirs - // in the next iteration of the loop. - RACE_ALLOCATED.add(region); - } - } - - /** - * A region of memory out of which allocations are sliced. - * - * This serves two purposes: - * - to provide a step between initialization and allocation, so that racing to CAS a - * new region in is harmless - * - encapsulates the allocation offset - */ - private static class Region - { - /** - * Actual underlying data - */ - private ByteBuffer data; - - /** - * Offset for the next allocation, or the sentinel value -1 - * which implies that the region is still uninitialized. - */ - private AtomicInteger nextFreeOffset = new AtomicInteger(0); - - /** - * Total number of allocations satisfied from this buffer - */ - private AtomicInteger allocCount = new AtomicInteger(); - - /** - * Create an uninitialized region. Note that memory is not allocated yet, so - * this is cheap. - * - * @param size in bytes - */ - private Region(int size) - { - data = ByteBuffer.allocate(size); - } - - /** - * Try to allocate <code>size</code> bytes from the region. 
- * - * @return the successful allocation, or null to indicate not-enough-space - */ - public ByteBuffer allocate(int size) - { - while (true) - { - int oldOffset = nextFreeOffset.get(); - - if (oldOffset + size > data.capacity()) // capacity == remaining - return null; - - // Try to atomically claim this region - if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) - { - // we got the alloc - allocCount.incrementAndGet(); - return (ByteBuffer) data.duplicate().position(oldOffset).limit(oldOffset + size); - } - // we raced and lost alloc, try again - } - } - - @Override - public String toString() - { - return "Region@" + System.identityHashCode(this) + - " allocs=" + allocCount.get() + "waste=" + - (data.capacity() - nextFreeOffset.get()); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java b/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java deleted file mode 100644 index 0fceeef..0000000 --- a/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cassandra.utils.memory; - -import org.apache.cassandra.utils.concurrent.OpOrder; - -public class HeapSlabPool extends Pool -{ - public HeapSlabPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner) - { - super(maxOnHeapMemory, cleanupThreshold, cleaner); - } - - public HeapSlabAllocator newAllocator(OpOrder writes) - { - return new HeapSlabAllocator(this); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/Pool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/Pool.java b/src/java/org/apache/cassandra/utils/memory/Pool.java index e22b636..aa5e05c 100644 --- a/src/java/org/apache/cassandra/utils/memory/Pool.java +++ b/src/java/org/apache/cassandra/utils/memory/Pool.java @@ -18,140 +18,182 @@ */ package org.apache.cassandra.utils.memory; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.concurrent.WaitQueue; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.AtomicLong; +import org.apache.cassandra.utils.concurrent.WaitQueue; /** * Represents an amount of memory used for a given purpose, that can be allocated to specific tasks through - * child AbstractAllocator objects. AbstractAllocator and MemoryTracker correspond approximately to PoolAllocator and Pool, - * respectively, with the MemoryTracker bookkeeping the total shared use of resources, and the AbstractAllocator the amount - * checked out and in use by a specific PoolAllocator. - * - * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners, - * and acquire() makes shared resources unavailable but still recorded. 
An Owner must always acquire resources, - * but only needs to allocate if there are none already available. This distinction is not always meaningful. + * child PoolAllocator objects. */ public abstract class Pool { - // total memory/resource permitted to allocate - public final long limit; + final PoolCleanerThread<?> cleaner; - // ratio of used to spare (both excluding 'reclaiming') at which to trigger a clean - public final float cleanThreshold; - - // total bytes allocated and reclaiming - private AtomicLong allocated = new AtomicLong(); - private AtomicLong reclaiming = new AtomicLong(); + // the total memory used by this pool + public final SubPool onHeap; + public final SubPool offHeap; final WaitQueue hasRoom = new WaitQueue(); - // a cache of the calculation determining at what allocation threshold we should next clean, and the cleaner we trigger - private volatile long nextClean; - private final PoolCleanerThread<?> cleanerThread; - - public Pool(long limit, float cleanThreshold, Runnable cleaner) + Pool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner) { - this.limit = limit; - this.cleanThreshold = cleanThreshold; - updateNextClean(); - cleanerThread = cleaner == null ? null : new PoolCleanerThread<>(this, cleaner); - if (cleanerThread != null) - cleanerThread.start(); + this.onHeap = getSubPool(maxOnHeapMemory, cleanThreshold); + this.offHeap = getSubPool(maxOffHeapMemory, cleanThreshold); + this.cleaner = getCleaner(cleaner); + if (this.cleaner != null) + this.cleaner.start(); } - /** Methods for tracking and triggering a clean **/ - - boolean needsCleaning() + SubPool getSubPool(long limit, float cleanThreshold) { - return used() >= nextClean && updateNextClean() && cleanerThread != null; + return new SubPool(limit, cleanThreshold); } - void maybeClean() + PoolCleanerThread<?> getCleaner(Runnable cleaner) { - if (needsCleaning()) - cleanerThread.trigger(); + return cleaner == null ? 
null : new PoolCleanerThread<>(this, cleaner); } - private boolean updateNextClean() + public abstract boolean needToCopyOnHeap(); + public abstract PoolAllocator newAllocator(); + + /** + * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners, + * and acquire() makes shared resources unavailable but still recorded. An Owner must always acquire resources, + * but only needs to allocate if there are none already available. This distinction is not always meaningful. + */ + public class SubPool { - long reclaiming = this.reclaiming.get(); - return used() >= (nextClean = reclaiming - + (long) (this.limit * cleanThreshold)); - } - /** Methods to allocate space **/ + // total memory/resource permitted to allocate + public final long limit; - boolean tryAllocate(int size) - { - while (true) + // ratio of used to spare (both excluding 'reclaiming') at which to trigger a clean + public final float cleanThreshold; + + // total bytes allocated and reclaiming + volatile long allocated; + volatile long reclaiming; + + // a cache of the calculation determining at what allocation threshold we should next clean + volatile long nextClean; + + public SubPool(long limit, float cleanThreshold) { - long cur; - if ((cur = allocated.get()) + size > limit) - return false; - if (allocated.compareAndSet(cur, cur + size)) + this.limit = limit; + this.cleanThreshold = cleanThreshold; + } + + /** Methods for tracking and triggering a clean **/ + + boolean needsCleaning() + { + // use strictly-greater-than so we don't clean when limit is 0 + return used() > nextClean && updateNextClean(); + } + + void maybeClean() + { + if (needsCleaning() && cleaner != null) + cleaner.trigger(); + } + + private boolean updateNextClean() + { + while (true) { - maybeClean(); - return true; + long current = nextClean; + long reclaiming = this.reclaiming; + long next = reclaiming + (long) (this.limit * cleanThreshold); + if (current == next || 
nextCleanUpdater.compareAndSet(this, current, next)) + return used() > next; } } - } - /** - * apply the size adjustment to allocated, bypassing any limits or constraints. If this reduces the - * allocated total, we will signal waiters - */ - void adjustAllocated(long size) - { - if (size == 0) - return; - while (true) + /** Methods to allocate space **/ + + boolean tryAllocate(long size) { - long cur = allocated.get(); - if (allocated.compareAndSet(cur, cur + size)) + while (true) { - if (size > 0) - { - maybeClean(); - } + long cur; + if ((cur = allocated) + size > limit) + return false; + if (allocatedUpdater.compareAndSet(this, cur, cur + size)) + return true; + } + } + + /** + * apply the size adjustment to allocated, bypassing any limits or constraints. If this reduces the + * allocated total, we will signal waiters + */ + void adjustAllocated(long size) + { + if (size == 0) return; + while (true) + { + long cur = allocated; + if (allocatedUpdater.compareAndSet(this, cur, cur + size)) + return; } } - } - void release(long size) - { - adjustAllocated(-size); - hasRoom.signalAll(); - } + // 'acquires' an amount of memory, and maybe also marks it allocated. This method is meant to be overridden + // by implementations with a separate concept of acquired/allocated. As this method stands, an acquire + // without an allocate is a no-op (acquisition is achieved through allocation), however a release (where size < 0) + // is always processed and accounted for in allocated. 
+ void adjustAcquired(long size, boolean alsoAllocated) + { + if (size > 0 || alsoAllocated) + { + if (alsoAllocated) + adjustAllocated(size); + maybeClean(); + } + else if (size < 0) + { + adjustAllocated(size); + hasRoom.signalAll(); + } + } - // space reclaimed should be released prior to calling this, to avoid triggering unnecessary cleans - void adjustReclaiming(long reclaiming) - { - if (reclaiming == 0) - return; - this.reclaiming.addAndGet(reclaiming); - if (reclaiming < 0 && updateNextClean() && cleanerThread != null) - cleanerThread.trigger(); - } + // space reclaimed should be released prior to calling this, to avoid triggering unnecessary cleans + void adjustReclaiming(long reclaiming) + { + if (reclaiming == 0) + return; + reclaimingUpdater.addAndGet(this, reclaiming); + if (reclaiming < 0 && updateNextClean() && cleaner != null) + cleaner.trigger(); + } - public long allocated() - { - return allocated.get(); - } + public long allocated() + { + return allocated; + } - public long used() - { - return allocated.get(); - } + public long used() + { + return allocated; + } - public long reclaiming() - { - return reclaiming.get(); + public PoolAllocator.SubAllocator newAllocator() + { + return new PoolAllocator.SubAllocator(this); + } + + public WaitQueue hasRoom() + { + return hasRoom; + } } - public abstract PoolAllocator newAllocator(OpOrder writes); -} + private static final AtomicLongFieldUpdater<SubPool> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "reclaiming"); + private static final AtomicLongFieldUpdater<SubPool> allocatedUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "allocated"); + private static final AtomicLongFieldUpdater<SubPool> nextCleanUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "nextClean"); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java ---------------------------------------------------------------------- 
diff --git a/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java index 289d0ac..aa374fe 100644 --- a/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java +++ b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java @@ -18,50 +18,64 @@ package org.apache.cassandra.utils.memory; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicLong; import org.apache.cassandra.utils.concurrent.OpOrder; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.concurrent.WaitQueue; -public abstract class PoolAllocator<P extends Pool> extends AbstractAllocator +public abstract class PoolAllocator extends AbstractAllocator { - public final P pool; + + private final SubAllocator onHeap; + private final SubAllocator offHeap; volatile LifeCycle state = LifeCycle.LIVE; static enum LifeCycle { LIVE, DISCARDING, DISCARDED; - LifeCycle transition(LifeCycle target) + LifeCycle transition(LifeCycle targetState) { - assert target.ordinal() == ordinal() + 1; - return target; + switch (targetState) + { + case DISCARDING: + assert this == LifeCycle.LIVE; + return LifeCycle.DISCARDING; + case DISCARDED: + assert this == LifeCycle.DISCARDING; + return LifeCycle.DISCARDED; + } + throw new IllegalStateException(); } } - // the amount of memory/resource owned by this object - private AtomicLong owns = new AtomicLong(); - // the amount of memory we are reporting to collect; this may be inaccurate, but is close - // and is used only to ensure that once we have reclaimed we mark the tracker with the same amount - private AtomicLong reclaiming = new AtomicLong(); + PoolAllocator(SubAllocator onHeap, SubAllocator offHeap) + { + this.onHeap = onHeap; + this.offHeap = offHeap; + } + + public SubAllocator onHeap() + { + return onHeap; + } - PoolAllocator(P pool) + public SubAllocator offHeap() { - this.pool = pool; + 
return offHeap; } /** - * Mark this allocator as reclaiming; this will mark the memory it owns as reclaiming, so remove it from - * any calculation deciding if further cleaning/reclamation is necessary. + * Mark this allocator reclaiming; this will permit any outstanding allocations to temporarily + * overshoot the maximum memory limit so that flushing can begin immediately */ public void setDiscarding() { state = state.transition(LifeCycle.DISCARDING); // mark the memory owned by this allocator as reclaiming - long prev = reclaiming.get(); - long cur = owns.get(); - reclaiming.set(cur); - pool.adjustReclaiming(cur - prev); + onHeap.markAllReclaiming(); + offHeap.markAllReclaiming(); } /** @@ -72,60 +86,8 @@ public abstract class PoolAllocator<P extends Pool> extends AbstractAllocator { state = state.transition(LifeCycle.DISCARDED); // release any memory owned by this allocator; automatically signals waiters - pool.release(owns.getAndSet(0)); - pool.adjustReclaiming(-reclaiming.get()); - } - - public abstract ByteBuffer allocate(int size, OpOrder.Group opGroup); - - /** Mark the BB as unused, permitting it to be reclaimed */ - public abstract void free(ByteBuffer name); - - // mark ourselves as owning memory from the tracker. 
meant to be called by subclass - // allocate method that actually allocates and returns a ByteBuffer - protected void markAllocated(int size, OpOrder.Group opGroup) - { - while (true) - { - if (pool.tryAllocate(size)) - { - acquired(size); - return; - } - WaitQueue.Signal signal = opGroup.isBlockingSignal(pool.hasRoom.register()); - boolean allocated = pool.tryAllocate(size); - if (allocated || opGroup.isBlocking()) - { - signal.cancel(); - if (allocated) // if we allocated, take ownership - acquired(size); - else // otherwise we're blocking so we're permitted to overshoot our constraints, to just allocate without blocking - allocated(size); - return; - } - else - signal.awaitUninterruptibly(); - } - } - - // retroactively mark (by-passes any constraints) an amount allocated in the tracker, and owned by us. - private void allocated(int size) - { - pool.adjustAllocated(size); - owns.addAndGet(size); - } - - // retroactively mark (by-passes any constraints) an amount owned by us - private void acquired(int size) - { - owns.addAndGet(size); - } - - // release an amount of memory from our ownership, and deallocate it in the tracker - void release(int size) - { - pool.release(size); - owns.addAndGet(-size); + onHeap.releaseAll(); + offHeap.releaseAll(); } public boolean isLive() @@ -133,6 +95,9 @@ public abstract class PoolAllocator<P extends Pool> extends AbstractAllocator return state == LifeCycle.LIVE; } + public abstract ByteBuffer allocate(int size, OpOrder.Group opGroup); + public abstract void free(ByteBuffer name); + /** * Allocate a slice of the given length. 
*/ @@ -154,21 +119,107 @@ public abstract class PoolAllocator<P extends Pool> extends AbstractAllocator return new ContextAllocator(opGroup, this); } - @Override - public long owns() + /** Tracks the memory owned, and marked as reclaiming, by one allocator against its parent Pool.SubPool */ + public static final class SubAllocator { - return owns.get(); - } + // the tracker we are owning memory from + private final Pool.SubPool parent; - @Override - public float ownershipRatio() - { - return owns.get() / (float) pool.limit; - } + // the amount of memory/resource owned by this object + private volatile long owns; + // the amount of memory we are reporting to collect; this may be inaccurate, but is close + // and is used only to ensure that once we have reclaimed we mark the tracker with the same amount + private volatile long reclaiming; - @Override - public long reclaiming() - { - return reclaiming.get(); + SubAllocator(Pool.SubPool parent) + { + this.parent = parent; + } + + // should only be called once we know we will never allocate to the object again. + // currently no corroboration/enforcement of this is performed.
+ void releaseAll() + { + parent.adjustAcquired(-ownsUpdater.getAndSet(this, 0), false); + parent.adjustReclaiming(-reclaimingUpdater.getAndSet(this, 0)); + } + + // allocate memory in the tracker, and mark ourselves as owning it + public void allocate(long size, OpOrder.Group opGroup) + { + while (true) + { + if (parent.tryAllocate(size)) + { + acquired(size); + return; + } + WaitQueue.Signal signal = opGroup.isBlockingSignal(parent.hasRoom().register()); + boolean allocated = parent.tryAllocate(size); + if (allocated || opGroup.isBlocking()) + { + signal.cancel(); + if (allocated) // if we allocated, take ownership + acquired(size); + else // otherwise we're blocking so we're permitted to overshoot our constraints, to just allocate without blocking + allocated(size); + return; + } + else + signal.awaitUninterruptibly(); + } + } + + // retroactively mark an amount allocated and acquired in the tracker, and owned by us + void allocated(long size) + { + parent.adjustAcquired(size, true); + ownsUpdater.addAndGet(this, size); + } + + // retroactively mark an amount acquired in the tracker, and owned by us + void acquired(long size) + { + parent.adjustAcquired(size, false); + ownsUpdater.addAndGet(this, size); + } + + void release(long size) + { + parent.adjustAcquired(-size, false); + ownsUpdater.addAndGet(this, -size); + } + + // mark everything we currently own as reclaiming, both here and in our parent + void markAllReclaiming() + { + while (true) + { + long cur = owns; + long prev = reclaiming; + if (reclaimingUpdater.compareAndSet(this, prev, cur)) + { + parent.adjustReclaiming(cur - prev); + return; + } + } + } + + public long owns() + { + return owns; + } + + public float ownershipRatio() + { + float r = owns / (float) parent.limit; + if (Float.isNaN(r)) + return 0; + return r; + } + + private static final AtomicLongFieldUpdater<SubAllocator> ownsUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "owns"); + private static final
AtomicLongFieldUpdater<SubAllocator> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "reclaiming"); } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java index 24f71d2..68b0c20 100644 --- a/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java +++ b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java @@ -21,7 +21,7 @@ package org.apache.cassandra.utils.memory; import org.apache.cassandra.utils.concurrent.WaitQueue; /** - * A thread that reclaims memor from a Pool on demand. The actual reclaiming work is delegated to the + * A thread that reclaims memory from a Pool on demand. The actual reclaiming work is delegated to the * cleaner Runnable, e.g., FlushLargestColumnFamily */ class PoolCleanerThread<P extends Pool> extends Thread @@ -44,7 +44,7 @@ class PoolCleanerThread<P extends Pool> extends Thread boolean needsCleaning() { - return pool.needsCleaning(); + return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning(); } // should ONLY be called when we really think it already needs cleaning http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java new file mode 100644 index 0000000..a90357c --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils.memory; + +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.concurrent.OpOrder; +import sun.nio.ch.DirectBuffer; + +/** + * The SlabAllocator is a bump-the-pointer allocator that allocates + * large (1MB) regions and then doles them out to threads that request + * slices into the array. + * <p/> + * The purpose of this class is to combat heap fragmentation in long lived + * objects: by ensuring that all allocations with similar lifetimes + * go only to large regions of contiguous memory, we ensure that large blocks + * get freed up at the same time. + * <p/> + * Otherwise, variable length byte arrays allocated end up + * interleaved throughout the heap, and the old generation gets progressively + * more fragmented until a stop-the-world compacting collection occurs.
+ */
+public class SlabAllocator extends PoolAllocator
+{
+    private static final Logger logger = LoggerFactory.getLogger(SlabAllocator.class);
+
+    private final static int REGION_SIZE = 1024 * 1024;
+    private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
+
+    // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more.
+    // The queue is static, so it is shared by every SlabAllocator in the JVM, on-heap and off-heap alike;
+    // getRegion() therefore checks the kind of buffer before adopting a stashed Region.
+    private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
+
+    private final AtomicReference<Region> currentRegion = new AtomicReference<Region>();
+    private final AtomicInteger regionCount = new AtomicInteger(0);
+
+    // this queue is used to keep references to off-heap allocated regions so that we can free them when we are discarded
+    private final ConcurrentLinkedQueue<Region> offHeapRegions = new ConcurrentLinkedQueue<>();
+    // running total of bytes handed out by the large-allocation path (size > MAX_CLONED_SIZE)
+    private final AtomicLong unslabbedSize = new AtomicLong(0);
+    private final boolean allocateOnHeapOnly;
+
+    /**
+     * @param onHeap             passed through to the PoolAllocator constructor
+     * @param offHeap            passed through to the PoolAllocator constructor
+     * @param allocateOnHeapOnly if true, buffers come from ByteBuffer.allocate; otherwise from
+     *                           ByteBuffer.allocateDirect
+     */
+    SlabAllocator(SubAllocator onHeap, SubAllocator offHeap, boolean allocateOnHeapOnly)
+    {
+        super(onHeap, offHeap);
+        this.allocateOnHeapOnly = allocateOnHeapOnly;
+    }
+
+    /**
+     * Equivalent to {@code allocate(size, null)}.
+     */
+    public ByteBuffer allocate(int size)
+    {
+        return allocate(size, null);
+    }
+
+    /**
+     * Allocate a buffer of exactly <code>size</code> bytes.
+     *
+     * Zero-length requests return the shared empty buffer. Requests larger than MAX_CLONED_SIZE get a
+     * dedicated buffer; everything else is sliced out of the current Region, moving on to a new
+     * Region whenever the current one cannot fit the request.
+     *
+     * @param size    number of bytes wanted; must not be negative (checked by assertion only)
+     * @param opGroup handed unchanged to the sub-allocator; may be null
+     * @return a buffer whose remaining() is <code>size</code>
+     */
+    public ByteBuffer allocate(int size, OpOrder.Group opGroup)
+    {
+        assert size >= 0;
+        if (size == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        // report the request to the matching sub-allocator before handing out any memory
+        // NOTE(review): presumably this is the pool's memory accounting -- confirm in PoolAllocator
+        (allocateOnHeapOnly ? onHeap() : offHeap()).allocate(size, opGroup);
+        // satisfy large allocations directly from JVM since they don't cause fragmentation
+        // as badly, and fill up our regions quickly
+        if (size > MAX_CLONED_SIZE)
+        {
+            unslabbedSize.addAndGet(size);
+            if (allocateOnHeapOnly)
+                return ByteBuffer.allocate(size);
+            // wrap the direct buffer in a Region purely so that setDiscarded() can find and free it
+            Region region = new Region(ByteBuffer.allocateDirect(size));
+            offHeapRegions.add(region);
+            return region.allocate(size);
+        }
+
+        while (true)
+        {
+            Region region = getRegion();
+
+            // Try to allocate from this region
+            ByteBuffer cloned = region.allocate(size);
+            if (cloned != null)
+                return cloned;
+
+            // not enough space! retire the region (unless somebody already did) and go round again
+            currentRegion.compareAndSet(region, null);
+        }
+    }
+
+    public void free(ByteBuffer name)
+    {
+        // have to assume we cannot free the memory here, and just reclaim it all when we flush
+    }
+
+    /**
+     * Runs the cleaner of every direct buffer recorded in offHeapRegions, then calls super.setDiscarded().
+     *
+     * NOTE(review): buffers previously returned by allocate() are views onto these regions, so they
+     * presumably must not be touched after this call -- confirm against callers.
+     */
+    public void setDiscarded()
+    {
+        for (Region region : offHeapRegions)
+            ((DirectBuffer) region.data).cleaner().clean();
+        super.setDiscarded();
+    }
+
+    /**
+     * Get the current region, or, if there is no current region, allocate a new one
+     */
+    private Region getRegion()
+    {
+        while (true)
+        {
+            // Try to get the region
+            Region region = currentRegion.get();
+            if (region != null)
+                return region;
+
+            // No current region, so we want to allocate one. We race
+            // against other allocators to CAS in a Region, and if we fail we stash the region for re-use
+            region = RACE_ALLOCATED.poll();
+            if (region != null && region.data.isDirect() == allocateOnHeapOnly)
+            {
+                // The stash is shared JVM-wide, so this Region may have been created by an allocator of
+                // the other kind. Adopting it would hand out the wrong sort of memory: a direct buffer
+                // would never be tracked or freed, and a heap buffer would fail the DirectBuffer cast
+                // in setDiscarded(). Give it back and allocate our own instead.
+                RACE_ALLOCATED.add(region);
+                region = null;
+            }
+            if (region == null)
+                region = new Region(allocateOnHeapOnly ? ByteBuffer.allocate(REGION_SIZE) : ByteBuffer.allocateDirect(REGION_SIZE));
+            if (currentRegion.compareAndSet(null, region))
+            {
+                if (!allocateOnHeapOnly)
+                    offHeapRegions.add(region);
+                regionCount.incrementAndGet();
+                logger.trace("{} regions now allocated in {}", regionCount, this);
+                return region;
+            }
+
+            // someone else won race - that's fine, we'll try to grab theirs
+            // in the next iteration of the loop.
+            RACE_ALLOCATED.add(region);
+        }
+    }
+
+    /**
+     * A region of memory out of which allocations are sliced.
+     *
+     * This serves two purposes:
+     *  - to provide a step between initialization and allocation, so that racing to CAS a
+     *    new region in is harmless
+     *  - encapsulates the allocation offset
+     */
+    private static class Region
+    {
+        /**
+         * Actual underlying data
+         */
+        private final ByteBuffer data;
+
+        /**
+         * Offset for the next allocation. Starts at 0 and only ever grows.
+         */
+        private final AtomicInteger nextFreeOffset = new AtomicInteger(0);
+
+        /**
+         * Total number of allocations satisfied from this buffer
+         */
+        private final AtomicInteger allocCount = new AtomicInteger();
+
+        /**
+         * Create a region over a buffer the caller has already allocated; nothing has been
+         * handed out from it yet.
+         *
+         * @param buffer bytes
+         */
+        private Region(ByteBuffer buffer)
+        {
+            data = buffer;
+        }
+
+        /**
+         * Try to allocate <code>size</code> bytes from the region.
+         *
+         * @return the successful allocation, or null to indicate not-enough-space
+         */
+        public ByteBuffer allocate(int size)
+        {
+            while (true)
+            {
+                int oldOffset = nextFreeOffset.get();
+
+                // written as a subtraction so that the check itself cannot overflow
+                if (size > data.capacity() - oldOffset) // capacity == remaining
+                    return null;
+
+                // Try to atomically claim this region
+                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size))
+                {
+                    // we got the alloc
+                    allocCount.incrementAndGet();
+                    // duplicate() gives the caller independent position/limit over the same bytes
+                    return (ByteBuffer) data.duplicate().position(oldOffset).limit(oldOffset + size);
+                }
+                // we raced and lost alloc, try again
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Region@" + System.identityHashCode(this) +
+                   " allocs=" + allocCount.get() + " waste=" +
+                   (data.capacity() - nextFreeOffset.get());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/src/java/org/apache/cassandra/utils/memory/SlabPool.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/utils/memory/SlabPool.java b/src/java/org/apache/cassandra/utils/memory/SlabPool.java new file mode 100644 index 0000000..7276e57 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/SlabPool.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.cassandra.utils.memory;
+
+
+/**
+ * A {@link Pool} whose allocators are {@link SlabAllocator}s.
+ *
+ * The pool is on-heap-only exactly when it was constructed with an off-heap limit of zero.
+ */
+public class SlabPool extends Pool
+{
+
+    /** true when the off-heap limit given to the constructor was 0 */
+    final boolean allocateOnHeap;
+
+    /**
+     * Forwards all four arguments to the {@link Pool} constructor and records whether the
+     * off-heap limit is zero.
+     */
+    public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, Runnable cleaner)
+    {
+        super(maxOnHeapMemory, maxOffHeapMemory, cleanupThreshold, cleaner);
+        boolean noOffHeapBudget = maxOffHeapMemory == 0;
+        this.allocateOnHeap = noOffHeapBudget;
+    }
+
+    /**
+     * @return a new SlabAllocator built from onHeap.newAllocator(), offHeap.newAllocator()
+     *         and this pool's on-heap-only flag
+     */
+    public SlabAllocator newAllocator()
+    {
+        return new SlabAllocator(onHeap.newAllocator(),
+                                 offHeap.newAllocator(),
+                                 allocateOnHeap);
+    }
+
+    /**
+     * @return true unless this pool is on-heap-only
+     */
+    public boolean needToCopyOnHeap()
+    {
+        return !allocateOnHeap;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index a207bc6..37b0b96 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -3,6 +3,7 @@ # Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file. # cluster_name: Test Cluster +memtable_allocation_type: offheap_buffers in_memory_compaction_limit_in_mb: 1 commitlog_sync: batch commitlog_sync_batch_window_in_ms: 1.0 http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/unit/org/apache/cassandra/db/CollationControllerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CollationControllerTest.java b/test/unit/org/apache/cassandra/db/CollationControllerTest.java index b3a9126..fc92aae 100644 --- a/test/unit/org/apache/cassandra/db/CollationControllerTest.java +++ b/test/unit/org/apache/cassandra/db/CollationControllerTest.java @@ -18,7 +18,7 @@ */ package org.apache.cassandra.db; -import static org.junit.Assert.assertEquals; +import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; @@ -26,7 +26,8 @@ import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.filter.QueryFilter; import 
org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.junit.Test; + +import static org.junit.Assert.assertEquals; public class CollationControllerTest extends SchemaLoader { @@ -67,7 +68,7 @@ public class CollationControllerTest extends SchemaLoader // It should only iterate the last flushed sstable, since it probably contains the most recent value for Column1 QueryFilter filter = Util.namesQueryFilter(cfs, dk, "Column1"); CollationController controller = new CollationController(cfs, filter, Integer.MIN_VALUE); - controller.getTopLevelColumns(); + controller.getTopLevelColumns(true); assertEquals(1, controller.getSstablesIterated()); // SliceQueryFilter goes down another path (through collectAllData()) @@ -75,7 +76,7 @@ public class CollationControllerTest extends SchemaLoader // recent than the maxTimestamp of the very first sstable we flushed, we should only read the 2 first sstables. filter = QueryFilter.getIdentityFilter(dk, cfs.name, System.currentTimeMillis()); controller = new CollationController(cfs, filter, Integer.MIN_VALUE); - controller.getTopLevelColumns(); + controller.getTopLevelColumns(true); assertEquals(2, controller.getSstablesIterated()); } @@ -109,10 +110,10 @@ public class CollationControllerTest extends SchemaLoader filter = QueryFilter.getNamesFilter(dk, cfs.name, FBUtilities.singleton(cellName, cfs.getComparator()), queryAt); CollationController controller = new CollationController(cfs, filter, gcBefore); - assert ColumnFamilyStore.removeDeleted(controller.getTopLevelColumns(), gcBefore) == null; + assert ColumnFamilyStore.removeDeleted(controller.getTopLevelColumns(true), gcBefore) == null; filter = QueryFilter.getIdentityFilter(dk, cfs.name, queryAt); controller = new CollationController(cfs, filter, gcBefore); - assert ColumnFamilyStore.removeDeleted(controller.getTopLevelColumns(), gcBefore) == null; + assert ColumnFamilyStore.removeDeleted(controller.getTopLevelColumns(true), gcBefore) == 
null; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java index 8502dd5..e9fc746 100644 --- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java +++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java @@ -19,35 +19,45 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; -import java.util.*; - -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; -import org.junit.Test; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; +import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.IndexType; -import org.apache.cassandra.db.composites.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy; -import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.index.*; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.composites.CellNames; +import org.apache.cassandra.db.composites.Composites; +import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.filter.IDiskAtomFilter; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.SliceQueryFilter; +import org.apache.cassandra.db.index.PerColumnSecondaryIndex; +import 
org.apache.cassandra.db.index.SecondaryIndex; +import org.apache.cassandra.db.index.SecondaryIndexSearcher; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.PoolAllocator; +import static org.apache.cassandra.Util.dk; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.apache.cassandra.Util.dk; - public class RangeTombstoneTest extends SchemaLoader { private static final String KSNAME = "Keyspace1"; @@ -571,7 +581,7 @@ public class RangeTombstoneTest extends SchemaLoader public void forceBlockingFlush(){} @Override - public AbstractAllocator getOnHeapAllocator() + public PoolAllocator getAllocator() { return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/unit/org/apache/cassandra/db/SecondaryIndexCellSizeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexCellSizeTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexCellSizeTest.java index 220e3b9..9e911b4 100644 --- a/test/unit/org/apache/cassandra/db/SecondaryIndexCellSizeTest.java +++ b/test/unit/org/apache/cassandra/db/SecondaryIndexCellSizeTest.java @@ -21,16 +21,17 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; import java.util.Set; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; import org.junit.Test; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.db.composites.*; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.composites.CellNames; import org.apache.cassandra.db.index.PerColumnSecondaryIndex; import 
org.apache.cassandra.db.index.PerRowSecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexSearcher; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.PoolAllocator; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -92,7 +93,7 @@ public class SecondaryIndexCellSizeTest { } - public AbstractAllocator getOnHeapAllocator() + public PoolAllocator getAllocator() { return null; } @@ -172,7 +173,7 @@ public class SecondaryIndexCellSizeTest } @Override - public AbstractAllocator getOnHeapAllocator() + public PoolAllocator getAllocator() { return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/unit/org/apache/cassandra/db/context/CounterContextTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java index 55d5b7c..bc297ab 100644 --- a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java +++ b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java @@ -20,18 +20,25 @@ */ package org.apache.cassandra.db.context; -import static org.junit.Assert.*; - import java.nio.ByteBuffer; import org.junit.Test; +import org.apache.cassandra.Util; import org.apache.cassandra.db.ClockAndCount; import org.apache.cassandra.db.context.CounterContext.Relationship; -import org.apache.cassandra.Util; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.CounterId; +import org.apache.cassandra.utils.memory.AbstractAllocator; +import org.apache.cassandra.utils.memory.Pool; +import org.apache.cassandra.utils.memory.SlabPool; import static org.apache.cassandra.db.context.CounterContext.ContextState; 
+import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; public class CounterContextTest { @@ -44,6 +51,16 @@ public class CounterContextTest private static final int countLength = 8; private static final int stepLength = idLength + clockLength + countLength; + private static final Pool POOL = new SlabPool(Integer.MAX_VALUE, 0, 1f, null); + + /** Allocates 1 byte from a new SlabAllocator and returns it. */ + private AbstractAllocator bumpedSlab() + { + AbstractAllocator allocator = POOL.newAllocator(); + allocator.allocate(1); + return allocator; + } + @Test public void testAllocate() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a3b5dbc/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java index 660a6e0..c57ba05 100644 --- a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java @@ -22,19 +22,23 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Set; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; import org.junit.Before; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Mutation; import 
org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.Util; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.PoolAllocator; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -175,7 +179,7 @@ public class PerRowSecondaryIndexTest extends SchemaLoader } @Override - public AbstractAllocator getOnHeapAllocator() + public PoolAllocator getAllocator() { return null; }
