Repository: ignite Updated Branches: refs/heads/ignite-put-experimental [created] 207a4c917
WIP. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee363b9f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee363b9f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee363b9f Branch: refs/heads/ignite-put-experimental Commit: ee363b9fc0f5035d3f976a25e58426979578e6d8 Parents: 865e376 Author: thatcoach <[email protected]> Authored: Sat Mar 19 23:50:53 2016 +0300 Committer: thatcoach <[email protected]> Committed: Sat Mar 19 23:50:53 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 55 ++++++++-------- .../deployment/GridDeploymentLocalStore.java | 39 ++++++------ .../processors/cache/GridCacheMvccManager.java | 40 ++++++------ .../processors/cache/GridCacheUtils.java | 56 ++++++++-------- .../dht/atomic/GridDhtAtomicCache.java | 43 +++++++------ .../ignite/internal/util/lang/GridFunc.java | 52 +++++++-------- .../ignite/internal/util/nio/GridNioServer.java | 59 ++++++++--------- .../util/nio/GridSelectorNioSessionImpl.java | 15 +++-- .../communication/tcp/TcpCommunicationSpi.java | 67 ++++++++++---------- 9 files changed, 216 insertions(+), 210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 9ffbf4e..c4ca984 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -17,27 +17,6 @@ package org.apache.ignite.internal.managers.communication; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -81,7 +60,27 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; @@ -124,7 +123,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private final Object sysLsnrsMux = new Object(); /** Disconnect listeners. */ - private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>(); + private final Collection<GridDisconnectListener> disconnectLsnrs = new LinkedBlockingQueue<>(); /** Map of {@link IoPool}-s injected by Ignite plugins. */ private final IoPool[] ioPools = new IoPool[128]; @@ -164,7 +163,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private final long discoDelay; /** Cache for messages that were received prior to discovery. */ - private final ConcurrentMap<UUID, ConcurrentLinkedDeque8<DelayedMessage>> waitMap = + private final ConcurrentMap<UUID, LinkedBlockingDeque<DelayedMessage>> waitMap = new ConcurrentHashMap8<>(); /** Communication message listener. */ @@ -418,7 +417,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa lock.writeLock().lock(); try { - ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId); + LinkedBlockingDeque<DelayedMessage> waitList = waitMap.remove(nodeId); if (log.isDebugEnabled()) log.debug("Removed messages from discovery startup delay list " + @@ -448,9 +447,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa try { started = true; - for (Entry<UUID, ConcurrentLinkedDeque8<DelayedMessage>> e : waitMap.entrySet()) { + for (Entry<UUID, LinkedBlockingDeque<DelayedMessage>> e : waitMap.entrySet()) { if (ctx.discovery().node(e.getKey()) != null) { - ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(e.getKey()); + LinkedBlockingDeque<DelayedMessage> waitList = waitMap.remove(e.getKey()); if (log.isDebugEnabled()) log.debug("Processing messages from discovery startup delay list: " + waitList); @@ -610,7 +609,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa log.debug("Adding message to waiting list [senderId=" + nodeId + ", msg=" + msg + ']'); - ConcurrentLinkedDeque8<DelayedMessage> list = + LinkedBlockingDeque<DelayedMessage> list = F.addIfAbsent(waitMap, nodeId, F.<DelayedMessage>newDeque()); assert list != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java index ab45708..024ba00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java @@ -17,15 +17,6 @@ package org.apache.ignite.internal.managers.deployment; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.compute.ComputeTaskName; @@ -46,7 +37,17 @@ import org.apache.ignite.spi.deployment.DeploymentResource; import org.apache.ignite.spi.deployment.DeploymentSpi; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOYED; import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOY_FAILED; @@ -60,7 +61,7 @@ import static org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED; */ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { /** Deployment cache by class name. */ - private final ConcurrentMap<String, ConcurrentLinkedDeque8<GridDeployment>> cache = + private final ConcurrentMap<String, LinkedBlockingDeque<GridDeployment>> cache = new ConcurrentHashMap8<>(); /** Mutex. */ @@ -110,7 +111,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { Collection<GridDeployment> deps = new ArrayList<>(); synchronized (mux) { - for (ConcurrentLinkedDeque8<GridDeployment> depList : cache.values()) + for (LinkedBlockingDeque<GridDeployment> depList : cache.values()) for (GridDeployment d : depList) if (!deps.contains(d)) deps.add(d); @@ -122,7 +123,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { /** {@inheritDoc} */ @Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) { synchronized (mux) { - for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values()) + for (LinkedBlockingDeque<GridDeployment> deps : cache.values()) for (GridDeployment dep : deps) if (dep.classLoaderId().equals(ldrId)) return dep; @@ -232,7 +233,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { * @return Deployment. */ @Nullable private GridDeployment deployment(String alias) { - ConcurrentLinkedDeque8<GridDeployment> deps = cache.get(alias); + LinkedBlockingDeque<GridDeployment> deps = cache.get(alias); if (deps != null) { GridDeployment dep = deps.peekFirst(); @@ -260,10 +261,10 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { boolean fireEvt = false; try { - ConcurrentLinkedDeque8<GridDeployment> cachedDeps = null; + LinkedBlockingDeque<GridDeployment> cachedDeps = null; // Find existing class loader info. - for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values()) { + for (LinkedBlockingDeque<GridDeployment> deps : cache.values()) { for (GridDeployment d : deps) { if (d.classLoader() == ldr) { // Cache class and alias. @@ -304,7 +305,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { assert fireEvt : "Class was not added to newly created deployment [cls=" + cls + ", depMode=" + depMode + ", dep=" + dep + ']'; - ConcurrentLinkedDeque8<GridDeployment> deps = + LinkedBlockingDeque<GridDeployment> deps = F.addIfAbsent(cache, alias, F.<GridDeployment>newDeque()); if (!deps.isEmpty()) { @@ -512,8 +513,8 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter { Collection<GridDeployment> doomed = new HashSet<>(); synchronized (mux) { - for (Iterator<ConcurrentLinkedDeque8<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) { - ConcurrentLinkedDeque8<GridDeployment> deps = i1.next(); + for (Iterator<LinkedBlockingDeque<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) { + LinkedBlockingDeque<GridDeployment> deps = i1.next(); for (Iterator<GridDeployment> i2 = deps.iterator(); i2.hasNext();) { GridDeployment dep = i2.next(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index afba4bc..b3fcd73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -17,21 +17,8 @@ package org.apache.ignite.internal.processors.cache; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -64,7 +51,20 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_NESTED_LISTENER_CALLS; import static org.apache.ignite.IgniteSystemProperties.getInteger; @@ -116,7 +116,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap(); /** Finish futures. */ - private final ConcurrentLinkedDeque8<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>(); + private final LinkedBlockingDeque<FinishLockFuture> finishFuts = new LinkedBlockingDeque<>(); /** Nested listener calls. */ private final ThreadLocal<Integer> nestedLsnrCalls = new ThreadLocal<Integer>() { @@ -233,7 +233,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { else if (log.isDebugEnabled()) log.debug("Failed to find transaction for changed owner: " + owner); - if (!finishFuts.isEmptyx()) { + if (!finishFuts.isEmpty()) { for (FinishLockFuture f : finishFuts) f.recheck(entry); } @@ -964,7 +964,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { X.println(">>> lockedSize: " + locked.size()); X.println(">>> futsSize: " + (mvccFuts.size() + futs.size())); X.println(">>> near2dhtSize: " + near2dht.size()); - X.println(">>> finishFutsSize: " + finishFuts.sizex()); + X.println(">>> finishFutsSize: " + finishFuts.size()); } /** @@ -984,7 +984,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) { Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<>(); - if (!finishFuts.isEmptyx()) { + if (!finishFuts.isEmpty()) { for (FinishLockFuture fut : finishFuts) { if (fut.topologyVersion().equals(topVer)) cands.putAll(fut.pendingLocks()); @@ -1096,7 +1096,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { if (exchLog.isDebugEnabled()) exchLog.debug("Rechecking pending locks for completion."); - if (!finishFuts.isEmptyx()) { + if (!finishFuts.isEmpty()) { for (FinishLockFuture fut : finishFuts) fut.recheck(); } @@ -1133,7 +1133,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { Collection<GridCacheMvccCandidate> locs = entry.localCandidates(); if (!F.isEmpty(locs)) { - Collection<GridCacheMvccCandidate> cands = new ConcurrentLinkedQueue<>(); + Collection<GridCacheMvccCandidate> cands = new LinkedBlockingQueue<>(); cands.addAll(F.view(locs, versionFilter())); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 1cdd303..04a8a07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -17,32 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.cache.Cache; -import javax.cache.CacheException; -import javax.cache.configuration.Factory; -import javax.cache.expiry.Duration; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.integration.CacheWriterException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -55,7 +29,6 @@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.store.CacheStoreSessionListener; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.GridKernalContext; @@ -103,6 +76,33 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; +import javax.cache.Cache; +import javax.cache.CacheException; +import javax.cache.configuration.Factory; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CacheWriterException; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.LOCAL; @@ -797,7 +797,7 @@ public class GridCacheUtils { */ public static <T> IgniteReducer<T, Collection<T>> objectsReducer() { return new IgniteReducer<T, Collection<T>>() { - private final Collection<T> ret = new ConcurrentLinkedQueue<>(); + private final Collection<T> ret = new LinkedBlockingQueue<>(); @Override public boolean collect(T item) { if (item != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index e908c05..eec5271 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -17,24 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.io.Externalizable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -107,7 +89,26 @@ import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; + +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorResult; +import java.io.Externalizable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT; @@ -3175,7 +3176,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private AtomicBoolean guard = new AtomicBoolean(false); /** Response versions. */ - private ConcurrentLinkedDeque8<GridCacheVersion> respVers = new ConcurrentLinkedDeque8<>(); + private LinkedBlockingDeque<GridCacheVersion> respVers = new LinkedBlockingDeque<>(); /** Node ID. */ private final UUID nodeId; @@ -3242,7 +3243,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { respVers.add(ver); - if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true)) + if (respVers.size() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true)) snd = true; } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 0678657..e10b4ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -17,29 +17,6 @@ package org.apache.ignite.internal.util.lang; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.ConcurrentModificationException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NavigableSet; -import java.util.NoSuchElementException; -import java.util.RandomAccess; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJobResult; @@ -77,6 +54,31 @@ import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; import org.jsr166.ThreadLocalRandom8; +import javax.cache.Cache; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.NoSuchElementException; +import java.util.RandomAccess; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + /** * Contains factory and utility methods for {@code closures}, {@code predicates}, and {@code tuples}. * It also contains functional style collection comprehensions. @@ -2332,8 +2334,8 @@ public class GridFunc { * time its {@link org.apache.ignite.lang.IgniteOutClosure#apply()} method is called. */ @SuppressWarnings("unchecked") - public static <T> IgniteCallable<ConcurrentLinkedDeque8<T>> newDeque() { - return (IgniteCallable<ConcurrentLinkedDeque8<T>>)DEQUE_FACTORY; + public static <T> IgniteCallable<LinkedBlockingDeque<T>> newDeque() { + return (IgniteCallable<LinkedBlockingDeque<T>>)DEQUE_FACTORY; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 42c7ac7..4cc06a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -17,6 +17,31 @@ package org.apache.ignite.internal.util.nio; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.configuration.ConnectorConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.Nullable; +import sun.nio.ch.DirectBuffer; + import java.io.IOException; import java.lang.reflect.Field; import java.net.InetAddress; @@ -43,31 +68,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.configuration.ConnectorConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.lang.IgniteBiInClosure; -import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.apache.ignite.thread.IgniteThread; -import org.jetbrains.annotations.Nullable; -import sun.nio.ch.DirectBuffer; +import java.util.concurrent.LinkedBlockingQueue; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.ACK_CLOSURE; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MSG_WRITER; @@ -1113,7 +1114,7 @@ public class GridNioServer<T> { */ private void writeSslSystem(GridSelectorNioSessionImpl ses, WritableByteChannel sockCh) throws IOException { - ConcurrentLinkedQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY); + LinkedBlockingQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY); assert queue != null; @@ -1248,7 +1249,7 @@ public class GridNioServer<T> { */ private abstract class AbstractNioClientWorker extends GridWorker { /** Queue of change requests on this selector. */ - private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = new ConcurrentLinkedQueue<>(); + private final LinkedBlockingQueue<NioOperationFuture> changeReqs = new LinkedBlockingQueue<>(); /** Selector to select read events. */ private Selector selector; @@ -2255,7 +2256,7 @@ public class GridNioServer<T> { /** {@inheritDoc} */ @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException { if (directMode && sslFilter != null) - ses.addMeta(BUF_SSL_SYSTEM_META_KEY, new ConcurrentLinkedQueue<>()); + ses.addMeta(BUF_SSL_SYSTEM_META_KEY, new LinkedBlockingQueue<>()); proceedSessionOpened(ses); } @@ -2276,7 +2277,7 @@ public class GridNioServer<T> { boolean sslSys = sslFilter != null && msg instanceof ByteBuffer; if (sslSys) { - ConcurrentLinkedQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY); + LinkedBlockingQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY); assert queue != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index 1241f99..0b68d5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -17,18 +17,19 @@ package org.apache.ignite.internal.util.nio; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.util.Collection; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; /** * Session implementation bound to selector API and socket API. @@ -37,7 +38,7 @@ import org.jsr166.ConcurrentLinkedDeque8; */ class GridSelectorNioSessionImpl extends GridNioSessionImpl { /** Pending write requests. */ - private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>(); + private final LinkedBlockingDeque<GridNioFuture<?>> queue = new LinkedBlockingDeque<>(); /** Selection key associated with this session. */ @GridToStringExclude http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index b283b82..0611f46 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -17,37 +17,6 @@ package org.apache.ignite.spi.communication.tcp; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.ConnectException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SocketChannel; -import java.nio.channels.spi.AbstractInterruptibleChannel; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; @@ -126,9 +95,41 @@ import org.apache.ignite.spi.communication.CommunicationListener; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import org.jsr166.LongAdder8; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ConnectException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.AbstractInterruptibleChannel; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -803,7 +804,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private ShmemAcceptWorker shmemAcceptWorker; /** Shared memory workers. */ - private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>(); + private final Collection<ShmemWorker> shmemWorkers = new LinkedBlockingDeque<>(); /** Clients. */ private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
