IGNITE-9360 Remove SnapTreeMap and tests on it. - Fixes #4608. Signed-off-by: Dmitriy Govorukhin <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/35706b94 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/35706b94 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/35706b94 Branch: refs/heads/master Commit: 35706b94e4d714f426eee12cd2c3d9a1e8452dad Parents: 9bfc823 Author: Ilya Kasnacheev <[email protected]> Authored: Tue Sep 4 18:49:06 2018 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Tue Sep 4 18:49:06 2018 +0300 ---------------------------------------------------------------------- .../util/snaptree/CopyOnWriteManager.java | 390 --- .../ignite/internal/util/snaptree/Epoch.java | 131 - .../internal/util/snaptree/EpochNode.java | 433 --- .../internal/util/snaptree/SnapTreeMap.java | 2917 ------------------ .../internal/util/snaptree/package-info.java | 22 - .../testsuites/IgniteUtilSelfTestSuite.java | 2 - .../apache/ignite/util/GridIndexFillTest.java | 259 -- .../loadtests/h2indexing/GridTestEntity.java | 67 - .../loadtests/h2indexing/GridTreeBenchmark.java | 280 -- 9 files changed, 4501 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/35706b94/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/CopyOnWriteManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/CopyOnWriteManager.java b/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/CopyOnWriteManager.java deleted file mode 100644 index 685668a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/CopyOnWriteManager.java +++ /dev/null @@ -1,390 +0,0 @@ -/* - * Copyright (c) 2009 Stanford University, unless otherwise specified. - * All rights reserved. - * - * This software was developed by the Pervasive Parallelism Laboratory of - * Stanford University, California, USA. - * - * Permission to use, copy, modify, and distribute this software in source - * or binary form for any purpose with or without fee is hereby granted, - * provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * 3. Neither the name of Stanford University nor the names of its - * contributors may be used to endorse or promote products derived - * from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR - * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY - * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - */ - -package org.apache.ignite.internal.util.snaptree; - -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; - -/** Manages copy-on-write behavior for a concurrent tree structure. It is - * assumed that the managed structure allows concurrent mutation, but that no - * mutating operations may be active when a copy-on-write snapshot of tree is - * taken. Because it is difficult to update the size of data structure in a - * highly concurrent fashion, the <code>CopyOnWriteManager</code> also manages - * a running total that represents the size of the contained tree structure. - * <p> - * Users should implement the {@link #freezeAndClone(Object)} and - * {@link #cloneFrozen(Object)} methods. - */ -@SuppressWarnings("ALL") -abstract public class CopyOnWriteManager<E> implements Cloneable { - - /** This is basically a stripped-down CountDownLatch. Implementing our own - * reduces the object count by one, and it gives us access to the - * uninterruptable acquireShared. - */ - private class Latch extends AbstractQueuedSynchronizer { - /** */ - private static final long serialVersionUID = 0L; - - Latch(final boolean triggered) { - setState(triggered ? 0 : 1); - } - - public int tryAcquireShared(final int acquires) { - // 1 = success, and followers may also succeed - // -1 = failure - return getState() == 0 ? 1 : -1; - } - - public boolean tryReleaseShared(final int releases) { - // Before, state is either 0 or 1. After, state is always 0. - return compareAndSetState(1, 0); - } - } - - private static final int MUTATE = 1; - private static final int MUTATE_AFTER_FREEZE = 2; - private static final int BULK_READ = 3; - private static final int BULK_READ_AFTER_FREEZE = 4; - - private class COWEpoch extends EpochNode { - /** */ - private static final long serialVersionUID = 0L; - - /** Tripped after this COWEpoch is installed as active. */ - private final Latch _activated; - - /** True iff this is a mutating epoch. */ - final boolean mutationAllowed; - - /** The value used by this epoch. */ - E value; - - /** The computed size of <code>value</code>, as of the beginning of - * this epoch. - */ - int initialSize; - - /** A frozen E equal to <code>value</code>, if not <code>dirty</code>. */ - private volatile E _frozenValue; - - /** True if any mutations have been performed on <code>value</code>. */ - volatile boolean dirty; - - /** The epoch that will follow this one, created on demand. */ - final AtomicReference<COWEpoch> successorRef = new AtomicReference<COWEpoch>(null); - - /** A ticket on the successor, released when this epoch is closed. */ - Epoch.Ticket successorTicket; - - /** True if the successor should freeze and clone this epoch's value. */ - boolean freezeRequested; - - private COWEpoch(final boolean mutationAllowed) { - this._activated = new Latch(false); - this.mutationAllowed = mutationAllowed; - } - - public COWEpoch(final E value, final E frozenValue, final int initialSize) { - this._activated = new Latch(true); // pre-triggered - this.mutationAllowed = true; - this.value = value; - this.initialSize = initialSize; - this._frozenValue = frozenValue; - this.dirty = frozenValue == null; - } - - EpochNode attemptInitialArrive() { - return super.attemptArrive(); - } - - @Override - public EpochNode attemptArrive() { - final EpochNode ticket = super.attemptArrive(); - if (ticket != null && !dirty) { - dirty = true; - _frozenValue = null; - } - return ticket; - } - - private void setFrozenValue(final E v) { - if (!dirty) { - _frozenValue = v; - if (dirty) { - _frozenValue = null; - } - } - } - - E getFrozenValue() { - final E v = _frozenValue; - return dirty ? null : v; - } - - protected void onClosed(final int dataSum) { - assert(dataSum == 0 || dirty); - - final COWEpoch succ = successorRef.get(); - if (freezeRequested) { - succ.value = freezeAndClone(value); - succ.setFrozenValue(value); - } - else { - succ.value = value; - if (dirty) { - succ.dirty = true; - } - else { - succ.setFrozenValue(_frozenValue); - } - } - succ.initialSize = initialSize + dataSum; - - _active = succ; - successorTicket.leave(0); - succ._activated.releaseShared(1); - } - - public void awaitActivated() { - _activated.acquireShared(1); - } - - public COWEpoch getOrCreateSuccessor(final boolean preferredMutation) { - final COWEpoch existing = successorRef.get(); - if (existing != null) { - return existing; - } - - final COWEpoch repl = new COWEpoch(preferredMutation); - if (attemptInstallSuccessor(repl)) { - return repl; - } - - return successorRef.get(); - } - - public boolean attemptInstallSuccessor(final COWEpoch succ) { - final Epoch.Ticket t = succ.attemptInitialArrive(); - if (successorRef.compareAndSet(null, succ)) { - successorTicket = t; - beginClose(); - return true; - } - else { - return false; - } - } - } - - private volatile COWEpoch _active; - - /** Creates a new {@link CopyOnWriteManager} holding - * <code>initialValue</code>, with an assumed size of - * <code>initialSize</code>. - */ - public CopyOnWriteManager(final E initialValue, final int initialSize) { - _active = new COWEpoch(initialValue, null, initialSize); - } - - /** The implementing method must mark <code>value</code> as shared, and - * return a new object to use in its place. Hopefully, the majority of - * the work of the clone can be deferred by copy-on-write. - */ - abstract protected E freezeAndClone(final E value); - - /** Returns a clone of a frozen E. */ - abstract protected E cloneFrozen(E frozenValue); - - public CopyOnWriteManager<E> clone() { - final CopyOnWriteManager<E> copy; - try { - copy = (CopyOnWriteManager<E>) super.clone(); - } - catch (final CloneNotSupportedException xx) { - throw new Error("unexpected", xx); - } - - COWEpoch a = _active; - E f = a.getFrozenValue(); - while (f == null) { - a.freezeRequested = true; - final COWEpoch succ = a.getOrCreateSuccessor(a.mutationAllowed); - succ.awaitActivated(); - if (a.value != succ.value) { - f = a.value; - } - a = succ; - } - - copy.createNewEpoch(f, a); - return copy; - } - - private void createNewEpoch(E f, COWEpoch a) - { - _active = new COWEpoch(cloneFrozen(f), f, a.initialSize); - } - - /** Returns a reference to the tree structure suitable for a read - * operation. The returned structure may be mutated by operations that - * have the permission of this {@link CopyOnWriteManager}, but they will - * not observe changes managed by other instances. - */ - public E read() { - return _active.value; - } - - /** Obtains permission to mutate the copy-on-write value held by this - * instance, perhaps blocking while a concurrent snapshot is being - * performed. {@link Epoch.Ticket#leave} must be called exactly once on - * the object returned from this method, after the mutation has been - * completed. The change in size reflected by the mutation should be - * passed as the parameter to <code>leave</code>. - */ - public Epoch.Ticket beginMutation() { - return begin(true); - } - - public Epoch.Ticket beginQuiescent() { - return begin(false); - } - - private Epoch.Ticket begin(final boolean mutation) { - final COWEpoch active = _active; - if (active.mutationAllowed == mutation) { - final Epoch.Ticket ticket = active.attemptArrive(); - if (ticket != null) { - return ticket; - } - } - return begin(mutation, active); - } - - private Epoch.Ticket begin(final boolean mutation, COWEpoch epoch) { - while (true) { - COWEpoch succ = epoch.successorRef.get(); - if (succ == null) { - final COWEpoch newEpoch = new COWEpoch(mutation); - final Epoch.Ticket newTicket = newEpoch.attemptArrive(); - if (epoch.attemptInstallSuccessor(newEpoch)) { - // can't use the ticket until the new epoch is activated - newEpoch.awaitActivated(); - return newTicket; - } - - // if our CAS failed, somebody else succeeded - succ = epoch.successorRef.get(); - } - - // is the successor created by someone else suitable? - if (succ.mutationAllowed == mutation) { - final Epoch.Ticket ticket = succ.attemptArrive(); - if (ticket != null) { - succ.awaitActivated(); - return ticket; - } - } - - epoch = succ; - } - } - - /** Returns a reference to the tree structure suitable for a mutating - * operation. This method may only be called under the protection of a - * ticket returned from {@link #beginMutation}. - */ - public E mutable() { - return _active.value; - } - - /** Returns a reference to a snapshot of this instance's tree structure - * that may be read, but not written. This is accomplished by suspending - * mutation, replacing the mutable root of this manager with the result of - * <code>freezeAndClone(root, false)</code>, and then returning a - * reference to the old root. Successive calls to this method may return - * the same instance. - */ - public E frozen() { - COWEpoch a = _active; - E f = a.getFrozenValue(); - while (f == null) { - a.freezeRequested = true; - final COWEpoch succ = a.getOrCreateSuccessor(a.mutationAllowed); - succ.awaitActivated(); - if (a.value != succ.value) { - f = a.value; - } - a = succ; - } - return f; - } - - /** Returns a reference to a snapshot of this instance's tree structure, - * if one is available without requiring any additional copying, otherwise - * returns null. May be used in combination with {@link #beginQuiescent} - * to perform quiescent reads with minimal cost. - */ - public E availableFrozen() { - return _active.getFrozenValue(); - } - - /** Returns true if the computed {@link #size} is zero. */ - public boolean isEmpty() { - // for a different internal implementation (such as a C-SNZI) we might - // be able to do better than this - return size() == 0; - } - - /** Returns the sum of the <code>initialSize</code> parameter passed to the - * constructor, and the size deltas passed to {@link Epoch.Ticket#leave} - * for all of the mutation tickets. The result returned is linearizable - * with mutations, which requires mutation to be quiesced. No tree freeze - * is required, however. - */ - public int size() { - final COWEpoch a = _active; - final Integer delta = a.attemptDataSum(); - if (delta != null) { - return a.initialSize + delta; - } - - // wait for an existing successor, or force one if not already in progress - final COWEpoch succ = a.getOrCreateSuccessor(a.mutationAllowed); - succ.awaitActivated(); - return succ.initialSize; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/35706b94/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/Epoch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/Epoch.java b/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/Epoch.java deleted file mode 100644 index c85320d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/Epoch.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2009 Stanford University, unless otherwise specified. - * All rights reserved. - * - * This software was developed by the Pervasive Parallelism Laboratory of - * Stanford University, California, USA. - * - * Permission to use, copy, modify, and distribute this software in source - * or binary form for any purpose with or without fee is hereby granted, - * provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * 3. Neither the name of Stanford University nor the names of its - * contributors may be used to endorse or promote products derived - * from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR - * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY - * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - */ - -package org.apache.ignite.internal.util.snaptree; - -/** A <code>Epoch</code> has a lifecycle consisting of three phases: active, - * closing, and closed. During the active phase partipants may arrive and - * leave the epoch. Once a close has been requested, new participants are not - * allowed, only leaving is possible. Once close has been requested and all - * participants have left, the epoch is transitioned to the closed state. - * <p> - * Entry is performed with {@link #attemptArrive}, which returns a non-null - * ticket on success or null if {@link #beginClose} has already been called. - * Each successful call to <code>attemptArrive</code> must be paired by a call - * to {@link Ticket#leave} on the returned ticket. - * <p> - * The abstract method {@link #onClosed} will be invoked exactly once after - * the epoch becomes closed. It will be passed the sum of the values passed - * to {@link Ticket#leave}. There is no way to query the current participant - * count or state of the epoch without changing it. - * <p> - * Internally the epoch responds to contention by increasing its size, - * striping the participant count across multiple objects (and hopefully - * multiple cache lines). Once close has begun, the epoch converts itself to - * a single-shot hierarchical barrier, that also performs a hierarchical - * reduction of the leave parameters. - */ -@SuppressWarnings("ALL") -abstract public class Epoch { - - /** Represents a single successful arrival to an {@link Epoch}. */ - public interface Ticket { - /** Informs the epoch that returned this ticket that the participant - * has left. This method should be called exactly once per ticket. - * The sum of the <code>data</code> values for all tickets will be - * computed and passed to {@link Epoch#onClosed}. - */ - void leave(int data); - } - - private final Root _root = new Root(); - - /** Returns a {@link Ticket} indicating a successful arrival, if no call to - * {@link #beginClose} has been made for this epoch, or returns null if - * close has already begun. {@link Ticket#leave} must be called exactly - * once on any returned ticket. - */ - public Ticket attemptArrive() { - return _root.attemptArrive(); - } - - /** Prevents new arrivals from succeeding, then returns immediately. - * {@link #onClosed} will be called after all outstanding tickets have - * been returned. To block until close is complete, add some sort of - * synchronization logic to the user-defined implementation of {@link - * #onClosed}. - */ - public void beginClose() { - _root.beginClose(); - } - - /** Override this method to provide user-defined behavior. - * <code>dataSum</code> will be the sum of the <code>data</code> values - * passed to {@link Ticket#leave} for all tickets in this epoch. - * <p> - * As a simple example, a blocking close operation may be defined by:<pre> - * class BlockingEpoch extends Epoch { - * private final CountDownLatch _closed = new CountDownLatch(1); - * - * public void blockingClose() throws InterruptedException { - * beginClose(); - * _closed.await(); - * } - * - * protected void onClosed(int dataSum) { - * _closed.countDown(1); - * } - * } - * </pre> - */ - abstract protected void onClosed(int dataSum); - - //////////////// debugging stuff - - int computeSpread() { - return _root.computeSpread(); - } - - //////////////// internal implementation - - private class Root extends EpochNode { - /** */ - private static final long serialVersionUID = 0L; - - protected void onClosed(final int dataSum) { - Epoch.this.onClosed(dataSum); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/35706b94/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/EpochNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/EpochNode.java b/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/EpochNode.java deleted file mode 100644 index 7ceec45..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/snaptree/EpochNode.java +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Copyright (c) 2009 Stanford University, unless otherwise specified. - * All rights reserved. - * - * This software was developed by the Pervasive Parallelism Laboratory of - * Stanford University, California, USA. - * - * Permission to use, copy, modify, and distribute this software in source - * or binary form for any purpose with or without fee is hereby granted, - * provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * 3. Neither the name of Stanford University nor the names of its - * contributors may be used to endorse or promote products derived - * from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR - * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY - * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - */ - -package org.apache.ignite.internal.util.snaptree; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -/** Provides an implementation of the behavior of an {@link Epoch}. */ -@SuppressWarnings("ALL") -abstract class EpochNode extends AtomicLong implements Epoch.Ticket { - - private static final int TRIES_BEFORE_SUBTREE = 2; - private static final int CLOSER_HEAD_START = 1000; - - /** This includes the root. 7 or fewer procs gets 2, 63 or fewer gets - * 3, 511 or fewer 4. We observe that the node count reported by {@link - * #computeSpread} is roughly twice the number of hardware contexts in - * use. - */ - private static final int MAX_LEVELS = 2 + log8(Runtime.getRuntime().availableProcessors()); - - /** Returns floor(log_base_8(value)). */ - private static int log8(final int value) { - return (31 - Integer.numberOfLeadingZeros(value)) / 3; - } - - //////////////// branching factor - - private static final int LOG_BF = 3; - private static final int BF = 1 << LOG_BF; - private static final int BF_MASK = BF - 1; - - //////////////// bit packing - - private static final int DATA_SUM_SHIFT = 32; - private static int dataSum(long state) { return (int)(state >> DATA_SUM_SHIFT); } - private static long withDataDelta(long state, int delta) { return state + (((long) delta) << DATA_SUM_SHIFT); } - - private static final int CHILD_CLOSED_SHIFT = 32 - BF; - private static long ALL_CHILDREN_CLOSED = ((1L << BF) - 1L) << CHILD_CLOSED_SHIFT; - private static long childClosedBit(int which) { return 1L << (CHILD_CLOSED_SHIFT + which); } - private static boolean isChildClosed(long state, int which) { return (state & childClosedBit(which)) != 0; } - private static long withChildClosed(long state, int which, long childState) { - assert(!isChildClosed(state, which)); - return withDataDelta(state | childClosedBit(which), dataSum(childState)); - } - private static boolean isAllChildrenClosed(long state) { return (state & ALL_CHILDREN_CLOSED) == ALL_CHILDREN_CLOSED; } - - private static final int CHILD_PRESENT_SHIFT = CHILD_CLOSED_SHIFT - BF; - private static final long ANY_CHILD_PRESENT = ((1L << BF) - 1L) << CHILD_PRESENT_SHIFT; - private static long childPresentBit(int which) { return 1L << (CHILD_PRESENT_SHIFT + which); } - private static boolean isChildPresent(long state, int which) { return (state & childPresentBit(which)) != 0; } - private static long withChildPresent(long state, int which) { return state | childPresentBit(which); } - private static boolean isAnyChildPresent(long state) { return (state & ANY_CHILD_PRESENT) != 0; } - - private static final long MARK = (1L << (CHILD_PRESENT_SHIFT - 1)); - private static boolean isMarked(long state) { return (state & MARK) != 0L; } - /** Records all non-present children as closed. */ - private static long withMarked(long state) { - final int missingChildren = (~((int) state) >> CHILD_PRESENT_SHIFT) & ((1 << BF) - 1); - return state | MARK | (((long) missingChildren) << CHILD_CLOSED_SHIFT); - } - - private static final long ENTRY_COUNT_MASK = MARK - 1; - private static int entryCount(long state) { return (int) (state & ENTRY_COUNT_MASK); } - private static long withArrive(long state) { return state + 1; } - private static long withLeave(long state, int dataDelta) { return withDataDelta(state - 1, dataDelta); } - private static boolean mayArrive(long state) { return entryCount(state) != ENTRY_COUNT_MASK; } - private static boolean mayLeave(long state) { return entryCount(state) != 0; } - - private static final long CLOSED_MASK = MARK | ALL_CHILDREN_CLOSED | ENTRY_COUNT_MASK; - private static final long CLOSED_VALUE = MARK | ALL_CHILDREN_CLOSED; - private static boolean isClosed(long state) { return (state & CLOSED_MASK) == CLOSED_VALUE; } - - private static final long ENTRY_FAST_PATH_MASK = ANY_CHILD_PRESENT | MARK | (ENTRY_COUNT_MASK - (ENTRY_COUNT_MASK >> 1)); - /** Not marked, no children, and no overflow possible. */ - private static boolean isEntryFastPath(long state) { return (state & ENTRY_FAST_PATH_MASK) == 0L; } - - //////////////// subclasses - - private static class Child extends EpochNode { - /** */ - private static final long serialVersionUID = 0L; - - private Child(final EpochNode parent, final int whichInParent) { - super(parent, whichInParent); - } - - protected void onClosed(final int dataSum) { - throw new Error(); - } - } - - //////////////// instance state - - private static final AtomicReferenceFieldUpdater[] childrenUpdaters = { - AtomicReferenceFieldUpdater.newUpdater(EpochNode.class, EpochNode.class, "_child0"), - AtomicReferenceFieldUpdater.newUpdater(EpochNode.class, EpochNode.class, "_child1"), - AtomicReferenceFieldUpdater.newUpdater(EpochNode.class, EpochNode.class, "_child2"), - AtomicReferenceFieldUpdater.newUpdater(EpochNode.class, EpochNode.class, "_child3"), - AtomicReferenceFieldUpdater.newUpdater(EpochNode.class, EpochNode.class, "_child4"), - AtomicReferenceFieldUpdater.newUpdater(EpochNode.class, EpochNode.class, "_child5"), - AtomicReferenceFieldUpdater.newUpdater(EpochNode.class, EpochNode.class, "_child6"), - AtomicReferenceFieldUpdater.newUpdater(EpochNode.class, EpochNode.class, "_child7") - }; - - private final EpochNode _parent; - private final int _whichInParent; - - // It would be cleaner to use an array of children, but we want to force - // all of the bulk into the same object as the AtomicLong.value. - - // To avoid races between creating a child and marking a node as closed, - // we add a bit to the state for each child that records whether it - // *should* exist. If we find that the bit is set but a child is missing, - // we can create it ourself. - - private volatile EpochNode _child0; - private volatile EpochNode _child1; - private volatile EpochNode _child2; - private volatile EpochNode _child3; - private volatile EpochNode _child4; - private volatile EpochNode _child5; - private volatile EpochNode _child6; - private volatile EpochNode _child7; - - EpochNode() { - _parent = null; - _whichInParent = 0; - } - - private EpochNode(final EpochNode parent, final int whichInParent) { - _parent = parent; - _whichInParent = whichInParent; - } - - //////////////// provided by the caller - - abstract protected void onClosed(int dataSum); - - //////////////// child management - - private EpochNode getChildFromField(final int which) { - switch (which) { - case 0: return _child0; - case 1: return _child1; - case 2: return _child2; - case 3: return _child3; - case 4: return _child4; - case 5: return _child5; - case 6: return _child6; - case 7: return _child7; - default: return null; - } - } - - private EpochNode getChild(final long state, final int which) { - if (!isChildPresent(state, which)) { - return null; - } - final EpochNode existing = getChildFromField(which); - return (existing != null) ? existing : constructPresentChild(which); - } - - @SuppressWarnings("unchecked") - private EpochNode constructPresentChild(final int which) { - final EpochNode n = new Child(this, which); - return childrenUpdaters[which].compareAndSet(this, null, n) ? n : getChildFromField(which); - } - - private EpochNode getOrCreateChild(final int which) { - final EpochNode existing = getChildFromField(which); - return (existing != null) ? existing : createChild(which); - } - - private EpochNode createChild(final int which) { - while (true) { - final long state = get(); - if (isMarked(state)) { - // whatever we've got is what we've got - return getChild(state, which); - } - if (compareAndSet(state, withChildPresent(state, which))) { - // the child now should exist, but we must still actually - // construct and link in the instance - return constructPresentChild(which); - } - } - } - - /** Returns the <code>Node</code> to decr on success, null if - * {@link #beginClose} has already been called on this instance. - */ - public EpochNode attemptArrive() { - final long state = get(); - if (isEntryFastPath(state) && compareAndSet(state, withArrive(state))) { - return this; - } - else { - return attemptArrive(0, 1); - } - } - - private int getIdentity() { - final int h = System.identityHashCode(Thread.currentThread()); - - // Multiply by -127, as suggested by java.util.IdentityHashMap. - // We also set an bit we don't use, to make sure it is never zero. - return (h - (h << 7)) | (1 << 31); - } - - /** level 1 is the root. */ - private EpochNode attemptArrive(int id, final int level) { - int tries = 0; - while (true) { - final long state = get(); - if (isMarked(state)) { - return null; - } - if (isAnyChildPresent(state) || - (tries >= TRIES_BEFORE_SUBTREE && level < MAX_LEVELS)) { - // Go deeper if we have previously detected contention, or if - // we are currently detecting it. Lazy computation of our - // current identity. - if (id == 0) { - id = getIdentity(); - } - final EpochNode child = getOrCreateChild(id & BF_MASK); - if (child == null) { - return null; - } - return child.attemptArrive(id >> LOG_BF, level + 1); - } - if (!mayArrive(state)) { - throw new IllegalStateException("maximum arrival count of " + ENTRY_COUNT_MASK + " exceeded"); - } - if (compareAndSet(state, withArrive(state))) { - // success - return this; - } - - ++tries; - } - } - - /** Should be called on every non-null return value from attemptArrive. */ - public void leave(final int dataDelta) { - while (true) { - final long state = get(); - if (!mayLeave(state)) { - throw new IllegalStateException("incorrect call to Epoch.leave"); - } - final long after = withLeave(state, dataDelta); - if (compareAndSet(state, after)) { - if (isClosed(after)) { - newlyClosed(after); - } - return; - } - } - } - - private void newlyClosed(final long state) { - if (_parent != null) { - // propogate - _parent.childIsNowClosed(_whichInParent, state); - } - else { - // report - onClosed(dataSum(state)); - } - } - - private void childIsNowClosed(final int which, final long childState) { - while (true) { - final long state = get(); - if (isChildClosed(state, which)) { - // not our problem - return; - } - final long after = withChildClosed(state, which, childState); - if (compareAndSet(state, after)) { - if (isClosed(after)) { - newlyClosed(after); - } - return; - } - } - } - - /** Prevents subsequent calls to {@link #attemptArrive} from succeeding. */ - public void beginClose() { - int attempts = 0; - long state; - while (true) { - ++attempts; - - state = get(); - if (isClosed(state)) { - return; - } - - if (isMarked(state)) { - // give the thread that actually performed this transition a - // bit of a head start - if (attempts < CLOSER_HEAD_START) { - continue; - } - break; - } - - // every child that is not present will be recorded as closed by withMarked - final long after = withMarked(state); - if (compareAndSet(state, after)) { - if (isAllChildrenClosed(after)) { - if (isClosed(after) && _parent == null) { - // finished in one CAS, yeah! - onClosed(dataSum(after)); - } - // no second stage necessary - return; - } - // CAS successful, so now we need to beginClose() the children - break; - } - } - - // no new child bits can be set after marking, so this gets everyone - for (int which = 0; which < BF; ++which) { - final EpochNode child = getChild(state, which); - if (child != null) { - child.beginClose(); - } - } - - // Rather than have each child bubble up its closure, we gather it - // here to reduce the number of CASs required. - while (true) { - final long before = get(); - long after = before; - for (int which = 0; which < BF; ++which) { - if (!isChildClosed(before, which)) { - final long childState = getChildFromField(which).get(); - if (isClosed(childState)) { - after = withChildClosed(after, which, childState); - } - } - } - if (before == after) { - return; - } - if (compareAndSet(before, after)) { - if (isClosed(after) && _parent == null) { - onClosed(dataSum(after)); - } - return; - } - } - } - - /** If possible returns the <code>dataSum</code> that would be delivered - * to {@link #onClosed(int)} if this epoch were closed at this moment, - * otherwise returns null. This will succeed if and only if the tree - * consists only of a single node. - */ - public Integer attemptDataSum() { - final long state = get(); - if (!isAnyChildPresent(state) && entryCount(state) == 0) { - // this is better than Integer.valueOf for dynamic escape analysis - //return new Integer(dataSum(state)); - // this is better than new Integer() for object creation - return Integer.valueOf(dataSum(state)); - } - else { - return null; - } - } - - /** For debugging purposes. */ - int computeSpread() { - final long state = get(); - if (isAnyChildPresent(state)) { - int sum = 0; - for (int which = 0; which < BF; ++which) { - final EpochNode child = getChild(state, which); - if (child != null) { - sum += child.computeSpread(); - } - else { - // child would be created for arrive, so count it - sum += 1; - } - } - return sum; - } - else { - return 1; - } - } -} \ No newline at end of file
