Revert "IGNITE-1678 code + test+ benchmark config"

This reverts commit 784958bf6e0e8e81191af498f6a993b2bfd78204.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/980df4ea Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/980df4ea Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/980df4ea Branch: refs/heads/ignite-3727-2 Commit: 980df4ea1be385867c69d2fa977f22d808701aca Parents: 21e0093 Author: DmitriyGovorukhin <[email protected]> Authored: Tue Sep 13 14:27:08 2016 +0300 Committer: DmitriyGovorukhin <[email protected]> Committed: Tue Sep 13 14:27:08 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteAtomicSequence.java | 15 - .../configuration/AtomicConfiguration.java | 25 - .../datastructures/DataStructuresProcessor.java | 111 ++-- .../GridCacheAtomicSequenceImpl.java | 384 +++++++----- ...AtomicSequenceMultiThreadedAbstractTest.java | 579 ------------------- .../GridCacheSequenceApiSelfAbstractTest.java | 101 +--- ...titionedAtomicSequenceMultiThreadedTest.java | 313 +++++++++- ...GridCachePartitionedSequenceApiSelfTest.java | 4 - ...plicatedAtomicSequenceMultiThreadedTest.java | 33 -- .../GridCacheReplicatedSequenceApiSelfTest.java | 4 - .../IgniteCacheDataStructuresSelfTestSuite.java | 2 - .../cache/IgniteAtomicSequenceBenchmark.java | 45 -- 12 files changed, 625 insertions(+), 991 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/980df4ea/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java index aa1cbdf..a1e1392 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java @@ -118,21 +118,6 @@ public 
interface IgniteAtomicSequence extends Closeable { public void batchSize(int size); /** - * Gets local reserve percentage for this atomic sequence. When a reserve percentage of a batch size - * is reached when sequence starts a new reservation in background. - * - * @return Sequence reserve pecentage. - */ - public int reservePercentage(); - - /** - * Sets local reserve percentage for this atomic sequence. - * - * @param percentage Reserve pecentage. Must be between 0 and 100. - */ - public void reservePercentage(int percentage); - - /** * Gets status of atomic sequence. * * @return {@code true} if atomic sequence was removed from cache, {@code false} otherwise. http://git-wip-us.apache.org/repos/asf/ignite/blob/980df4ea/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java index ad96b73..6649b5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java @@ -36,9 +36,6 @@ public class AtomicConfiguration { /** Default atomic sequence reservation size. */ public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE = 1000; - /** Default atomic sequence reservation size. */ - public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_PERCENTAGE = 80; - /** Default batch size for all cache's sequences. */ private int seqReserveSize = DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE; @@ -48,9 +45,6 @@ public class AtomicConfiguration { /** Number of backups. */ private int backups = DFLT_BACKUPS; - /** Atomic sequence reservation percentage. */ - private int atomicSeqReservePercentage = DFLT_ATOMIC_SEQUENCE_RESERVE_PERCENTAGE; - /** * @return Number of backup nodes. 
*/ @@ -104,25 +98,6 @@ public class AtomicConfiguration { this.seqReserveSize = seqReserveSize; } - /** - * Gets reserve percentage for configuration. When a reserve percentage of a batch size - * is reached when sequence starts a new reservation in background. - * - * @return Atomic sequence reservation percentage. - */ - public int getAtomicSequenceReservePercentage() { - return atomicSeqReservePercentage; - } - - /** - * Sets reserve percentage for configuration. - * * - * @param atomicSeqReservePercentage Atomic sequence reservation percentage. - */ - public void setAtomicSequenceReservePercentage(int atomicSeqReservePercentage) { - this.atomicSeqReservePercentage = atomicSeqReservePercentage; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(AtomicConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/980df4ea/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index eb81ca2..1cad22f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -36,7 +36,6 @@ import javax.cache.event.EventType; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.MutableEntry; - import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteAtomicReference; import org.apache.ignite.IgniteAtomicSequence; @@ -358,9 +357,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (seqVal == 
null && !create) return null; - /* // We should use offset because we already reserved left side of range. + // We should use offset because we already reserved left side of range. long off = atomicCfg.getAtomicSequenceReserveSize() > 1 ? - atomicCfg.getAtomicSequenceReserveSize() - 1 : 1;*/ + atomicCfg.getAtomicSequenceReserveSize() - 1 : 1; long upBound; long locCntr; @@ -368,16 +367,18 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (seqVal == null) { locCntr = initVal; - upBound = locCntr + atomicCfg.getAtomicSequenceReserveSize(); + upBound = locCntr + off; - seqVal = new GridCacheAtomicSequenceValue(upBound); + // Global counter must be more than reserved region. + seqVal = new GridCacheAtomicSequenceValue(upBound + 1); } else { locCntr = seqVal.get(); - upBound = locCntr + atomicCfg.getAtomicSequenceReserveSize(); + upBound = locCntr + off; - seqVal.set(upBound); + // Global counter must be more than reserved region. + seqVal.set(upBound + 1); } // Update global counter. @@ -389,7 +390,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { seqView, dsCacheCtx, atomicCfg.getAtomicSequenceReserveSize(), - atomicCfg.getAtomicSequenceReservePercentage(), locCntr, upBound); @@ -448,7 +448,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * Gets an atomic long from cache or creates one if it's not cached. * * @param name Name of atomic long. - * @param initVal Initial value for atomic long. If atomic long already cached, {@code initVal} will be ignored. + * @param initVal Initial value for atomic long. If atomic long already cached, {@code initVal} + * will be ignored. * @param create If {@code true} atomic long will be created in case it is not in cache. * @return Atomic long. * @throws IgniteCheckedException If loading failed. 
@@ -525,7 +526,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { final DataStructureInfo dsInfo, final boolean create, Class<? extends T> cls) - throws IgniteCheckedException { + throws IgniteCheckedException + { Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name))) @@ -605,7 +607,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { String name, DataStructureType type, @Nullable final IgniteInClosureX<T> afterRmv) - throws IgniteCheckedException { + throws IgniteCheckedException + { Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); if (dsMap == null || !dsMap.containsKey(name)) @@ -653,8 +656,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * Gets an atomic reference from cache or creates one if it's not cached. * * @param name Name of atomic reference. - * @param initVal Initial value for atomic reference. If atomic reference already cached, {@code initVal} will be - * ignored. + * @param initVal Initial value for atomic reference. If atomic reference already cached, {@code initVal} + * will be ignored. * @param create If {@code true} atomic reference will be created in case it is not in cache. * @return Atomic reference. * @throws IgniteCheckedException If loading failed. @@ -663,7 +666,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { public final <T> IgniteAtomicReference<T> atomicReference(final String name, final T initVal, final boolean create) - throws IgniteCheckedException { + throws IgniteCheckedException + { A.notNull(name, "name"); awaitInitialization(); @@ -757,10 +761,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * Gets an atomic stamped from cache or creates one if it's not cached. * * @param name Name of atomic stamped. - * @param initVal Initial value for atomic stamped. 
If atomic stamped already cached, {@code initVal} will be - * ignored. - * @param initStamp Initial stamp for atomic stamped. If atomic stamped already cached, {@code initStamp} will be - * ignored. + * @param initVal Initial value for atomic stamped. If atomic stamped already cached, {@code initVal} + * will be ignored. + * @param initStamp Initial stamp for atomic stamped. If atomic stamped already cached, {@code initStamp} + * will be ignored. * @param create If {@code true} atomic stamped will be created in case it is not in cache. * @return Atomic stamped. * @throws IgniteCheckedException If loading failed. @@ -1001,7 +1005,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { @Nullable private <T> T getCollection(final IgniteClosureX<GridCacheContext, T> c, final DataStructureInfo dsInfo, boolean create) - throws IgniteCheckedException { + throws IgniteCheckedException + { awaitInitialization(); Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); @@ -1078,7 +1083,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { @Nullable private static IgniteCheckedException validateDataStructure( @Nullable Map<String, DataStructureInfo> dsMap, DataStructureInfo info, - boolean create) { + boolean create) + { if (dsMap == null) return null; @@ -1096,17 +1102,20 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * * @param name Name of the latch. * @param cnt Initial count. - * @param autoDel {@code True} to automatically delete latch from cache when its count reaches zero. - * @param create If {@code true} latch will be created in case it is not in cache, if it is {@code false} all - * parameters except {@code name} are ignored. - * @return Count down latch for the given name or {@code null} if it is not found and {@code create} is false. + * @param autoDel {@code True} to automatically delete latch from cache when + * its count reaches zero. 
+ * @param create If {@code true} latch will be created in case it is not in cache, + * if it is {@code false} all parameters except {@code name} are ignored. + * @return Count down latch for the given name or {@code null} if it is not found and + * {@code create} is false. * @throws IgniteCheckedException If operation failed. */ public IgniteCountDownLatch countDownLatch(final String name, final int cnt, final boolean autoDel, final boolean create) - throws IgniteCheckedException { + throws IgniteCheckedException + { A.notNull(name, "name"); awaitInitialization(); @@ -1192,12 +1201,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { // Check correctness type of removable object. GridCacheCountDownLatchValue val = - cast(dsView.get(key), GridCacheCountDownLatchValue.class); + cast(dsView.get(key), GridCacheCountDownLatchValue.class); if (val != null) { if (val.get() > 0) { throw new IgniteCheckedException("Failed to remove count down latch " + - "with non-zero count: " + val.get()); + "with non-zero count: " + val.get()); } dsView.remove(key); @@ -1223,9 +1232,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @param name Name of the semaphore. * @param cnt Initial count. * @param failoverSafe {@code True} FailoverSafe parameter. - * @param create If {@code true} semaphore will be created in case it is not in cache, if it is {@code false} all - * parameters except {@code name} are ignored. - * @return Semaphore for the given name or {@code null} if it is not found and {@code create} is false. + * @param create If {@code true} semaphore will be created in case it is not in cache, + * if it is {@code false} all parameters except {@code name} are ignored. + * @return Semaphore for the given name or {@code null} if it is not found and + * {@code create} is false. 
* @throws IgniteCheckedException If operation failed. */ public IgniteSemaphore semaphore(final String name, final int cnt, final boolean failoverSafe, final boolean create) @@ -1341,11 +1351,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @param failoverSafe Flag indicating behaviour in case of failure. * @param fair Flag indicating fairness policy of this lock. * @param create If {@code true} reentrant lock will be created in case it is not in cache. - * @return ReentrantLock for the given name or {@code null} if it is not found and {@code create} is false. + * @return ReentrantLock for the given name or {@code null} if it is not found and + * {@code create} is false. * @throws IgniteCheckedException If operation failed. */ - public IgniteLock reentrantLock(final String name, final boolean failoverSafe, final boolean fair, - final boolean create) + public IgniteLock reentrantLock(final String name, final boolean failoverSafe, final boolean fair, final boolean create) throws IgniteCheckedException { A.notNull(name, "name"); @@ -1522,7 +1532,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void onUpdated( Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts) - throws CacheEntryListenerException { + throws CacheEntryListenerException + { for (CacheEntryEvent<? extends GridCacheInternalKey, ? 
extends GridCacheInternal> evt : evts) { if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) { GridCacheInternal val0 = evt.getValue(); @@ -1592,8 +1603,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } else if (sem != null) { U.error(log, "Failed to cast object " + - "[expected=" + IgniteSemaphore.class.getSimpleName() + - ", actual=" + sem.getClass() + ", value=" + sem + ']'); + "[expected=" + IgniteSemaphore.class.getSimpleName() + + ", actual=" + sem.getClass() + ", value=" + sem + ']'); } } else if (val0 instanceof GridCacheLockState) { @@ -1750,18 +1761,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (atomicCfg == null) throw new IgniteException("Atomic data structure can not be created, " + "need to provide IgniteAtomicConfiguration."); - - if (atomicCfg.getAtomicSequenceReserveSize() <= 0) - throw new IgniteException( - "Atomic sequence can not be created, " + - "reserve size must be more than 0, but atomicSequenceReserveSize: " + atomicCfg.getAtomicSequenceReserveSize() - ); - - if (atomicCfg.getAtomicSequenceReservePercentage() > 100) - throw new IgniteException( - "Atomic sequence can not be created, reserve percentage must have value " + - "between 0 and 100, but atomicSequenceReservePercentage: " + atomicCfg.getAtomicSequenceReservePercentage() - ); } /** @@ -1780,7 +1779,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { col.cfg.getBackups() == cfg.getBackups() && col.cfg.getOffHeapMaxMemory() == cfg.getOffHeapMaxMemory() && ((col.cfg.getNodeFilter() == null && cfg.getNodeFilter() == null) || - (col.cfg.getNodeFilter() != null && col.cfg.getNodeFilter().equals(cfg.getNodeFilter())))) + (col.cfg.getNodeFilter() != null && col.cfg.getNodeFilter().equals(cfg.getNodeFilter())))) return col.cacheName; } @@ -1789,8 +1788,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** * @param c Closure to run. 
- * @return Closure return value. * @throws IgniteCheckedException If failed. + * @return Closure return value. */ private static <T> T retryTopologySafe(IgniteOutClosureX<T> c) throws IgniteCheckedException { for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) { @@ -2145,7 +2144,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { @Override public IgniteCheckedException process( MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry, Object... args) - throws EntryProcessorException { + throws EntryProcessorException + { Map<String, DataStructureInfo> map = entry.getValue(); if (map == null) { @@ -2223,7 +2223,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public T2<String, IgniteCheckedException> process( MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry, - Object... args) { + Object... args) + { Map<String, DataStructureInfo> map = entry.getValue(); CollectionInfo colInfo = (CollectionInfo)info.info; @@ -2302,7 +2303,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public String process( MutableEntry<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> entry, - Object... args) { + Object... args) + { List<CacheCollectionInfo> list = entry.getValue(); if (list == null) { @@ -2378,7 +2380,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public T2<Boolean, IgniteCheckedException> process( MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry, - Object... args) { + Object... 
args) + { Map<String, DataStructureInfo> map = entry.getValue(); if (map == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/980df4ea/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 77bbb41..7474f46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -17,21 +17,36 @@ package org.apache.ignite.internal.processors.datastructures; -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; - -import java.io.*; -import java.util.concurrent.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.internal.util.typedef.internal.CU.*; -import static org.apache.ignite.transactions.TransactionConcurrency.*; -import static org.apache.ignite.transactions.TransactionIsolation.*; +import java.io.Externalizable; +import java.io.IOException; +import java.io.InvalidObjectException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.ObjectStreamException; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import 
org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * Cache sequence implementation. @@ -72,32 +87,26 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc /** Local value of sequence. */ private long locVal; - /** Upper bound of local counter. */ - private long upBound; // TODO should be not included + /** Upper bound of local counter. */ + private long upBound; - /** Reserved bottom bound of local counter (included). */ - private long reservedBottomBound; - - /** Reserved upper bound of local counter (not included). */ - private long reservedUpBound; - - /** A limit after which a new reservation should be done. */ - private long newReservationLine; - - /** Whether reserveFuture already processed or not. 
*/ - private boolean isReserveFutResultsProcessed = true; - - /** default 80% */ - private volatile int percentage; - - /** Sequence batch size */ + /** Sequence batch size */ private volatile int batchSize; /** Synchronization lock. */ private final Lock lock = new ReentrantLock(); - /** Reservation future. */ - private IgniteInternalFuture<?> reservationFut = new GridFinishedFuture<>(); + /** Await condition. */ + private Condition cond = lock.newCondition(); + + /** Callable for execution {@link #incrementAndGet} operation in async and sync mode. */ + private final Callable<Long> incAndGetCall = internalUpdate(1, true); + + /** Callable for execution {@link #getAndIncrement} operation in async and sync mode. */ + private final Callable<Long> getAndIncCall = internalUpdate(1, false); + + /** Add and get cache call guard. */ + private final AtomicBoolean updateGuard = new AtomicBoolean(); /** * Empty constructor required by {@link Externalizable}. @@ -122,15 +131,13 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView, GridCacheContext ctx, int batchSize, - int percentage, long locVal, - long upBound) { + long upBound) + { assert key != null; assert seqView != null; assert ctx != null; - assert batchSize > 0 : "BatchSize: " + batchSize; assert locVal <= upBound; - assert percentage >= 0 && percentage <= 100 : "Percentage: " + percentage; this.batchSize = batchSize; this.ctx = ctx; @@ -139,9 +146,6 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc this.upBound = upBound; this.locVal = locVal; this.name = name; - this.percentage = percentage; - - newReservationLine = locVal + (batchSize * percentage / 100); log = ctx.logger(getClass()); } @@ -168,7 +172,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc /** {@inheritDoc} */ @Override public long incrementAndGet() { try { - return 
internalUpdate(1, true); + return internalUpdate(1, incAndGetCall, true); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -178,7 +182,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc /** {@inheritDoc} */ @Override public long getAndIncrement() { try { - return internalUpdate(1, false); + return internalUpdate(1, getAndIncCall, false); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -190,7 +194,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc A.ensure(l > 0, " Parameter mustn't be less then 1: " + l); try { - return internalUpdate(l, true); + return internalUpdate(l, null, true); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -202,7 +206,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc A.ensure(l > 0, " Parameter mustn't be less then 1: " + l); try { - return internalUpdate(l, false); + return internalUpdate(l, null, false); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -213,134 +217,162 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc * Synchronous sequence update operation. Will add given amount to the sequence value. * * @param l Increment amount. + * @param updateCall Cache call that will update sequence reservation count in accordance with l. * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value - * prior to update. + * prior to update. * @return Sequence value. * @throws IgniteCheckedException If update failed. 
*/ @SuppressWarnings("SignalWithoutCorrespondingAwait") - private long internalUpdate(final long l, final boolean updated) throws IgniteCheckedException { + private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteCheckedException { + checkRemoved(); + assert l > 0; - while (true) { - checkRemoved(); + lock.lock(); - lock.lock(); // TODO locks here? + try { + // If reserved range isn't exhausted. + if (locVal + l <= upBound) { + long curVal = locVal; - try { - if (locVal + l >= newReservationLine && isReserveFutResultsProcessed && reservationFut.isDone()) - reservationFut = runAsyncReservation(0); + locVal += l; - // If reserved range isn't exhausted. - if (locVal + l < upBound) { - long curVal = locVal; + return updated ? locVal : curVal; + } + } + finally { + lock.unlock(); + } - locVal += l; + if (updateCall == null) + updateCall = internalUpdate(l, updated); - return updated ? locVal : curVal; + while (true) { + if (updateGuard.compareAndSet(false, true)) { + try { + // This call must be outside lock. + return CU.outTx(updateCall, ctx); } + finally { + lock.lock(); - if (!isReserveFutResultsProcessed && reservationFut.isDone()) { - isReserveFutResultsProcessed = true; + try { + updateGuard.set(false); - if (locVal + l < reservedUpBound) { - long curVal = locVal; + cond.signalAll(); + } + finally { + lock.unlock(); + } + } + } + else { + lock.lock(); - locVal = (locVal + l < reservedBottomBound) ? reservedBottomBound : locVal + l; + try { + while (locVal >= upBound && updateGuard.get()) + U.await(cond, 500, MILLISECONDS); - upBound = reservedUpBound; + checkRemoved(); - return updated ? locVal : curVal; - } - else { - long diff = locVal + l - reservedUpBound; + // If reserved range isn't exhausted. + if (locVal + l <= upBound) { + long curVal = locVal; - long off = (diff / batchSize) * batchSize; + locVal += l; - reservationFut = runAsyncReservation(off); + return updated ? 
locVal : curVal; } } + finally { + lock.unlock(); + } } - finally { - lock.unlock(); - } - - // If reserved range is exhausted. - reservationFut.get(); } } /** - * Runs async reservation of new range for current node. + * Asynchronous sequence update operation. Will add given amount to the sequence value. * - * @param off Offset. - * @return Future. + * @param l Increment amount. + * @param updateCall Cache call that will update sequence reservation count in accordance with l. + * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value + * prior to update. + * @return Future indicating sequence value. + * @throws IgniteCheckedException If update failed. */ - private IgniteInternalFuture<?> runAsyncReservation(final long off) { - assert off >= 0 : "Offset: " + off; - - return ctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - Callable<Void> reserveCall = retryTopologySafe(new Callable<Void>() { - @Override public Void call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicSequenceValue seq = seqView.get(key); - - checkRemoved(); - - assert seq != null; + @SuppressWarnings("SignalWithoutCorrespondingAwait") + private IgniteInternalFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated) + throws IgniteCheckedException { + checkRemoved(); - long newUpBound = -1; + A.ensure(l > 0, " Parameter mustn't be less then 1: " + l); - lock.lock(); + lock.lock(); - try { - assert isReserveFutResultsProcessed; + try { + // If reserved range isn't exhausted. + if (locVal + l <= upBound) { + long curVal = locVal; - isReserveFutResultsProcessed = false; + locVal += l; - long curGlobalVal = seq.get(); + return new GridFinishedFuture<>(updated ? 
locVal : curVal); + } + } + finally { + lock.unlock(); + } - reservedBottomBound = curGlobalVal + off; + if (updateCall == null) + updateCall = internalUpdate(l, updated); - newUpBound = reservedBottomBound + batchSize; + while (true) { + if (updateGuard.compareAndSet(false, true)) { + try { + // This call must be outside lock. + return ctx.closures().callLocalSafe(updateCall, true); + } + finally { + lock.lock(); - reservedUpBound = newUpBound; + try { + updateGuard.set(false); - newReservationLine = reservedBottomBound + (batchSize * percentage / 100); - } - finally { - lock.unlock(); - } + cond.signalAll(); + } + finally { + lock.unlock(); + } + } + } + else { + lock.lock(); - seq.set(newUpBound); + try { + while (locVal >= upBound && updateGuard.get()) + U.await(cond, 500, MILLISECONDS); - seqView.put(key, seq); + checkRemoved(); - tx.commit(); - } - catch (Error | Exception e) { - U.error(log, "Failed to get and add: " + this, e); + // If reserved range isn't exhausted. + if (locVal + l <= upBound) { + long curVal = locVal; - throw e; - } + locVal += l; - return null; + return new GridFinishedFuture<>(updated ? locVal : curVal); } - }); - - try { - CU.outTx(reserveCall, ctx); } - catch (IgniteCheckedException e) { - throw new IgniteException(e); + finally { + lock.unlock(); } } - }, /*sys pool*/ false); + } } - /** - * Get local batch size for this sequences. + /** Get local batch size for this sequences. * * @return Sequence batch size. */ @@ -366,25 +398,6 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc } } - /** {@inheritDoc} */ - @Override public int reservePercentage() { - return percentage; - } - - /** {@inheritDoc} */ - @Override public void reservePercentage(int percentage) { - A.ensure(percentage >= 0 && percentage <= 100, "Invalid reserve percentage: " + percentage); - - lock.lock(); - - try { - this.percentage = percentage; - } - finally { - lock.unlock(); - } - } - /** * Check removed status. 
* @@ -452,6 +465,89 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc } } + /** + * Method returns callable for execution all update operations in async and sync mode. + * + * @param l Value will be added to sequence. + * @param updated If {@code true}, will return updated value, if {@code false}, will return previous value. + * @return Callable for execution in async and sync mode. + */ + @SuppressWarnings("TooBroadScope") + private Callable<Long> internalUpdate(final long l, final boolean updated) { + return retryTopologySafe(new Callable<Long>() { + @Override public Long call() throws Exception { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicSequenceValue seq = seqView.get(key); + + checkRemoved(); + + assert seq != null; + + long curLocVal; + + long newUpBound; + + lock.lock(); + + try { + curLocVal = locVal; + + // If local range was already reserved in another thread. + if (locVal + l <= upBound) { + long retVal = locVal; + + locVal += l; + + return updated ? locVal : retVal; + } + + long curGlobalVal = seq.get(); + + long newLocVal; + + /* We should use offset because we already reserved left side of range.*/ + long off = batchSize > 1 ? batchSize - 1 : 1; + + // Calculate new values for local counter, global counter and upper bound. + if (curLocVal + l >= curGlobalVal) { + newLocVal = curLocVal + l; + + newUpBound = newLocVal + off; + } + else { + newLocVal = curGlobalVal; + + newUpBound = newLocVal + off; + } + + locVal = newLocVal; + upBound = newUpBound; + + if (updated) + curLocVal = newLocVal; + } + finally { + lock.unlock(); + } + + // Global counter must be more than reserved upper bound. 
+ seq.set(newUpBound + 1); + + seqView.put(key, seq); + + tx.commit(); + + return curLocVal; + } + catch (Error | Exception e) { + U.error(log, "Failed to get and add: " + this, e); + + throw e; + } + } + }); + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx.kernalContext()); http://git-wip-us.apache.org/repos/asf/ignite/blob/980df4ea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceMultiThreadedAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceMultiThreadedAbstractTest.java deleted file mode 100644 index 10b57e8..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceMultiThreadedAbstractTest.java +++ /dev/null @@ -1,579 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.datastructures; - -import java.util.Random; -import java.util.UUID; -import org.apache.ignite.IgniteAtomicSequence; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.AtomicConfiguration; -import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicSequenceImpl; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.testframework.GridTestUtils; - -/** - * Cache partitioned multi-threaded tests. - */ -public abstract class GridCacheAtomicSequenceMultiThreadedAbstractTest extends IgniteAtomicsAbstractTest { - /** Number of threads for multithreaded test. */ - private static final int THREAD_NUM = 30; - - /** Number of iterations per thread for multithreaded test. */ - private static final int ITERATION_NUM = 10000; - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 1; - } - - /** {@inheritDoc} */ - @Override protected AtomicConfiguration atomicConfiguration() { - AtomicConfiguration cfg = super.atomicConfiguration(); - - cfg.setBackups(1); - cfg.setAtomicSequenceReserveSize(10); - - return cfg; - } - - /** @throws Exception If failed. */ - public void testValues() throws Exception { - String seqName = UUID.randomUUID().toString(); - - final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true); - - // Local reservations. - assertEquals(1, seq.incrementAndGet()); - assertEquals(1, seq.getAndIncrement()); // Seq = 2 - assertEquals(3L, seq.incrementAndGet()); - assertEquals(3L, seq.getAndIncrement()); // Seq=4 - - assertEquals(4, seq.getAndAdd(3)); - assertEquals(9, seq.addAndGet(2)); - - assertEquals(new Long(9L), U.field(seq, "locVal")); - assertEquals(new Long(10L), U.field(seq, "upBound")); - - // Cache calls. 
- assertEquals(10, seq.incrementAndGet()); - - assertEquals(new Long(10L), U.field(seq, "locVal")); - assertEquals(new Long(20L), U.field(seq, "upBound")); - - seq.addAndGet(9); - - assertEquals(new Long(19L), U.field(seq, "locVal")); - assertEquals(new Long(20L), U.field(seq, "upBound")); - - assertEquals(20L, seq.incrementAndGet()); - - assertEquals(new Long(20L), U.field(seq, "locVal")); - assertEquals(new Long(30L), U.field(seq, "upBound")); - - seq.addAndGet(9); - - assertEquals(new Long(29L), U.field(seq, "locVal")); - assertEquals(new Long(30L), U.field(seq, "upBound")); - - assertEquals(29, seq.getAndIncrement()); - - assertEquals(new Long(30L), U.field(seq, "locVal")); - assertEquals(new Long(40L), U.field(seq, "upBound")); - - seq.addAndGet(9); - - assertEquals(new Long(39L), U.field(seq, "locVal")); - assertEquals(new Long(40L), U.field(seq, "upBound")); - - assertEquals(39L, seq.getAndIncrement()); - - assertEquals(new Long(40L), U.field(seq, "locVal")); - assertEquals(new Long(50L), U.field(seq, "upBound")); - - seq.addAndGet(9); - - assertEquals(new Long(49L), U.field(seq, "locVal")); - assertEquals(new Long(50L), U.field(seq, "upBound")); - - assertEquals(50, seq.addAndGet(1)); - - assertEquals(new Long(50L), U.field(seq, "locVal")); - assertEquals(new Long(60L), U.field(seq, "upBound")); - - seq.addAndGet(9); - - assertEquals(new Long(59L), U.field(seq, "locVal")); - assertEquals(new Long(60L), U.field(seq, "upBound")); - - assertEquals(59, seq.getAndAdd(1)); - - assertEquals(new Long(60L), U.field(seq, "locVal")); - assertEquals(new Long(70L), U.field(seq, "upBound")); - } - - /** @throws Exception If failed. 
*/ - public void testValues2() throws Exception { - String seqName = UUID.randomUUID().toString(); - - final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 10, true); - - assertSeqFields(seq, /*locVal*/ 10, /*upBound*/ 20, /*resBound*/ 18, /*resBottomBound*/ 0, /*resUpBound*/ 0); - - assertEquals(17, seq.addAndGet(7)); - - assertSeqFields(seq, /*locVal*/ 17, /*upBound*/ 20, /*resBound*/ 18, /*resBottomBound*/ 0, /*resUpBound*/ 0); - - assertEquals(18, seq.incrementAndGet()); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return !F.eq(U.field(seq, "isReserveFutResultsProcessed"), true); - } - }, 1000); - - assertSeqFields(seq, /*locVal*/ 18, /*upBound*/ 20, /*resBound*/ 28, /*resBottomBound*/ 20, /*resUpBound*/ 30); - - assertEquals(19, seq.incrementAndGet()); - - assertSeqFields(seq, /*locVal*/ 19, /*upBound*/ 20, /*resBound*/ 28, /*resBottomBound*/ 20, /*resUpBound*/ 30); - - assertEquals(20, seq.incrementAndGet()); - - assertSeqFields(seq, /*locVal*/ 20, /*upBound*/ 30, /*resBound*/ 28, /*resBottomBound*/ 20, /*resUpBound*/ 30); - } - - /** @throws Exception If failed. */ - public void testValuesPercentage50() throws Exception { - String seqName = UUID.randomUUID().toString(); - - final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true); - - seq.reservePercentage(50); - - assertSeqFields(seq, /*locVal*/ 0, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0); - - // Exhaust a first reserved range to get recalculated values according to new reserve percentage. 
- assertEquals(10, seq.addAndGet(10)); - - assertSeqFields(seq, /*locVal*/ 10, /*upBound*/ 20, /*resBound*/ 15, /*resBottomBound*/ 10, /*resUpBound*/ 20); - - assertEquals(15, seq.addAndGet(5)); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return !F.eq(U.field(seq, "isReserveFutResultsProcessed"), true); - } - }, 1000); - - assertSeqFields(seq, /*locVal*/ 15, /*upBound*/ 20, /*resBound*/ 25, /*resBottomBound*/ 20, /*resUpBound*/ 30); - } - - /** @throws Exception If failed. */ - public void testValuesPercentage0() throws Exception { - String seqName = UUID.randomUUID().toString(); - - final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true); - - seq.reservePercentage(0); - - assertSeqFields(seq, /*locVal*/ 0, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0); - - // Exhaust a first reserved range to get recalculated values according to new reserve percentage. - assertEquals(10, seq.addAndGet(10)); - - assertSeqFields(seq, /*locVal*/ 10, /*upBound*/ 20, /*resBound*/ 10, /*resBottomBound*/ 10, /*resUpBound*/ 20); - - assertEquals(11, seq.addAndGet(1)); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return !F.eq(U.field(seq, "isReserveFutResultsProcessed"), true); - } - }, 1000); - - assertSeqFields(seq, /*locVal*/ 11, /*upBound*/ 20, /*resBound*/ 20, /*resBottomBound*/ 20, /*resUpBound*/ 30); - - assertEquals(12, seq.incrementAndGet()); - - assertSeqFields(seq, /*locVal*/ 12, /*upBound*/ 20, /*resBound*/ 20, /*resBottomBound*/ 20, /*resUpBound*/ 30); - - assertEquals(20, seq.addAndGet(8)); - - assertSeqFields(seq, /*locVal*/ 20, /*upBound*/ 30, /*resBound*/ 20, /*resBottomBound*/ 20, /*resUpBound*/ 30); - } - - /** @throws Exception If failed. 
*/ - public void testValuesPercentage100() throws Exception { - String seqName = UUID.randomUUID().toString(); - - final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true); - - seq.reservePercentage(100); - - assertSeqFields(seq, /*locVal*/ 0, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0); - - // Exhaust a first reserved range to get recalculated values according to new reserve percentage. - assertEquals(10, seq.addAndGet(10)); - - assertSeqFields(seq, /*locVal*/ 10, /*upBound*/ 20, /*resBound*/ 20, /*resBottomBound*/ 10, /*resUpBound*/ 20); - - assertEquals(19, seq.addAndGet(9)); - - assertSeqFields(seq, /*locVal*/ 19, /*upBound*/ 20, /*resBound*/ 20, /*resBottomBound*/ 10, /*resUpBound*/ 20); - - assertEquals(20, seq.incrementAndGet()); - - assertSeqFields(seq, /*locVal*/ 20, /*upBound*/ 30, /*resBound*/ 30, /*resBottomBound*/ 20, /*resUpBound*/ 30); - } - - /** @throws Exception If failed. */ - public void testValuesDoubleReservation() throws Exception { - String seqName = UUID.randomUUID().toString(); - - final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true); - - assertSeqFields(seq, /*locVal*/ 0, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0); - - assertEquals(30, seq.addAndGet(30)); - } - - /** @throws Exception If failed. 
*/ - public void testValues2Nodes() throws Exception { - String seqName = UUID.randomUUID().toString(); - - startGrid(1); - - try { - final GridCacheAtomicSequenceImpl seq1 = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true); - final GridCacheAtomicSequenceImpl seq2 = (GridCacheAtomicSequenceImpl)grid(1).atomicSequence(seqName, 0, false); - - assertSeqFields(seq1, /*locVal*/ 0, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0); - assertSeqFields(seq2, /*locVal*/ 10, /*upBound*/ 20, /*resBound*/ 18, /*resBottomBound*/ 0, /*resUpBound*/ 0); - - assertEquals(1, seq1.incrementAndGet()); - assertEquals(11, seq2.incrementAndGet()); - - assertSeqFields(seq1, /*locVal*/ 1, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0); - assertSeqFields(seq2, /*locVal*/ 11, /*upBound*/ 20, /*resBound*/ 18, /*resBottomBound*/ 0, /*resUpBound*/ 0); - - assertEquals(7, seq1.addAndGet(6)); - assertEquals(17, seq2.addAndGet(6)); - - assertSeqFields(seq1, /*locVal*/ 7, /*upBound*/ 10, /*resBound*/ 8, /*resBottomBound*/ 0, /*resUpBound*/ 0); - assertSeqFields(seq2, /*locVal*/ 17, /*upBound*/ 20, /*resBound*/ 18, /*resBottomBound*/ 0, /*resUpBound*/ 0); - - // New reservation (reverse order) - assertEquals(18, seq2.incrementAndGet()); - - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return !F.eq(U.field(seq2, "isReserveFutResultsProcessed"), true); - } - }, 1000)); - - assertEquals(8, seq1.incrementAndGet()); - - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return !F.eq(U.field(seq1, "isReserveFutResultsProcessed"), true); - } - }, 1000)); - - assertSeqFields(seq1, /*locVal*/ 8, /*upBound*/ 10, /*resBound*/ 38, /*resBottomBound*/ 30, /*resUpBound*/ 40); - assertSeqFields(seq2, /*locVal*/ 18, /*upBound*/ 20, /*resBound*/ 28, /*resBottomBound*/ 20, /*resUpBound*/ 30); - - assertEquals(30, seq1.addAndGet(7)); - 
assertEquals(20, seq2.addAndGet(2)); - - assertSeqFields(seq1, /*locVal*/ 30, /*upBound*/ 40, /*resBound*/ 38, /*resBottomBound*/ 30, /*resUpBound*/ 40); - assertSeqFields(seq2, /*locVal*/ 20, /*upBound*/ 30, /*resBound*/ 28, /*resBottomBound*/ 20, /*resUpBound*/ 30); - } - finally { - stopGrid(1); - } - } - - /** - * @param seq Sequence. - * @param locVal Local value. - * @param upBound Up bound. - * @param newReservationLine Reservation bnound. - * @param reservedBottomBound Reservation bottom bound. - * @param reservedUpBound Reservation up bound. - */ - private void assertSeqFields(GridCacheAtomicSequenceImpl seq, long locVal, long upBound, long newReservationLine, - long reservedBottomBound, long reservedUpBound) { - assertEquals(new Long(locVal), U.field(seq, "locVal")); - assertEquals(new Long(upBound), U.field(seq, "upBound")); - assertEquals(new Long(newReservationLine), U.field(seq, "newReservationLine")); - assertEquals(new Long(reservedBottomBound), U.field(seq, "reservedBottomBound")); - assertEquals(new Long(reservedUpBound), U.field(seq, "reservedUpBound")); - } - - /** @throws Exception If failed. 
*/ - public void testValues2NodesDoubleReservation() throws Exception { - String seqName = UUID.randomUUID().toString(); - - startGrid(1); - - try { - final GridCacheAtomicSequenceImpl seq1 = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true); - final GridCacheAtomicSequenceImpl seq2 = (GridCacheAtomicSequenceImpl)grid(1).atomicSequence(seqName, 0, false); - - assertEquals(1, seq1.incrementAndGet()); - assertEquals(11, seq2.incrementAndGet()); - - assertEquals(new Long(1L), U.field(seq1, "locVal")); - assertEquals(new Long(10L), U.field(seq1, "upBound")); - assertEquals(new Long(11L), U.field(seq2, "locVal")); - assertEquals(new Long(20L), U.field(seq2, "upBound")); - - assertEquals(31, seq2.addAndGet(20)); - - assertEquals(new Long(1L), U.field(seq1, "locVal")); - assertEquals(new Long(10L), U.field(seq1, "upBound")); - assertEquals(new Long(31L), U.field(seq2, "locVal")); - assertEquals(new Long(40L), U.field(seq2, "upBound")); - - // Jump - assertEquals(40, seq1.addAndGet(23)); - } - finally { - stopGrid(1); - } - } - - /** @throws Exception If failed. */ - public void testUpdatedSync() throws Exception { - checkUpdate(true); - } - - /** @throws Exception If failed. */ - public void testPreviousSync() throws Exception { - checkUpdate(false); - } - - /** @throws Exception If failed. */ - public void testIncrementAndGet() throws Exception { - // Random sequence names. - String seqName = UUID.randomUUID().toString(); - - final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); - - runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { - @Override public void apply(IgniteAtomicSequence t) { - t.incrementAndGet(); - } - }, seq, ITERATION_NUM, THREAD_NUM); - - assertEquals(ITERATION_NUM * THREAD_NUM, seq.get()); - } - - /** @throws Exception If failed. 
*/ - public void testIncrementAndGet2Nodes() throws Exception { - startGrid(1); - - try { - String seqName = UUID.randomUUID().toString(); - - final IgniteAtomicSequence seq1 = grid(0).atomicSequence(seqName, 0L, true); - final IgniteAtomicSequence seq2 = grid(1).atomicSequence(seqName, 0L, true); - - multithreaded(new Runnable() { - @Override public void run() { - for (int i = 0; i < ITERATION_NUM; i++) { - if (i % 2 == 0) - seq1.incrementAndGet(); - else - seq2.incrementAndGet(); - } - } - }, THREAD_NUM); - - long seq1Val = seq1.get(); - long seq2Val = seq2.get(); - - assertEquals(ITERATION_NUM * THREAD_NUM + (seq1Val < seq2Val ? 0 : 10), seq1Val); - assertEquals(ITERATION_NUM * THREAD_NUM + (seq1Val < seq2Val ? 10 : 0), seq2Val); - } - finally { - stopGrid(1); - } - } - - /** @throws Exception If failed. */ - public void testGetAndIncrement() throws Exception { - // Random sequence names. - String seqName = UUID.randomUUID().toString(); - - final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); - - runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { - @Override public void apply(IgniteAtomicSequence t) { - t.getAndIncrement(); - } - }, seq, ITERATION_NUM, THREAD_NUM); - - assertEquals(ITERATION_NUM * THREAD_NUM, seq.get()); - } - - /** @throws Exception If failed. */ - public void testAddAndGet() throws Exception { - // Random sequence names. - String seqName = UUID.randomUUID().toString(); - - final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); - - runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { - @Override public void apply(IgniteAtomicSequence t) { - t.addAndGet(5); - } - }, seq, ITERATION_NUM, THREAD_NUM); - - assertEquals(5 * ITERATION_NUM * THREAD_NUM, seq.get()); - } - - /** @throws Exception If failed. */ - public void testGetAndAdd() throws Exception { - checkGetAndAdd(5); - } - - /** @throws Exception If failed. 
*/ - public void testGetAndAdd2() throws Exception { - checkGetAndAdd(3); - } - - /** - * @param val Value. - * @throws Exception If failed. - */ - private void checkGetAndAdd(final int val) throws Exception { - // Random sequence names. - String seqName = UUID.randomUUID().toString(); - - final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); - - runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { - @Override public void apply(IgniteAtomicSequence t) { - t.getAndAdd(val); - } - }, seq, ITERATION_NUM, THREAD_NUM); - - assertEquals(val * ITERATION_NUM * THREAD_NUM, seq.get()); - } - - /** @throws Exception If failed. */ - public void testMixed1() throws Exception { - // Random sequence names. - String seqName = UUID.randomUUID().toString(); - - final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); - - runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { - @Override public void apply(IgniteAtomicSequence t) { - t.incrementAndGet(); - t.getAndIncrement(); - t.incrementAndGet(); - t.getAndIncrement(); - t.getAndAdd(3); - t.addAndGet(3); - } - }, seq, ITERATION_NUM, THREAD_NUM); - - assertEquals(10 * ITERATION_NUM * THREAD_NUM, seq.get()); - } - - /** @throws Exception If failed. */ - public void testMixed2() throws Exception { - // Random sequence names. - String seqName = UUID.randomUUID().toString(); - - final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); - - runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { - @Override public void apply(IgniteAtomicSequence t) { - t.getAndAdd(2); - t.addAndGet(3); - t.addAndGet(5); - t.getAndAdd(7); - } - }, seq, ITERATION_NUM, THREAD_NUM); - - assertEquals(17 * ITERATION_NUM * THREAD_NUM, seq.get()); - } - - /** - * Executes given closure in a given number of threads given number of times. - * - * @param c Closure to execute. - * @param seq Sequence to pass into closure. 
- * @param cnt Count of iterations per thread. - * @param threadCnt Thread count. - * @throws Exception If failed. - */ - protected void runSequenceClosure(final GridInUnsafeClosure<IgniteAtomicSequence> c, - final IgniteAtomicSequence seq, final int cnt, final int threadCnt) throws Exception { - multithreaded(new Runnable() { - @Override public void run() { - try { - for (int i = 0; i < cnt; i++) - c.apply(seq); - } - catch (IgniteCheckedException e) { - throw new RuntimeException(e); - } - } - }, threadCnt); - } - - /** - * @param updated Whether use updated values. - * @throws Exception If failed. - */ - @SuppressWarnings("IfMayBeConditional") - private void checkUpdate(boolean updated) throws Exception { - String seqName = UUID.randomUUID().toString(); - - final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); - - long curVal = 0; - - Random r = new Random(); - - for (int i = 0; i < ITERATION_NUM; i++) { - long delta = r.nextInt(10) + 1; - - long retVal = updated ? seq.addAndGet(delta) : seq.getAndAdd(delta); - - assertEquals(updated ? curVal + delta : curVal, retVal); - - curVal += delta; - } - } - - /** - * Closure that throws exception. - * - * @param <E> Closure argument type. 
- */ - private abstract static class GridInUnsafeClosure<E> { - public abstract void apply(E p) throws IgniteCheckedException; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/980df4ea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java index e7a477f..d988b2c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java @@ -18,12 +18,12 @@ package org.apache.ignite.internal.processors.cache.datastructures; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; - import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.configuration.AtomicConfiguration; import org.apache.ignite.configuration.CacheConfiguration; @@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.datastructures.GridCacheInternalKey import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; -import org.eclipse.jetty.util.ConcurrentHashSet; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -120,7 +119,7 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics assertEquals(BATCH_SIZE, seq.batchSize()); } - assertEquals(gridCount(), 
G.allGrids().size()); + assertEquals(1, G.allGrids().size()); } /** {@inheritDoc} */ @@ -309,29 +308,9 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics * @throws Exception If failed. */ public void testMultiThreadedSequenceIntegrity() throws Exception { - multiThreadedSequenceIntegrity(/*batchSize*/ 1, /*percentage*/ 30, /*initVal*/0); - multiThreadedSequenceIntegrity(/*batchSize*/ 7, /*percentage*/ 0, /*initVal*/-1500); - multiThreadedSequenceIntegrity(/*batchSize*/ 3, /*percentage*/ 100, /*initVal*/345); - } - - /** - * @throws Exception If failed. - */ - public void testMultiNodeSequenceIntegrity() throws Exception { - if (gridCount() < 2) - return; - - multiNodeSequenceIntegrity(/*batchSize*/ 1, /*percentage*/ 80, /*initVal*/0); - multiNodeSequenceIntegrity(/*batchSize*/ 1, /*percentage*/ 0, /*initVal*/-11); - multiNodeSequenceIntegrity(/*batchSize*/ 1, /*percentage*/ 100, /*initVal*/183); - - multiNodeSequenceIntegrity(/*batchSize*/ 7, /*percentage*/ 20, /*initVal*/83); - multiNodeSequenceIntegrity(/*batchSize*/ 7, /*percentage*/ 0, /*initVal*/-17); - multiNodeSequenceIntegrity(/*batchSize*/ 7, /*percentage*/ 100, /*initVal*/11); - - multiNodeSequenceIntegrity(/*batchSize*/ 11, /*percentage*/ 50, /*initVal*/-7); - multiNodeSequenceIntegrity(/*batchSize*/ 11, /*percentage*/ 0, /*initVal*/55); - multiNodeSequenceIntegrity(/*batchSize*/ 11, /*percentage*/ 100, /*initVal*/22); + multiThreadedSequenceIntegrity(1, 0); + multiThreadedSequenceIntegrity(7, -1500); + multiThreadedSequenceIntegrity(3, 345); } /** @@ -385,8 +364,8 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics * Sequence get and increment. * * @param seq Sequence for test. - * @return Result of operation. * @throws Exception If failed. + * @return Result of operation. 
*/ private long getAndIncrement(IgniteAtomicSequence seq) throws Exception { long locSeqVal = seq.get(); @@ -402,8 +381,8 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics * Sequence add and increment * * @param seq Sequence for test. - * @return Result of operation. * @throws Exception If failed. + * @return Result of operation. */ private long incrementAndGet(IgniteAtomicSequence seq) throws Exception { long locSeqVal = seq.get(); @@ -420,8 +399,8 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics * * @param seq Sequence for test. * @param l Number of added elements. - * @return Result of operation. * @throws Exception If failed. + * @return Result of operation. */ private long addAndGet(IgniteAtomicSequence seq, long l) throws Exception { long locSeqVal = seq.get(); @@ -438,8 +417,8 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics * * @param seq Sequence for test. * @param l Number of added elements. - * @return Result of operation. * @throws Exception If failed. + * @return Result of operation. */ private long getAndAdd(IgniteAtomicSequence seq, long l) throws Exception { long locSeqVal = seq.get(); @@ -452,10 +431,10 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics } /** - * Sequence integrity. + * Sequence integrity. * * @param batchSize Sequence batch size. - * @param initVal Sequence initial value. + * @param initVal Sequence initial value. * @throws Exception If test fail. */ private void sequenceIntegrity(int batchSize, long initVal) throws Exception { @@ -486,24 +465,24 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics } /** - * Multi-threaded integrity. + * Multi-threaded integrity. * * @param batchSize Sequence batch size. - * @param initVal Sequence initial value. + * @param initVal Sequence initial value. * @throws Exception If test fail. 
*/ - private void multiThreadedSequenceIntegrity(int batchSize, int percentage, long initVal) throws Exception { + private void multiThreadedSequenceIntegrity(int batchSize, long initVal) throws Exception { // Random sequence names. String locSeqName = UUID.randomUUID().toString(); // Sequence. - final IgniteAtomicSequence locSeq = grid().atomicSequence(locSeqName, initVal, true); + final IgniteAtomicSequence locSeq = grid().atomicSequence(locSeqName, initVal, + true); locSeq.batchSize(batchSize); - locSeq.reservePercentage(percentage); // Result set. - final Set<Long> resSet = new ConcurrentHashSet<>(); + final Set<Long> resSet = Collections.synchronizedSet(new HashSet<Long>()); // Get sequence value and try to put it result set. for (int i = 0; i < MAX_LOOPS_NUM; i++) { @@ -542,6 +521,7 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics resSet.add(val); + if (i % 100 == 0) info("Finished iteration 2: " + i); } @@ -559,51 +539,6 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends IgniteAtomics } /** - * Multi-threaded integrity. - * - * @param batchSize Sequence batch size. - * @param initVal Sequence initial value. - * @throws Exception If test fail. - */ - private void multiNodeSequenceIntegrity(int batchSize, int percentage, long initVal) throws Exception { - // Random sequence names. - String locSeqName = UUID.randomUUID().toString(); - - // Sequences. - final IgniteAtomicSequence[] locSeqs = new IgniteAtomicSequence[3]; - - for (int i = 0; i < locSeqs.length; i++) { - locSeqs[i] = grid(i).atomicSequence(locSeqName, initVal, true); - - locSeqs[i].batchSize(batchSize); - - locSeqs[i].reservePercentage(percentage); - } - - final Set<Long> resSet = new ConcurrentHashSet<>(); - - multithreaded( - new Callable() { - @Nullable @Override public Object call() throws Exception { - // Get sequence value and try to put it result set. 
- for (int i = 0; i < MAX_LOOPS_NUM; i++) { - Long val = locSeqs[i % locSeqs.length].getAndIncrement(); - - assert !resSet.contains(val) : "Element already in set : " + val; - - resSet.add(val); - } - - return null; - } - }, THREAD_NUM); - - assert resSet.size() == MAX_LOOPS_NUM * THREAD_NUM; - - removeSequence(locSeqName); - } - - /** * Test sequence integrity. * * @param seq Sequence for test. http://git-wip-us.apache.org/repos/asf/ignite/blob/980df4ea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java index ac38fd5..945650d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicSequenceMultiThreadedTest.java @@ -17,17 +17,324 @@ package org.apache.ignite.internal.processors.cache.datastructures.partitioned; +import java.util.Random; +import java.util.UUID; +import org.apache.ignite.IgniteAtomicSequence; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.internal.processors.cache.datastructures.GridCacheAtomicSequenceMultiThreadedAbstractTest; +import org.apache.ignite.configuration.AtomicConfiguration; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteAtomicsAbstractTest; +import 
org.apache.ignite.internal.processors.datastructures.GridCacheAtomicSequenceImpl; +import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.cache.CacheMode.PARTITIONED; /** * Cache partitioned multi-threaded tests. */ -public class GridCachePartitionedAtomicSequenceMultiThreadedTest extends GridCacheAtomicSequenceMultiThreadedAbstractTest { +public class GridCachePartitionedAtomicSequenceMultiThreadedTest extends IgniteAtomicsAbstractTest { + /** Number of threads for multithreaded test. */ + private static final int THREAD_NUM = 30; + + /** Number of iterations per thread for multithreaded test. */ + private static final int ITERATION_NUM = 4000; + /** {@inheritDoc} */ @Override protected CacheMode atomicsCacheMode() { return PARTITIONED; } -} + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected AtomicConfiguration atomicConfiguration() { + AtomicConfiguration cfg = super.atomicConfiguration(); + + cfg.setBackups(1); + cfg.setAtomicSequenceReserveSize(10); + + return cfg; + } + + /** @throws Exception If failed. */ + public void testValues() throws Exception { + String seqName = UUID.randomUUID().toString(); + + final GridCacheAtomicSequenceImpl seq = (GridCacheAtomicSequenceImpl)grid(0).atomicSequence(seqName, 0, true); + + // Local reservations. + assertEquals(1, seq.incrementAndGet()); + assertEquals(1, seq.getAndIncrement()); // Seq = 2 + assertEquals(3L, seq.incrementAndGet()); + assertEquals(3L, seq.getAndIncrement()); // Seq=4 + + assertEquals(4, seq.getAndAdd(3)); + assertEquals(9, seq.addAndGet(2)); + + assertEquals(new Long(9L), U.field(seq, "locVal")); + assertEquals(new Long(9L), U.field(seq, "upBound")); + + // Cache calls. 
+ assertEquals(10, seq.incrementAndGet()); + + assertEquals(new Long(10L), U.field(seq, "locVal")); + assertEquals(new Long(19L), U.field(seq, "upBound")); + + seq.addAndGet(9); + + assertEquals(new Long(19L), U.field(seq, "locVal")); + assertEquals(new Long(19L), U.field(seq, "upBound")); + + assertEquals(20L, seq.incrementAndGet()); + + assertEquals(new Long(20L), U.field(seq, "locVal")); + assertEquals(new Long(29L), U.field(seq, "upBound")); + + seq.addAndGet(9); + + assertEquals(new Long(29L), U.field(seq, "locVal")); + assertEquals(new Long(29L), U.field(seq, "upBound")); + + assertEquals(29, seq.getAndIncrement()); + + assertEquals(new Long(30L), U.field(seq, "locVal")); + assertEquals(new Long(39L), U.field(seq, "upBound")); + + seq.addAndGet(9); + + assertEquals(new Long(39L), U.field(seq, "locVal")); + assertEquals(new Long(39L), U.field(seq, "upBound")); + + assertEquals(39L, seq.getAndIncrement()); + + assertEquals(new Long(40L), U.field(seq, "locVal")); + assertEquals(new Long(49L), U.field(seq, "upBound")); + + seq.addAndGet(9); + + assertEquals(new Long(49L), U.field(seq, "locVal")); + assertEquals(new Long(49L), U.field(seq, "upBound")); + + assertEquals(50, seq.addAndGet(1)); + + assertEquals(new Long(50L), U.field(seq, "locVal")); + assertEquals(new Long(59L), U.field(seq, "upBound")); + + seq.addAndGet(9); + + assertEquals(new Long(59L), U.field(seq, "locVal")); + assertEquals(new Long(59L), U.field(seq, "upBound")); + + assertEquals(59, seq.getAndAdd(1)); + + assertEquals(new Long(60L), U.field(seq, "locVal")); + assertEquals(new Long(69L), U.field(seq, "upBound")); + } + + /** @throws Exception If failed. */ + public void testUpdatedSync() throws Exception { + checkUpdate(true); + } + + /** @throws Exception If failed. */ + public void testPreviousSync() throws Exception { + checkUpdate(false); + } + + /** @throws Exception If failed. */ + public void testIncrementAndGet() throws Exception { + // Random sequence names. 
+ String seqName = UUID.randomUUID().toString(); + + final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); + + runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { + @Override public void apply(IgniteAtomicSequence t) { + t.incrementAndGet(); + } + }, seq, ITERATION_NUM, THREAD_NUM); + + assertEquals(ITERATION_NUM * THREAD_NUM, seq.get()); + } + + /** @throws Exception If failed. */ + public void testIncrementAndGetAsync() throws Exception { + // Random sequence names. + String seqName = UUID.randomUUID().toString(); + + final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); + + runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { + @Override public void apply(IgniteAtomicSequence t) { + t.incrementAndGet(); + } + }, seq, ITERATION_NUM, THREAD_NUM); + + assertEquals(ITERATION_NUM * THREAD_NUM, seq.get()); + } + + /** @throws Exception If failed. */ + public void testGetAndIncrement() throws Exception { + // Random sequence names. + String seqName = UUID.randomUUID().toString(); + + final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); + + runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { + @Override public void apply(IgniteAtomicSequence t) { + t.getAndIncrement(); + } + }, seq, ITERATION_NUM, THREAD_NUM); + + assertEquals(ITERATION_NUM * THREAD_NUM, seq.get()); + } + + /** @throws Exception If failed. */ + public void testGetAndIncrementAsync() throws Exception { + // Random sequence names. + String seqName = UUID.randomUUID().toString(); + + final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); + + runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { + @Override public void apply(IgniteAtomicSequence t) { + t.getAndIncrement(); + } + }, seq, ITERATION_NUM, THREAD_NUM); + + assertEquals(ITERATION_NUM * THREAD_NUM, seq.get()); + } + + /** @throws Exception If failed. 
*/ + public void testAddAndGet() throws Exception { + // Random sequence names. + String seqName = UUID.randomUUID().toString(); + + final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); + + runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { + @Override public void apply(IgniteAtomicSequence t) { + t.addAndGet(5); + } + }, seq, ITERATION_NUM, THREAD_NUM); + + assertEquals(5 * ITERATION_NUM * THREAD_NUM, seq.get()); + } + + /** @throws Exception If failed. */ + public void testGetAndAdd() throws Exception { + // Random sequence names. + String seqName = UUID.randomUUID().toString(); + + final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); + + runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { + @Override public void apply(IgniteAtomicSequence t) { + t.getAndAdd(5); + } + }, seq, ITERATION_NUM, THREAD_NUM); + + assertEquals(5 * ITERATION_NUM * THREAD_NUM, seq.get()); + } + + /** @throws Exception If failed. */ + public void testMixed1() throws Exception { + // Random sequence names. + String seqName = UUID.randomUUID().toString(); + + final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); + + runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { + @Override public void apply(IgniteAtomicSequence t) { + t.incrementAndGet(); + t.getAndIncrement(); + t.incrementAndGet(); + t.getAndIncrement(); + t.getAndAdd(3); + t.addAndGet(3); + } + }, seq, ITERATION_NUM, THREAD_NUM); + + assertEquals(10 * ITERATION_NUM * THREAD_NUM, seq.get()); + } + + /** @throws Exception If failed. */ + public void testMixed2() throws Exception { + // Random sequence names. 
+ String seqName = UUID.randomUUID().toString(); + + final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); + + runSequenceClosure(new GridInUnsafeClosure<IgniteAtomicSequence>() { + @Override public void apply(IgniteAtomicSequence t) { + t.getAndAdd(2); + t.addAndGet(3); + t.addAndGet(5); + t.getAndAdd(7); + } + }, seq, ITERATION_NUM, THREAD_NUM); + + assertEquals(17 * ITERATION_NUM * THREAD_NUM, seq.get()); + } + + /** + * Executes given closure in a given number of threads given number of times. + * + * @param c Closure to execute. + * @param seq Sequence to pass into closure. + * @param cnt Count of iterations per thread. + * @param threadCnt Thread count. + * @throws Exception If failed. + */ + protected void runSequenceClosure(final GridInUnsafeClosure<IgniteAtomicSequence> c, + final IgniteAtomicSequence seq, final int cnt, final int threadCnt) throws Exception { + multithreaded(new Runnable() { + @Override public void run() { + try { + for (int i = 0; i < cnt; i++) + c.apply(seq); + } + catch (IgniteCheckedException e) { + throw new RuntimeException(e); + } + } + }, threadCnt); + } + + /** + * @param updated Whether use updated values. + * @throws Exception If failed. + */ + @SuppressWarnings("IfMayBeConditional") + private void checkUpdate(boolean updated) throws Exception { + String seqName = UUID.randomUUID().toString(); + + final IgniteAtomicSequence seq = grid(0).atomicSequence(seqName, 0L, true); + + long curVal = 0; + + Random r = new Random(); + + for (int i = 0; i < ITERATION_NUM; i++) { + long delta = r.nextInt(10) + 1; + + long retVal = updated ? seq.addAndGet(delta) : seq.getAndAdd(delta); + + assertEquals(updated ? curVal + delta : curVal, retVal); + + curVal += delta; + } + } + + /** + * Closure that throws exception. + * + * @param <E> Closure argument type. 
+ */ + private abstract static class GridInUnsafeClosure<E> { + public abstract void apply(E p) throws IgniteCheckedException; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/980df4ea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSequenceApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSequenceApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSequenceApiSelfTest.java index ae03348..adc2ab3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSequenceApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSequenceApiSelfTest.java @@ -40,8 +40,4 @@ public class GridCachePartitionedSequenceApiSelfTest extends GridCacheSequenceAp return atomicCfg; } - - @Override protected int gridCount() { - return 3; - } } \ No newline at end of file
