Ignite-perftest - Prepare optimizations for merge.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e58604a4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e58604a4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e58604a4 Branch: refs/heads/ignite-perftest-merge Commit: e58604a4aa6c2c0ec9a756ac40a5aae4af5621bc Parents: d12674a Author: Alexey Goncharuk <[email protected]> Authored: Fri Nov 13 13:15:05 2015 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Nov 13 13:15:05 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalGatewayImpl.java | 8 +- .../managers/communication/GridIoManager.java | 52 +- .../processors/cache/GridCacheGateway.java | 25 +- .../processors/cache/GridCacheIoManager.java | 34 +- .../processors/cache/GridCacheMapEntry.java | 16 +- .../processors/cache/GridCacheMvcc.java | 8 - .../distributed/dht/GridDhtLockFuture.java | 3 +- .../dht/GridDhtPartitionTopologyImpl.java | 3 - .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../cache/distributed/dht/GridDhtTxRemote.java | 3 +- ...arOptimisticSerializableTxPrepareFuture.java | 4 +- .../cache/distributed/near/GridNearTxLocal.java | 12 - .../clock/GridClockSyncProcessor.java | 2 +- .../internal/util/GridSpinReadWriteLock.java | 522 +++++++++---------- .../ignite/internal/util/nio/GridNioServer.java | 12 +- .../util/nio/GridSelectorNioSessionImpl.java | 42 +- 16 files changed, 356 insertions(+), 392 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java index dbf2f73..fe8c580 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java @@ -73,13 +73,13 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { if (stackTrace == null) stackTrace = stackTrace(); -// rwLock.readLock(); + rwLock.readLock(); GridKernalState state = this.state.get(); if (state != GridKernalState.STARTED) { // Unlock just acquired lock. -// rwLock.readUnlock(); + rwLock.readUnlock(); if (state == GridKernalState.DISCONNECTED) { assert reconnectFut != null; @@ -96,7 +96,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { if (stackTrace == null) stackTrace = stackTrace(); -// rwLock.readLock(); + rwLock.readLock(); if (state.get() == GridKernalState.DISCONNECTED) throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName); @@ -104,7 +104,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { /** {@inheritDoc} */ @Override public void readUnlock() { -// rwLock.readUnlock(); + rwLock.readUnlock(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index a14a05a..b8af8da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -167,10 +167,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private final Marshaller marsh; /** Busy lock. */ -// private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); + private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); /** Lock to sync maps access. */ -// private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(); /** Fully started flag. When set to true, can send and receive messages. */ private volatile boolean started; @@ -396,7 +396,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } // Clean up delayed and ordered messages (need exclusive lock). -// lock.writeLock().lock(); + lock.writeLock().lock(); try { ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId); @@ -406,7 +406,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa "(sender node left topology): " + waitList); } finally { -// lock.writeLock().unlock(); + lock.writeLock().unlock(); } break; @@ -424,7 +424,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // 1. Process wait list. Collection<Collection<DelayedMessage>> delayedMsgs = new ArrayList<>(); -// lock.writeLock().lock(); + lock.writeLock().lock(); try { started = true; @@ -442,7 +442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } } finally { -// lock.writeLock().unlock(); + lock.writeLock().unlock(); } // After write lock released. @@ -501,19 +501,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa boolean interrupted = false; // Busy wait is intentional. -// while (true) { -// try { -// if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS)) -// break; -// else -// Thread.sleep(200); -// } -// catch (InterruptedException ignore) { -// // Preserve interrupt status & ignore. -// // Note that interrupted flag is cleared. -// interrupted = true; -// } -// } + while (true) { + try { + if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS)) + break; + else + Thread.sleep(200); + } + catch (InterruptedException ignore) { + // Preserve interrupt status & ignore. + // Note that interrupted flag is cleared. + interrupted = true; + } + } try { if (interrupted) @@ -529,7 +529,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa stopping = true; } finally { -// busyLock.writeUnlock(); + busyLock.writeUnlock(); } } @@ -553,7 +553,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa assert nodeId != null; assert msg != null; -// busyLock.readLock(); + busyLock.readLock(); try { if (stopping) { @@ -581,7 +581,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } if (!started) { -// lock.readLock().lock(); + lock.readLock().lock(); try { if (!started) { // Sets to true in write lock, so double checking. @@ -601,7 +601,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } } finally { -// lock.readLock().unlock(); + lock.readLock().unlock(); } } @@ -649,7 +649,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa U.error(log, "Failed to process message (will ignore): " + msg, e); } finally { -// busyLock.readUnlock(); + busyLock.readUnlock(); } } @@ -2001,7 +2001,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return; } -// busyLock.readLock(); + busyLock.readLock(); try { if (stopping) { @@ -2077,7 +2077,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } } finally { -// busyLock.readUnlock(); + busyLock.readUnlock(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index 8a1f0c3..1562d70 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -63,9 +63,9 @@ public class GridCacheGateway<K, V> { if (ctx.deploymentEnabled()) ctx.deploy().onEnter(); -// rwLock.readLock(); + rwLock.readLock(); -// checkState(true, true); + checkState(true, true); } /** @@ -106,11 +106,9 @@ public class GridCacheGateway<K, V> { onEnter(); // Must unlock in case of unexpected errors to avoid deadlocks during kernal stop. -// rwLock.readLock(); -// -// return checkState(true, false); + rwLock.readLock(); - return true; + return checkState(true, false); } /** @@ -121,8 +119,7 @@ public class GridCacheGateway<K, V> { public boolean enterIfNotStoppedNoLock() { onEnter(); -// return checkState(false, false); - return true; + return checkState(false, false); } /** @@ -145,7 +142,7 @@ public class GridCacheGateway<K, V> { leaveNoLock(); } finally { -// rwLock.readUnlock(); + rwLock.readUnlock(); } } @@ -171,9 +168,9 @@ public class GridCacheGateway<K, V> { onEnter(); -// rwLock.readLock(); -// -// checkState(true, true); + rwLock.readLock(); + + checkState(true, true); // Must unlock in case of unexpected errors to avoid // deadlocks during kernal stop. @@ -181,7 +178,7 @@ public class GridCacheGateway<K, V> { return setOperationContextPerCall(opCtx); } catch (Throwable e) { -// rwLock.readUnlock(); + rwLock.readUnlock(); throw e; } @@ -222,7 +219,7 @@ public class GridCacheGateway<K, V> { leaveNoLock(prev); } finally { -// rwLock.readUnlock(); + rwLock.readUnlock(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 1a118a7..2334780 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -101,7 +101,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { private boolean stopping; /** Mutex. */ -// private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); + private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); /** Deployment enabled. */ private boolean depEnabled; @@ -218,19 +218,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { boolean interrupted = false; // Busy wait is intentional. -// while (true) { -// try { -// if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS)) -// break; -// else -// Thread.sleep(200); -// } -// catch (InterruptedException ignore) { -// // Preserve interrupt status & ignore. -// // Note that interrupted flag is cleared. -// interrupted = true; -// } -// } + while (true) { + try { + if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS)) + break; + else + Thread.sleep(200); + } + catch (InterruptedException ignore) { + // Preserve interrupt status & ignore. + // Note that interrupted flag is cleared. + interrupted = true; + } + } if (interrupted) Thread.currentThread().interrupt(); @@ -239,7 +239,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { stopping = true; } finally { -// rw.writeUnlock(); + rw.writeUnlock(); } } @@ -251,7 +251,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg, final IgniteBiInClosure<UUID, GridCacheMessage> c) { -// rw.readLock(); + rw.readLock(); try { if (stopping) { @@ -282,7 +282,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (depEnabled) cctx.deploy().ignoreOwnership(false); -// rw.readUnlock(); + rw.readUnlock(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index b40ab6a..df9f5c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -298,17 +298,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject val0 = val; - if (val0 == null) { - if (hasOffHeapPointer()) { - IgniteBiTuple<byte[], Byte> t = valueBytes0(); + if (val0 == null && hasOffHeapPointer()) { + IgniteBiTuple<byte[], Byte> t = valueBytes0(); - return cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1()); - } - } - else if (val0 instanceof CacheObjectImpl) { - CacheObjectImpl im = (CacheObjectImpl)val0; - - val0 = new CacheObjectImpl(im.val, im.valBytes); + return cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1()); } return val0; @@ -2854,8 +2847,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** {@inheritDoc} */ @Override public KeyCacheObject key() { -// return key; - return new KeyCacheObjectImpl(((KeyCacheObjectImpl)key).val, ((KeyCacheObjectImpl)key).valBytes); + return key; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java index 543923a..adcbf92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java @@ -1370,12 +1370,4 @@ public final class GridCacheMvcc { @Override public String toString() { // Synchronize to ensure one-thread at a time. return S.toString(GridCacheMvcc.class, this); } - - public static void main(String[] args) { - ArrayList<String> col1 = new ArrayList<>(5); - - for (int i = 0; i < 5; i++) { - col1.add("" + i); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index d86a11d..219d841 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -380,10 +380,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> * @return Lock candidate. * @throws GridCacheEntryRemovedException If entry was removed. * @throws GridDistributedLockCancelledException If lock is canceled. - * @throws IgniteCheckedException If failed. */ @Nullable public GridCacheMvccCandidate addEntry(GridDhtCacheEntry entry) - throws GridCacheEntryRemovedException, GridDistributedLockCancelledException, IgniteCheckedException { + throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { if (log.isDebugEnabled()) log.debug("Adding entry: " + entry); http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 0afe70b..6bd283a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -686,9 +686,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); - if (true) - return affNodes; - lock.readLock().lock(); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index dbe69f8..e2939b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -985,7 +985,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } } } - catch (GridCacheEntryRemovedException e) { + catch (GridCacheEntryRemovedException ignore) { assert false : "Got removed exception on entry with dht local candidate: " + entries; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 1050086..e268a88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import javax.cache.processor.EntryProcessor; @@ -209,7 +208,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { this.rmtFutId = rmtFutId; readMap = Collections.emptyMap(); - writeMap = new LinkedHashMap<>(U.capacity(txSize), 0.75f); + writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1); topologyVersion(topVer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 1ca90dd..9c6cb88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -389,9 +389,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim for (GridDistributedTxMapping m : mappings.values()) { assert !m.empty(); - MiniFuture fut = new MiniFuture(m); - - add(fut); + add(new MiniFuture(m)); } Collection<IgniteInternalFuture<?>> futs = (Collection)futures(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 278f6df..c88546b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -482,18 +482,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** - * @param nodeId Node ID. - * @param dhtVer DHT version. - * @param writeVer Write version. - */ - void addDhtVersion(UUID nodeId, GridCacheVersion dhtVer, GridCacheVersion writeVer) { - GridDistributedTxMapping m = mappings.get(nodeId); - - if (m != null) - m.dhtVersion(dhtVer, writeVer); - } - - /** * @param nodeId Undo mapping. */ @Override public boolean removeMapping(UUID nodeId) { http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java index a33c35e..b5c89cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java @@ -289,7 +289,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter { long now = clockSrc.currentTimeMillis(); - if (snap == null) + if (snap == null) return now; Long delta = snap.deltas().get(ctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java index 115fd80..a1fa892 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java @@ -78,44 +78,44 @@ public class GridSpinReadWriteLock { */ @SuppressWarnings("BusyWait") public void readLock() { -// int cnt = readLockEntryCnt.get(); -// -// // Read lock reentry or acquiring read lock while holding write lock. -// if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) { -// assert state > 0 || state == -1; -// -// readLockEntryCnt.set(cnt + 1); -// -// return; -// } -// -// boolean interrupted = false; -// -// while (true) { -// int cur = state; -// -// assert cur >= -1; -// -// if (cur == -1 || pendingWLocks > 0) { -// try { -// Thread.sleep(10); -// } -// catch (InterruptedException ignored) { -// interrupted = true; -// } -// -// continue; -// } -// -// if (compareAndSet(STATE_OFFS, cur, cur + 1)) { -// if (interrupted) -// Thread.currentThread().interrupt(); -// -// break; -// } -// } -// -// readLockEntryCnt.set(1); + int cnt = readLockEntryCnt.get(); + + // Read lock reentry or acquiring read lock while holding write lock. + if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) { + assert state > 0 || state == -1; + + readLockEntryCnt.set(cnt + 1); + + return; + } + + boolean interrupted = false; + + while (true) { + int cur = state; + + assert cur >= -1; + + if (cur == -1 || pendingWLocks > 0) { + try { + Thread.sleep(10); + } + catch (InterruptedException ignored) { + interrupted = true; + } + + continue; + } + + if (compareAndSet(STATE_OFFS, cur, cur + 1)) { + if (interrupted) + Thread.currentThread().interrupt(); + + break; + } + } + + readLockEntryCnt.set(1); } /** @@ -124,62 +124,60 @@ public class GridSpinReadWriteLock { * @return {@code true} if acquired. */ public boolean tryReadLock() { -// int cnt = readLockEntryCnt.get(); -// -// // Read lock reentry or acquiring read lock while holding write lock. -// if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) { -// assert state > 0 || state == -1; -// -// readLockEntryCnt.set(cnt + 1); -// -// return true; -// } -// -// while (true) { -// int cur = state; -// -// if (cur == -1 || pendingWLocks > 0) -// return false; -// -// if (compareAndSet(STATE_OFFS, cur, cur + 1)) { -// readLockEntryCnt.set(1); -// -// return true; -// } -// } - - return true; + int cnt = readLockEntryCnt.get(); + + // Read lock reentry or acquiring read lock while holding write lock. + if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) { + assert state > 0 || state == -1; + + readLockEntryCnt.set(cnt + 1); + + return true; + } + + while (true) { + int cur = state; + + if (cur == -1 || pendingWLocks > 0) + return false; + + if (compareAndSet(STATE_OFFS, cur, cur + 1)) { + readLockEntryCnt.set(1); + + return true; + } + } } /** * Read unlock. */ public void readUnlock() { -// int cnt = readLockEntryCnt.get(); -// -// if (cnt == 0) -// throw new IllegalMonitorStateException(); -// -// // Read unlock when holding write lock is performed here. -// if (cnt > 1 || Thread.currentThread().getId() == writeLockOwner) { -// assert state > 0 || state == -1; -// -// readLockEntryCnt.set(cnt - 1); -// -// return; -// } -// -// while (true) { -// int cur = state; -// -// assert cur > 0; -// -// if (compareAndSet(STATE_OFFS, cur, cur - 1)) { -// readLockEntryCnt.set(0); -// -// return; -// } -// } + int cnt = readLockEntryCnt.get(); + + if (cnt == 0) + throw new IllegalMonitorStateException(); + + // Read unlock when holding write lock is performed here. + if (cnt > 1 || Thread.currentThread().getId() == writeLockOwner) { + assert state > 0 || state == -1; + + readLockEntryCnt.set(cnt - 1); + + return; + } + + while (true) { + int cur = state; + + assert cur > 0; + + if (compareAndSet(STATE_OFFS, cur, cur - 1)) { + readLockEntryCnt.set(0); + + return; + } + } } /** @@ -187,95 +185,95 @@ public class GridSpinReadWriteLock { */ @SuppressWarnings("BusyWait") public void writeLock() { -// long threadId = Thread.currentThread().getId(); -// -// if (threadId == writeLockOwner) { -// assert state == -1; -// -// writeLockEntryCnt++; -// -// return; -// } -// -// // Increment pending write locks. -// while (true) { -// int pendingWLocks0 = pendingWLocks; -// -// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) -// break; -// } -// -// boolean interrupted = false; -// -// while (!compareAndSet(STATE_OFFS, 0, -1)) { -// try { -// Thread.sleep(10); -// } -// catch (InterruptedException ignored) { -// interrupted = true; -// } -// } -// -// // Decrement pending write locks. -// while (true) { -// int pendingWLocks0 = pendingWLocks; -// -// assert pendingWLocks0 > 0; -// -// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) -// break; -// } -// -// if (interrupted) -// Thread.currentThread().interrupt(); -// -// assert writeLockOwner == -1; -// -// writeLockOwner = threadId; -// writeLockEntryCnt = 1; + long threadId = Thread.currentThread().getId(); + + if (threadId == writeLockOwner) { + assert state == -1; + + writeLockEntryCnt++; + + return; + } + + // Increment pending write locks. + while (true) { + int pendingWLocks0 = pendingWLocks; + + if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) + break; + } + + boolean interrupted = false; + + while (!compareAndSet(STATE_OFFS, 0, -1)) { + try { + Thread.sleep(10); + } + catch (InterruptedException ignored) { + interrupted = true; + } + } + + // Decrement pending write locks. + while (true) { + int pendingWLocks0 = pendingWLocks; + + assert pendingWLocks0 > 0; + + if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) + break; + } + + if (interrupted) + Thread.currentThread().interrupt(); + + assert writeLockOwner == -1; + + writeLockOwner = threadId; + writeLockEntryCnt = 1; } /** * Acquires write lock without sleeping between unsuccessful attempts. */ public void writeLock0() { -// long threadId = Thread.currentThread().getId(); -// -// if (threadId == writeLockOwner) { -// assert state == -1; -// -// writeLockEntryCnt++; -// -// return; -// } -// -// // Increment pending write locks. -// while (true) { -// int pendingWLocks0 = pendingWLocks; -// -// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) -// break; -// } -// -// for (;;) { -// if (compareAndSet(STATE_OFFS, 0, -1)) -// break; -// } -// -// // Decrement pending write locks. -// while (true) { -// int pendingWLocks0 = pendingWLocks; -// -// assert pendingWLocks0 > 0; -// -// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) -// break; -// } -// -// assert writeLockOwner == -1; -// -// writeLockOwner = threadId; -// writeLockEntryCnt = 1; + long threadId = Thread.currentThread().getId(); + + if (threadId == writeLockOwner) { + assert state == -1; + + writeLockEntryCnt++; + + return; + } + + // Increment pending write locks. + while (true) { + int pendingWLocks0 = pendingWLocks; + + if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) + break; + } + + for (;;) { + if (compareAndSet(STATE_OFFS, 0, -1)) + break; + } + + // Decrement pending write locks. + while (true) { + int pendingWLocks0 = pendingWLocks; + + assert pendingWLocks0 > 0; + + if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) + break; + } + + assert writeLockOwner == -1; + + writeLockOwner = threadId; + writeLockEntryCnt = 1; } /** @@ -291,28 +289,26 @@ public class GridSpinReadWriteLock { * @return {@code True} if write lock has been acquired. */ public boolean tryWriteLock() { -// long threadId = Thread.currentThread().getId(); -// -// if (threadId == writeLockOwner) { -// assert state == -1; -// -// writeLockEntryCnt++; -// -// return true; -// } -// -// if (compareAndSet(STATE_OFFS, 0, -1)) { -// assert writeLockOwner == -1; -// -// writeLockOwner = threadId; -// writeLockEntryCnt = 1; -// -// return true; -// } -// -// return false; - - return true; + long threadId = Thread.currentThread().getId(); + + if (threadId == writeLockOwner) { + assert state == -1; + + writeLockEntryCnt++; + + return true; + } + + if (compareAndSet(STATE_OFFS, 0, -1)) { + assert writeLockOwner == -1; + + writeLockOwner = threadId; + writeLockEntryCnt = 1; + + return true; + } + + return false; } /** @@ -323,83 +319,81 @@ public class GridSpinReadWriteLock { */ @SuppressWarnings("BusyWait") public boolean tryWriteLock(long timeout, TimeUnit unit) throws InterruptedException { -// long threadId = Thread.currentThread().getId(); -// -// if (threadId == writeLockOwner) { -// assert state == -1; -// -// writeLockEntryCnt++; -// -// return true; -// } -// -// try { -// // Increment pending write locks. -// while (true) { -// int pendingWLocks0 = pendingWLocks; -// -// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) -// break; -// } -// -// long end = U.currentTimeMillis() + unit.toMillis(timeout); -// -// while (true) { -// if (compareAndSet(STATE_OFFS, 0, -1)) { -// assert writeLockOwner == -1; -// -// writeLockOwner = threadId; -// writeLockEntryCnt = 1; -// -// return true; -// } -// -// Thread.sleep(10); -// -// if (end <= U.currentTimeMillis()) -// return false; -// } -// } -// finally { -// // Decrement pending write locks. -// while (true) { -// int pendingWLocks0 = pendingWLocks; -// -// assert pendingWLocks0 > 0; -// -// if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) -// break; -// } -// } - - return true; + long threadId = Thread.currentThread().getId(); + + if (threadId == writeLockOwner) { + assert state == -1; + + writeLockEntryCnt++; + + return true; + } + + try { + // Increment pending write locks. + while (true) { + int pendingWLocks0 = pendingWLocks; + + if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1)) + break; + } + + long end = U.currentTimeMillis() + unit.toMillis(timeout); + + while (true) { + if (compareAndSet(STATE_OFFS, 0, -1)) { + assert writeLockOwner == -1; + + writeLockOwner = threadId; + writeLockEntryCnt = 1; + + return true; + } + + Thread.sleep(10); + + if (end <= U.currentTimeMillis()) + return false; + } + } + finally { + // Decrement pending write locks. + while (true) { + int pendingWLocks0 = pendingWLocks; + + assert pendingWLocks0 > 0; + + if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1)) + break; + } + } } /** * Releases write lock. */ public void writeUnlock() { -// long threadId = Thread.currentThread().getId(); -// -// if (threadId != writeLockOwner) -// throw new IllegalMonitorStateException(); -// -// if (writeLockEntryCnt > 1) { -// writeLockEntryCnt--; -// -// return; -// } -// -// writeLockEntryCnt = 0; -// writeLockOwner = -1; -// -// // Current thread holds write and read locks and is releasing -// // write lock now. -// int update = readLockEntryCnt.get() > 0 ? 1 : 0; -// -// boolean b = compareAndSet(STATE_OFFS, -1, update); -// -// assert b; + long threadId = Thread.currentThread().getId(); + + if (threadId != writeLockOwner) + throw new IllegalMonitorStateException(); + + if (writeLockEntryCnt > 1) { + writeLockEntryCnt--; + + return; + } + + writeLockEntryCnt = 0; + writeLockOwner = -1; + + // Current thread holds write and read locks and is releasing + // write lock now. + int update = readLockEntryCnt.get() > 0 ? 1 : 0; + + boolean b = compareAndSet(STATE_OFFS, -1, update); + + assert b; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index fb17cd7..f1aa4a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -424,7 +424,7 @@ public class GridNioServer<T> { assert ses != null; assert fut != null; - boolean wakeup = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut); + int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut); IgniteInClosure<IgniteException> ackClosure; @@ -432,17 +432,17 @@ public class GridNioServer<T> { fut.ackClosure(ackClosure); if (ses.closed()) { - fut.connectionClosed(); + if (ses.removeFuture(fut)) + fut.connectionClosed(); } - else if (wakeup) + else if (msgCnt == 1) // Change from 0 to 1 means that worker thread should be waken up. clientWorkers.get(ses.selectorIndex()).offer(fut); IgniteBiInClosure<GridNioSession, Integer> lsnr0 = msgQueueLsnr; if (lsnr0 != null) - // TODO ignite-perftest pass correct queue size. - lsnr0.apply(ses, 0); + lsnr0.apply(ses, msgCnt); } /** @@ -1383,7 +1383,7 @@ public class GridNioServer<T> { long now = U.currentTimeMillis(); - if (U.currentTimeMillis() - lastIdleCheck > 5000) { + if (now - lastIdleCheck > 5000) { lastIdleCheck = now; checkIdle(selector.keys()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e58604a4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index a2b7565..6b1f6a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -21,14 +21,14 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.util.Collection; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentLinkedDeque8; /** * Session implementation bound to selector API and socket API. @@ -37,7 +37,7 @@ import org.jetbrains.annotations.Nullable; */ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** Pending write requests. */ - private final ConcurrentLinkedQueue<GridNioFuture<?>> queue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>(); /** Selection key associated with this session. */ @GridToStringExclude @@ -47,7 +47,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { private final int selectorIdx; /** Size counter. */ - private final AtomicBoolean wakeupSelector = new AtomicBoolean(); + private final AtomicInteger queueSize = new AtomicInteger(); /** Semaphore. */ @GridToStringExclude @@ -163,14 +163,14 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * @param writeFut Write request. * @return Updated size of the queue. */ - boolean offerSystemFuture(GridNioFuture<?> writeFut) { + int offerSystemFuture(GridNioFuture<?> writeFut) { writeFut.messageThread(true); - boolean res = queue.offer(writeFut); + boolean res = queue.offerFirst(writeFut); assert res : "Future was not added to queue"; - return !wakeupSelector.get() && wakeupSelector.compareAndSet(false, true); + return queueSize.incrementAndGet(); } /** @@ -183,7 +183,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * @param writeFut Write request to add. * @return Updated size of the queue. */ - boolean offerFuture(GridNioFuture<?> writeFut) { + int offerFuture(GridNioFuture<?> writeFut) { boolean msgThread = GridNioBackPressureControl.threadProcessingMessage(); if (sem != null && !msgThread) @@ -195,7 +195,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { assert res : "Future was not added to queue"; - return !wakeupSelector.get() && wakeupSelector.compareAndSet(false, true); + return queueSize.incrementAndGet(); } /** @@ -208,7 +208,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { assert add; - wakeupSelector.set(false); + boolean set = queueSize.compareAndSet(0, futs.size()); + + assert set; } /** @@ -217,13 +219,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { @Nullable GridNioFuture<?> pollFuture() { GridNioFuture<?> last = queue.poll(); - if (last == null) { - wakeupSelector.set(false); - - last = queue.poll(); - } - if (last != null) { + queueSize.decrementAndGet(); + if (sem != null && !last.messageThread()) sem.release(); @@ -248,12 +246,22 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { } /** + * @param fut Future. + * @return {@code True} if future was removed from queue. + */ + boolean removeFuture(GridNioFuture<?> fut) { + assert closed(); + + return queue.removeLastOccurrence(fut); + } + + /** * Gets number of write requests in a queue that have not been processed yet. * * @return Number of write requests. */ int writeQueueSize() { - return queue.size(); + return queueSize.get(); } /** {@inheritDoc} */
