IGNITE-3688: Fixed visibility issue in GridCacheIoManager.idxClsHandlers.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/278633ec Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/278633ec Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/278633ec Branch: refs/heads/ignite-3220-1 Commit: 278633eced6d8039b5be4a18eefe6c65650aba4f Parents: 5cf3bea Author: Yakov Zhdanov <[email protected]> Authored: Mon Aug 15 14:27:22 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Mon Aug 15 14:27:22 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 24 ++- .../processors/cache/GridCacheMessage.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 176 ++++++++++++++----- 3 files changed, 155 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/278633ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 488a22c..78dddd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -97,7 +98,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { private int retryCnt; /** Indexed class handlers. 
*/ - private Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>(); + private volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>(); /** Handler registry. */ private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> @@ -241,7 +242,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { IgniteBiInClosure<UUID, GridCacheMessage> c = null; if (msgIdx >= 0) { - IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers.get(cacheMsg.cacheId()); + Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers; + + IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.cacheId()); if (cacheClsHandlers != null) c = cacheClsHandlers[msgIdx]; @@ -262,12 +265,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { append(", cacheDesc=").append(cctx.cache().cacheDescriptor(cacheMsg.cacheId())). append(']'); + msg0.append(U.nl()).append("Registered listeners:"); + + Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers; + + for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet()) + msg0.append(U.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue())); + if (cctx.kernalContext().isStopping()) { if (log.isDebugEnabled()) log.debug(msg0.toString()); } else - U.warn(log, msg0.toString()); + U.error(log, msg0.toString()); return; } @@ -1062,12 +1072,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { int msgIdx = messageIndex(type); if (msgIdx != -1) { - IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers.get(cacheId); + Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers; + + IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheId); if (cacheClsHandlers == null) { cacheClsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX]; - idxClsHandlers.put(cacheId, cacheClsHandlers); + idxClsHandlers0.put(cacheId, cacheClsHandlers); } if 
(cacheClsHandlers[msgIdx] != null) @@ -1076,6 +1088,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { cacheClsHandlers[msgIdx] = c; + idxClsHandlers = idxClsHandlers0; + return; } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/278633ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index f99d2cd..c5407b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message { private static final long serialVersionUID = 0L; /** Maximum number of cache lookup indexes. */ - public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 256; + public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 5; /** Cache message index field name. 
*/ public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX"; http://git-wip-us.apache.org/repos/asf/ignite/blob/278633ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 3616082..1e45fa7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -251,61 +251,155 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { preldr.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { - @Override public void apply(UUID nodeId, GridNearGetRequest req) { - processNearGetRequest(nodeId, req); - } - }); + ctx.io().addHandler( + ctx.cacheId(), + GridNearGetRequest.class, + new CI2<UUID, GridNearGetRequest>() { + @Override public void apply( + UUID nodeId, + GridNearGetRequest req + ) { + processNearGetRequest( + nodeId, + req); + } + }); - ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { - @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) { - processNearSingleGetRequest(nodeId, req); - } - }); + ctx.io().addHandler( + ctx.cacheId(), + GridNearSingleGetRequest.class, + new CI2<UUID, GridNearSingleGetRequest>() { + @Override public void apply( + UUID nodeId, + GridNearSingleGetRequest req + ) { + processNearSingleGetRequest( + nodeId, + req); + } + }); - ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, 
new CI2<UUID, GridNearAtomicUpdateRequest>() { - @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { - processNearAtomicUpdateRequest(nodeId, req); - } - }); + ctx.io().addHandler( + ctx.cacheId(), + GridNearAtomicUpdateRequest.class, + new CI2<UUID, GridNearAtomicUpdateRequest>() { + @Override public void apply( + UUID nodeId, + GridNearAtomicUpdateRequest req + ) { + processNearAtomicUpdateRequest( + nodeId, + req); + } - ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() { - @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) { - processNearAtomicUpdateResponse(nodeId, res); - } - }); + @Override public String toString() { + return "GridNearAtomicUpdateRequest handler " + + "[msgIdx=" + GridNearAtomicUpdateRequest.CACHE_MSG_IDX + ']'; + } + }); - ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() { - @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { - processDhtAtomicUpdateRequest(nodeId, req); - } - }); + ctx.io().addHandler(ctx.cacheId(), + GridNearAtomicUpdateResponse.class, + new CI2<UUID, GridNearAtomicUpdateResponse>() { + @Override public void apply( + UUID nodeId, + GridNearAtomicUpdateResponse res + ) { + processNearAtomicUpdateResponse( + nodeId, + res); + } - ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() { - @Override public void apply(UUID nodeId, GridDhtAtomicUpdateResponse res) { - processDhtAtomicUpdateResponse(nodeId, res); - } - }); + @Override public String toString() { + return "GridNearAtomicUpdateResponse handler " + + "[msgIdx=" + GridNearAtomicUpdateResponse.CACHE_MSG_IDX + ']'; + } + }); - ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicDeferredUpdateResponse.class, - new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() { - @Override public void apply(UUID nodeId, 
GridDhtAtomicDeferredUpdateResponse res) { - processDhtAtomicDeferredUpdateResponse(nodeId, res); + ctx.io().addHandler( + ctx.cacheId(), + GridDhtAtomicUpdateRequest.class, + new CI2<UUID, GridDhtAtomicUpdateRequest>() { + @Override public void apply( + UUID nodeId, + GridDhtAtomicUpdateRequest req + ) { + processDhtAtomicUpdateRequest( + nodeId, + req); + } + + @Override public String toString() { + return "GridDhtAtomicUpdateRequest handler " + + "[msgIdx=" + GridDhtAtomicUpdateRequest.CACHE_MSG_IDX + ']'; } }); - if (near == null) { - ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { - @Override public void apply(UUID nodeId, GridNearGetResponse res) { - processNearGetResponse(nodeId, res); + ctx.io().addHandler( + ctx.cacheId(), + GridDhtAtomicUpdateResponse.class, + new CI2<UUID, GridDhtAtomicUpdateResponse>() { + @Override public void apply( + UUID nodeId, + GridDhtAtomicUpdateResponse res + ) { + processDhtAtomicUpdateResponse( + nodeId, + res); + } + + @Override public String toString() { + return "GridDhtAtomicUpdateResponse handler " + + "[msgIdx=" + GridDhtAtomicUpdateResponse.CACHE_MSG_IDX + ']'; } }); - ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { - @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) { - processNearSingleGetResponse(nodeId, res); + ctx.io().addHandler(ctx.cacheId(), + GridDhtAtomicDeferredUpdateResponse.class, + new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() { + @Override public void apply( + UUID nodeId, + GridDhtAtomicDeferredUpdateResponse res + ) { + processDhtAtomicDeferredUpdateResponse( + nodeId, + res); + } + + @Override public String toString() { + return "GridDhtAtomicDeferredUpdateResponse handler " + + "[msgIdx=" + GridDhtAtomicDeferredUpdateResponse.CACHE_MSG_IDX + ']'; } }); + + if (near == null) { + ctx.io().addHandler( + ctx.cacheId(), + GridNearGetResponse.class, + new 
CI2<UUID, GridNearGetResponse>() { + @Override public void apply( + UUID nodeId, + GridNearGetResponse res + ) { + processNearGetResponse( + nodeId, + res); + } + }); + + ctx.io().addHandler( + ctx.cacheId(), + GridNearSingleGetResponse.class, + new CI2<UUID, GridNearSingleGetResponse>() { + @Override public void apply( + UUID nodeId, + GridNearSingleGetResponse res + ) { + processNearSingleGetResponse( + nodeId, + res); + } + }); } }
