This is an automated email from the ASF dual-hosted git repository. sk0x50 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 910b7d58837 IGNITE-19115 Fixed handling cache messages for recreated cache (#10618) 910b7d58837 is described below commit 910b7d5883711eb31224d03860f92e18470880b9 Author: Slava Koptilin <slava.kopti...@gmail.com> AuthorDate: Fri Apr 14 11:29:55 2023 +0300 IGNITE-19115 Fixed handling cache messages for recreated cache (#10618) --- .../processors/cache/GridCacheAdapter.java | 2 +- .../processors/cache/GridCacheIoManager.java | 125 ++++-- .../processors/cache/GridCacheMessage.java | 5 +- .../cache/GridCachePartitionExchangeManager.java | 6 +- .../processors/cache/GridCacheProcessor.java | 6 + .../cache/distributed/dht/GridDhtCache.java | 2 - .../cache/distributed/dht/GridDhtCacheAdapter.java | 8 +- .../dht/GridDhtTransactionalCacheAdapter.java | 117 ++---- .../distributed/dht/atomic/GridDhtAtomicCache.java | 74 ++-- .../dht/colocated/GridDhtColocatedCache.java | 44 ++- .../distributed/near/GridNearAtomicCache.java | 16 +- .../near/GridNearTransactionalCache.java | 30 +- .../query/GridCacheDistributedQueryManager.java | 34 +- .../cache/query/GridCacheQueryRequest.java | 6 +- .../continuous/CacheContinuousQueryManager.java | 17 +- .../cache/transactions/IgniteTxEntry.java | 33 +- .../cache/transactions/IgniteTxHandler.java | 48 +-- .../cluster/GridClusterStateProcessor.java | 2 +- .../distributed/dht/IgniteCacheRecreateTest.java | 421 +++++++++++++++++++++ .../communication/GridCacheMessageSelfTest.java | 6 +- .../testsuites/IgniteCacheRestartTestSuite2.java | 3 + 21 files changed, 742 insertions(+), 263 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 9a5e378ced9..55cd09b4e94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -646,7 +646,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * * @throws IgniteCheckedException If callback failed. */ - protected void onKernalStart() throws IgniteCheckedException { + public void onKernalStart() throws IgniteCheckedException { // No-op. } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index cb7bef39cf8..c9457994b16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -102,6 +103,7 @@ import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.util.IgniteUtils.nl; /** @@ -117,6 +119,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** */ private static final int MAX_STORED_PENDING_MESSAGES = 100; + /** Common message handler identifier that does not correspond to any particular cache. */ + public static final int COMMON_MESSAGE_HANDLER_ID = 0; + /** Delay in milliseconds between retries. 
*/ private long retryDelay; @@ -334,19 +339,27 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { try { int msgIdx = cacheMsg.lookupIndex(); + AffinityTopologyVersion msgTopVer = cacheMsg.topologyVersion(); + IgniteBiInClosure<UUID, GridCacheMessage> c = null; if (msgIdx >= 0) { - Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; + Map<Integer, IndexedClassHandler> idxClsHandlers0 = msgHandlers.idxClsHandlers; - IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.handlerId()); + IndexedClassHandler cacheClsHandlers = idxClsHandlers0.get(cacheMsg.handlerId()); - if (cacheClsHandlers != null) - c = cacheClsHandlers[msgIdx]; + if (cacheClsHandlers != null && + (NONE.equals(msgTopVer) || !msgTopVer.before(cacheClsHandlers.startTopVer))) + c = cacheClsHandlers.hndls[msgIdx]; } - if (c == null) - c = msgHandlers.clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass())); + if (c == null) { + RegularClassHandler rHnd = msgHandlers.clsHandlers.get( + new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass())); + + if (rHnd != null && (NONE.equals(msgTopVer) || !msgTopVer.before(rHnd.startTopVer))) + c = rHnd.hnd; + } if (c == null) { if (processMissedHandler(nodeId, cacheMsg)) @@ -365,10 +378,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { msg0.append(nl()).append("Registered listeners:"); - Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; + Map<Integer, IndexedClassHandler> idxClsHandlers0 = msgHandlers.idxClsHandlers; - for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet()) - msg0.append(nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue())); + for (Map.Entry<Integer, IndexedClassHandler> e : idxClsHandlers0.entrySet()) + msg0.append(nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue().hndls)); if (cctx.kernalContext().isStopping()) { if (log.isDebugEnabled()) @@ 
-1402,12 +1415,30 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ public <Msg extends GridCacheMessage> void addCacheHandler( int hndId, + AffinityTopologyVersion startTopVer, Class<Msg> type, IgniteBiInClosure<UUID, ? super Msg> c ) { assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type; - addHandler(hndId, type, c, cacheHandlers); + addHandler(hndId, startTopVer, type, c, cacheHandlers); + } + + /** + * Registers a new message handler for the given message {@code type}. + * This method is equivalent to {@link #addCacheHandler(int, AffinityTopologyVersion, Class, IgniteBiInClosure)} where + * the handler id equals {@link #COMMON_MESSAGE_HANDLER_ID} and the start topology version is {@code NONE}. + * + * @param type Type of message. + * @param c Handler. + */ + public <Msg extends GridCacheMessage> void addCacheHandler( + Class<Msg> type, + IgniteBiInClosure<UUID, ? super Msg> c + ) { + assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type; + + addHandler(COMMON_MESSAGE_HANDLER_ID, NONE, type, c, cacheHandlers); } /** @@ -1421,7 +1452,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { int hndId, Class<? extends GridCacheMessage> msgCls ) { - return cacheHandlers.clsHandlers.get(new ListenerKey(hndId, msgCls)); + RegularClassHandler clsHnd = cacheHandlers.clsHandlers.get(new ListenerKey(hndId, msgCls)); + + return (clsHnd != null) ? clsHnd.hnd : null; } /** @@ -1436,7 +1469,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { ) { assert !type.isAssignableFrom(GridCacheIdMessage.class) : type; - addHandler(hndId, type, c, grpHandlers); + addHandler(hndId, NONE, type, c, grpHandlers); } /** @@ -1447,6 +1480,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ private <Msg extends GridCacheMessage> void addHandler( int hndId, + AffinityTopologyVersion startTopVer, Class<Msg> type, IgniteBiInClosure<UUID, ? 
super Msg> c, MessageHandlers msgHandlers @@ -1454,16 +1488,16 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { int msgIdx = messageIndex(type); if (msgIdx != -1) { - Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; + Map<Integer, IndexedClassHandler> idxClsHandlers0 = msgHandlers.idxClsHandlers; - IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.compute(hndId, (key, clsHandlers) -> { + IndexedClassHandler cacheClsHandlers = idxClsHandlers0.compute(hndId, (key, clsHandlers) -> { if (clsHandlers == null) - clsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX]; + clsHandlers = new IndexedClassHandler(startTopVer); - if (clsHandlers[msgIdx] != null) + if (clsHandlers.hndls[msgIdx] != null || !Objects.equals(clsHandlers.startTopVer, startTopVer)) return null; - clsHandlers[msgIdx] = c; + clsHandlers.hndls[msgIdx] = c; return clsHandlers; }); @@ -1476,11 +1510,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } else { ListenerKey key = new ListenerKey(hndId, type); + RegularClassHandler regHnd = new RegularClassHandler(startTopVer, (IgniteBiInClosure<UUID, GridCacheMessage>)c); - if (msgHandlers.clsHandlers.putIfAbsent(key, - (IgniteBiInClosure<UUID, GridCacheMessage>)c) != null) + if (msgHandlers.clsHandlers.putIfAbsent(key, regHnd) != null) { assert false : "Handler for class already registered [hndId=" + hndId + ", cls=" + type + ", old=" + msgHandlers.clsHandlers.get(key) + ", new=" + c + ']'; + } } IgniteLogger log0 = log; @@ -1694,16 +1729,62 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { X.println(">>> cacheGrpOrderedHandlersSize: " + grpHandlers.orderedHandlers.size()); } + /** */ + abstract static class MessageHandler { + /** Start topology version. */ + final AffinityTopologyVersion startTopVer; + + /** + * Creates a new message handler descriptor. + * + * @param startTopVer Start affinity topology version. 
+ */ + MessageHandler(AffinityTopologyVersion startTopVer) { + this.startTopVer = startTopVer; + } + } + + /** */ + static class IndexedClassHandler extends MessageHandler { + /** Actual handlers. */ + final IgniteBiInClosure[] hndls = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX]; + + /** + * Creates a new message handler descriptor. + * + * @param startTopVer Start affinity topology version. + */ + IndexedClassHandler(AffinityTopologyVersion startTopVer) { + super(startTopVer); + } + } + + /** */ + static class RegularClassHandler extends MessageHandler { + /** Actual handler. */ + IgniteBiInClosure<UUID, GridCacheMessage> hnd; + + /** + * Creates a new message handler descriptor. + * + * @param startTopVer Start affinity topology version. + */ + RegularClassHandler(AffinityTopologyVersion startTopVer, IgniteBiInClosure<UUID, GridCacheMessage> hnd) { + super(startTopVer); + + this.hnd = hnd; + } + } + /** * */ static class MessageHandlers { /** Indexed class handlers. */ - volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new ConcurrentHashMap<>(); + volatile Map<Integer, IndexedClassHandler> idxClsHandlers = new ConcurrentHashMap<>(); /** Handler registry. */ - ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> - clsHandlers = new ConcurrentHashMap<>(); + ConcurrentMap<ListenerKey, RegularClassHandler> clsHandlers = new ConcurrentHashMap<>(); /** Ordered handler registry. */ ConcurrentMap<Object, IgniteBiInClosure<UUID, ? 
extends GridCacheMessage>> orderedHandlers = diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index a9685a1eb8d..c655dc487ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -442,8 +442,11 @@ public abstract class GridCacheMessage implements Message { assert ctx != null; if (txEntries != null) { - for (IgniteTxEntry e : txEntries) + for (IgniteTxEntry e : txEntries) { + e.prepareUnmarshal(ctx, topologyVersion(), near); + e.unmarshal(ctx, near, ldr); + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 4115e56fa87..3957151db29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -413,7 +413,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); - cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class, + cctx.io().addCacheHandler(GridDhtPartitionsSingleMessage.class, new MessageHandler<GridDhtPartitionsSingleMessage>() { @Override public void onMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { GridDhtPartitionExchangeId exchangeId = msg.exchangeId(); @@ -450,7 +450,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } }); - cctx.io().addCacheHandler(0, 
GridDhtPartitionsFullMessage.class, + cctx.io().addCacheHandler(GridDhtPartitionsFullMessage.class, new MessageHandler<GridDhtPartitionsFullMessage>() { @Override public void onMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { if (msg.exchangeId() == null) { @@ -470,7 +470,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } }); - cctx.io().addCacheHandler(0, GridDhtPartitionsSingleRequest.class, + cctx.io().addCacheHandler(GridDhtPartitionsSingleRequest.class, new MessageHandler<GridDhtPartitionsSingleRequest>() { @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleRequest msg) { processSinglePartitionRequest(node, msg); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index d8bbc50a7f7..3ad724da2c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2074,6 +2074,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { cacheContext.finishRecovery(cacheStartVer, updatedDescriptor); + if (isNearEnabled(cacheContext)) { + GridDhtCacheAdapter dht = cacheContext.near().dht(); + + dht.context().finishRecovery(cacheStartVer, updatedDescriptor); + } + if (cacheContext.config().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT && groupContext.affinityNode()) sharedCtx.coordinators().ensureStarted(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java index 7889163f177..455224bdd51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java @@ -76,8 +76,6 @@ public class GridDhtCache<K, V> extends GridDhtTransactionalCacheAdapter<K, V> { metrics.delegate(ctx.dht().near().metrics0()); ctx.dr().resetMetrics(); - - super.start(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 557153c0495..a785bb6bc80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -387,8 +387,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - ctx.io().addCacheHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + assert !ctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + name() + ']'; + + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridCacheTtlUpdateRequest.class, (CI2<UUID, GridCacheTtlUpdateRequest>)this::processTtlUpdateRequest); ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index fcd7ab1d3be..5c2e241e247 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -141,125 +141,80 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - super.start(); + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); - ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { - @Override public void apply(UUID nodeId, GridNearGetRequest req) { - processNearGetRequest(nodeId, req); - } - }); + assert !ctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + name() + ']'; - ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { - @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) { - processNearSingleGetRequest(nodeId, req); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearGetRequest.class, + (CI2<UUID, GridNearGetRequest>)this::processNearGetRequest); - ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() { - @Override public void apply(UUID nodeId, GridNearLockRequest req) { - processNearLockRequest(nodeId, req); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearSingleGetRequest.class, + (CI2<UUID, GridNearSingleGetRequest>)this::processNearSingleGetRequest); - ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() { - @Override public void apply(UUID nodeId, GridDhtLockRequest req) { - processDhtLockRequest(nodeId, req); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearLockRequest.class, + (CI2<UUID, GridNearLockRequest>)this::processNearLockRequest); - 
ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() { - @Override public void apply(UUID nodeId, GridDhtLockResponse req) { - processDhtLockResponse(nodeId, req); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtLockRequest.class, + (CI2<UUID, GridDhtLockRequest>)this::processDhtLockRequest); - ctx.io().addCacheHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() { - @Override public void apply(UUID nodeId, GridNearUnlockRequest req) { - processNearUnlockRequest(nodeId, req); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtLockResponse.class, + (CI2<UUID, GridDhtLockResponse>)this::processDhtLockResponse); - ctx.io().addCacheHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() { - @Override public void apply(UUID nodeId, GridDhtUnlockRequest req) { - processDhtUnlockRequest(nodeId, req); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearUnlockRequest.class, + (CI2<UUID, GridNearUnlockRequest>)this::processNearUnlockRequest); - ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryEnlistRequest.class, new CI2<UUID, GridNearTxQueryEnlistRequest>() { - @Override public void apply(UUID nodeId, GridNearTxQueryEnlistRequest req) { - processNearTxQueryEnlistRequest(nodeId, req); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtUnlockRequest.class, + (CI2<UUID, GridDhtUnlockRequest>)this::processDhtUnlockRequest); - ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryEnlistResponse.class, new CI2<UUID, GridNearTxQueryEnlistResponse>() { - @Override public void apply(UUID nodeId, GridNearTxQueryEnlistResponse req) { - processNearTxQueryEnlistResponse(nodeId, req); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxQueryEnlistRequest.class, + (CI2<UUID, 
GridNearTxQueryEnlistRequest>)this::processNearTxQueryEnlistRequest); + + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxQueryEnlistResponse.class, + (CI2<UUID, GridNearTxQueryEnlistResponse>)this::processNearTxQueryEnlistResponse); - ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysRequest.class, + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtForceKeysRequest.class, new MessageHandler<GridDhtForceKeysRequest>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { processForceKeysRequest(node, msg); } }); - ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysResponse.class, + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtForceKeysResponse.class, new MessageHandler<GridDhtForceKeysResponse>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { processForceKeyResponse(node, msg); } }); - ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryResultsEnlistRequest.class, - new CI2<UUID, GridNearTxQueryResultsEnlistRequest>() { - @Override public void apply(UUID nodeId, GridNearTxQueryResultsEnlistRequest req) { - processNearTxQueryResultsEnlistRequest(nodeId, req); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxQueryResultsEnlistRequest.class, + (CI2<UUID, GridNearTxQueryResultsEnlistRequest>)this::processNearTxQueryResultsEnlistRequest); - ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryResultsEnlistResponse.class, - new CI2<UUID, GridNearTxQueryResultsEnlistResponse>() { - @Override public void apply(UUID nodeId, GridNearTxQueryResultsEnlistResponse req) { - processNearTxQueryResultsEnlistResponse(nodeId, req); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxQueryResultsEnlistResponse.class, + (CI2<UUID, GridNearTxQueryResultsEnlistResponse>)this::processNearTxQueryResultsEnlistResponse); - 
ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistRequest.class, - new CI2<UUID, GridNearTxEnlistRequest>() { - @Override public void apply(UUID nodeId, GridNearTxEnlistRequest req) { - processNearTxEnlistRequest(nodeId, req); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxEnlistRequest.class, + (CI2<UUID, GridNearTxEnlistRequest>)this::processNearTxEnlistRequest); - ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistResponse.class, - new CI2<UUID, GridNearTxEnlistResponse>() { - @Override public void apply(UUID nodeId, GridNearTxEnlistResponse msg) { - processNearTxEnlistResponse(nodeId, msg); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearTxEnlistResponse.class, + (CI2<UUID, GridNearTxEnlistResponse>)this::processNearTxEnlistResponse); - ctx.io().addCacheHandler(ctx.cacheId(), GridDhtTxQueryEnlistRequest.class, + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtTxQueryEnlistRequest.class, new CI2<UUID, GridDhtTxQueryEnlistRequest>() { @Override public void apply(UUID nodeId, GridDhtTxQueryEnlistRequest msg) { processDhtTxQueryEnlistRequest(nodeId, msg, false); } }); - ctx.io().addCacheHandler(ctx.cacheId(), GridDhtTxQueryFirstEnlistRequest.class, + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtTxQueryFirstEnlistRequest.class, new CI2<UUID, GridDhtTxQueryEnlistRequest>() { @Override public void apply(UUID nodeId, GridDhtTxQueryEnlistRequest msg) { processDhtTxQueryEnlistRequest(nodeId, msg, true); } }); - ctx.io().addCacheHandler(ctx.cacheId(), GridDhtTxQueryEnlistResponse.class, - new CI2<UUID, GridDhtTxQueryEnlistResponse>() { - @Override public void apply(UUID nodeId, GridDhtTxQueryEnlistResponse msg) { - processDhtTxQueryEnlistResponse(nodeId, msg); - } - }); + ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtTxQueryEnlistResponse.class, + (CI2<UUID, 
GridDhtTxQueryEnlistResponse>)this::processDhtTxQueryEnlistResponse); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 6b00da0ecfa..c3b2da0a9b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -241,44 +241,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - super.start(); - - assert metrics != null : "Cache metrics instance isn't initialized."; + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); - if (ctx.dht().near() != null) - metrics.delegate(ctx.dht().near().metrics0()); + assert !ctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + name() + ']'; ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridNearGetRequest.class, - new CI2<UUID, GridNearGetRequest>() { - @Override public void apply( - UUID nodeId, - GridNearGetRequest req - ) { - processNearGetRequest( - nodeId, - req); - } - }); + (CI2<UUID, GridNearGetRequest>)this::processNearGetRequest); ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridNearSingleGetRequest.class, - new CI2<UUID, GridNearSingleGetRequest>() { - @Override public void apply( - UUID nodeId, - GridNearSingleGetRequest req - ) { - processNearSingleGetRequest( - nodeId, - req); - } - }); + (CI2<UUID, GridNearSingleGetRequest>)this::processNearSingleGetRequest); ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), 
GridNearAtomicAbstractUpdateRequest.class, new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() { @Override public void apply( @@ -298,6 +280,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() { @Override public void apply( @@ -317,6 +300,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridDhtAtomicAbstractUpdateRequest.class, new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() { @Override public void apply( @@ -336,6 +320,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() { @Override public void apply( @@ -355,6 +340,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridDhtAtomicDeferredUpdateResponse.class, new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() { @Override public void apply( @@ -374,6 +360,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridDhtAtomicNearResponse.class, new CI2<UUID, GridDhtAtomicNearResponse>() { @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) { @@ -388,6 +375,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridNearAtomicCheckUpdateRequest.class, new CI2<UUID, GridNearAtomicCheckUpdateRequest>() { @Override public void apply(UUID uuid, GridNearAtomicCheckUpdateRequest msg) { @@ -402,6 +390,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> { ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridDhtForceKeysRequest.class, new MessageHandler<GridDhtForceKeysRequest>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { @@ -411,6 +400,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridDhtForceKeysResponse.class, new MessageHandler<GridDhtForceKeysResponse>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { @@ -421,34 +411,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (near == null) { ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridNearGetResponse.class, - new CI2<UUID, GridNearGetResponse>() { - @Override public void apply( - UUID nodeId, - GridNearGetResponse res - ) { - processNearGetResponse( - nodeId, - res); - } - }); + (CI2<UUID, GridNearGetResponse>)this::processNearGetResponse); ctx.io().addCacheHandler( ctx.cacheId(), + ctx.startTopologyVersion(), GridNearSingleGetResponse.class, - new CI2<UUID, GridNearSingleGetResponse>() { - @Override public void apply( - UUID nodeId, - GridNearSingleGetResponse res - ) { - processNearSingleGetResponse( - nodeId, - res); - } - }); + (CI2<UUID, GridNearSingleGetResponse>)this::processNearSingleGetResponse); } } + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + assert metrics != null : "Cache metrics instance isn't initialized."; + + if (ctx.dht().near() != null) + metrics.delegate(ctx.dht().near().metrics0()); + } + /** * @param near Near cache. 
*/ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index f6601a79e8e..4da88bef33a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -120,26 +120,32 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - super.start(); - - ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { - @Override public void apply(UUID nodeId, GridNearGetResponse res) { - processNearGetResponse(nodeId, res); - } - }); - - ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { - @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) { - processNearSingleGetResponse(nodeId, res); - } - }); + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + assert !ctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + name() + ']'; + + ctx.io().addCacheHandler( + ctx.cacheId(), + ctx.startTopologyVersion(), + GridNearGetResponse.class, + (CI2<UUID, GridNearGetResponse>)this::processNearGetResponse); + + ctx.io().addCacheHandler( + ctx.cacheId(), + ctx.startTopologyVersion(), + GridNearSingleGetResponse.class, + (CI2<UUID, GridNearSingleGetResponse>)this::processNearSingleGetResponse); + + ctx.io().addCacheHandler( + ctx.cacheId(), + ctx.startTopologyVersion(), + GridNearLockResponse.class, + (CI2<UUID, GridNearLockResponse>)this::processNearLockResponse); + } 
- ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { - @Override public void apply(UUID nodeId, GridNearLockResponse res) { - processNearLockResponse(nodeId, res); - } - }); + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 6068f0fe6e0..2a3cf58b139 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -104,14 +104,16 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - super.start(); + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); - ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { - @Override public void apply(UUID nodeId, GridNearGetResponse res) { - processGetResponse(nodeId, res); - } - }); + assert !ctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + name() + ']'; + + ctx.io().addCacheHandler( + ctx.cacheId(), + ctx.startTopologyVersion(), + GridNearGetResponse.class, + (CI2<UUID, GridNearGetResponse>)this::processGetResponse); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 5887b8d95d8..26789218f92 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -86,20 +86,22 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> } /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - super.start(); - - ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { - @Override public void apply(UUID nodeId, GridNearGetResponse res) { - processGetResponse(nodeId, res); - } - }); - - ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { - @Override public void apply(UUID nodeId, GridNearLockResponse res) { - processLockResponse(nodeId, res); - } - }); + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + assert !ctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + name() + ']'; + + ctx.io().addCacheHandler( + ctx.cacheId(), + ctx.startTopologyVersion(), + GridNearGetResponse.class, + (CI2<UUID, GridNearGetResponse>)this::processGetResponse); + + ctx.io().addCacheHandler( + ctx.cacheId(), + ctx.startTopologyVersion(), + GridNearLockResponse.class, + (CI2<UUID, GridNearLockResponse>)this::processLockResponse); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 0c6dbe3e1b9..f7b3f7a8b8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -54,6 +54,7 
@@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteReducer; import org.jetbrains.annotations.Nullable; + import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; @@ -98,26 +99,27 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage }; /** Event listener. */ - private GridLocalEventListener lsnr; + private GridLocalEventListener lsnr = new GridLocalEventListener() { + /** {@inheritDoc} */ + @Override public void onEvent(Event evt) { + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - /** {@inheritDoc} */ - @Override public void start0() throws IgniteCheckedException { - super.start0(); + for (GridCacheDistributedQueryFuture fut : futs.values()) + fut.onNodeLeft(discoEvt.eventNode().id()); + } + }; - cctx.io().addCacheHandler(cctx.cacheId(), GridCacheQueryRequest.class, new CI2<UUID, GridCacheQueryRequest>() { - @Override public void apply(UUID nodeId, GridCacheQueryRequest req) { - processQueryRequest(nodeId, req); - } - }); + /** {@inheritDoc} */ + @Override public void onKernalStart0() throws IgniteCheckedException { + super.onKernalStart0(); - lsnr = new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + assert !cctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + cctx.name() + ']'; - for (GridCacheDistributedQueryFuture fut : futs.values()) - fut.onNodeLeft(discoEvt.eventNode().id()); - } - }; + cctx.io().addCacheHandler( + cctx.cacheId(), + cctx.startTopologyVersion(), + GridCacheQueryRequest.class, + (CI2<UUID, GridCacheQueryRequest>)this::processQueryRequest); cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); } diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 0912db3c876..55e86fe9d24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -195,7 +195,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac qry.includeMetadata(), qry.keepBinary(), qry.taskHash(), - cctx.startTopologyVersion(), + cctx.affinity().affinityTopologyVersion(), qry.mvccSnapshot(), // Force deployment anyway if scan query is used. cctx.deploymentEnabled() || deployFilterOrTransformer, @@ -220,7 +220,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac false, qry.keepBinary(), qry.taskHash(), - cctx.startTopologyVersion(), + cctx.affinity().affinityTopologyVersion(), // Force deployment anyway if scan query is used. 
cctx.deploymentEnabled() || (qry.scanFilter() != null && cctx.gridDeploy().enabled()), qry.isDataPageScanEnabled()); @@ -236,7 +236,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac return new GridCacheQueryRequest(cctx.cacheId(), reqId, fieldsQry, - cctx.startTopologyVersion(), + cctx.affinity().affinityTopologyVersion(), cctx.deploymentEnabled()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 6d5253c2e90..8fbd4c979ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -158,11 +158,22 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { + super.start0(); + // Append cache name to the topic. topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? 
"" : "_" + cctx.name()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected void onKernalStart0() throws IgniteCheckedException { + assert !cctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + cctx.name() + ']'; if (cctx.affinityNode()) { - cctx.io().addCacheHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class, + cctx.io().addCacheHandler( + cctx.cacheId(), + cctx.startTopologyVersion(), + CacheContinuousQueryBatchAck.class, new CI2<UUID, CacheContinuousQueryBatchAck>() { @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) { CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId()); @@ -174,11 +185,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K cancelableTask = cctx.time().schedule(new BackupCleaner(lsnrs, cctx.kernalContext()), BACKUP_ACK_FREQ, BACKUP_ACK_FREQ); } - } - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected void onKernalStart0() throws IgniteCheckedException { Iterable<CacheEntryListenerConfiguration> cfgs = cctx.config().getCacheEntryListenerConfigurations(); if (cfgs != null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 3780d842fc0..14647675e3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import 
org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; @@ -951,23 +952,23 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { } /** - * Unmarshalls entry. + * Prepares this entry for unmarshalling. In particular, this method initializes a cache context. * * @param ctx Cache context. + * @param topVer Topology version that is used to validate a cache context. + * If this parameter is {@code null} then validation will be skipped. * @param near Near flag. - * @param clsLdr Class loader. * @throws IgniteCheckedException If un-marshalling failed. */ - public void unmarshal( + public void prepareUnmarshal( GridCacheSharedContext<?, ?> ctx, - boolean near, - ClassLoader clsLdr + AffinityTopologyVersion topVer, + boolean near ) throws IgniteCheckedException { - if (this.ctx == null) { GridCacheContext<?, ?> cacheCtx = ctx.cacheContext(cacheId); - if (cacheCtx == null) + if (cacheCtx == null || (topVer != null && topVer.before(cacheCtx.startTopologyVersion()))) throw new CacheInvalidStateException( "Failed to perform cache operation (cache is stopped), cacheId=" + cacheId); @@ -978,6 +979,24 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { this.ctx = cacheCtx; } + } + + /** + * Unmarshalls entry. + * + * @param ctx Cache context. + * @param near Near flag. + * @param clsLdr Class loader. + * @throws IgniteCheckedException If un-marshalling failed.
+ */ + public void unmarshal( + GridCacheSharedContext<?, ?> ctx, + boolean near, + ClassLoader clsLdr + ) throws IgniteCheckedException { + + if (this.ctx == null) + prepareUnmarshal(ctx, null, near); CacheObjectValueContext coctx = this.ctx.cacheObjectContext(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 506b589b8e4..4f8ef80316e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -108,6 +108,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL; +import static org.apache.ignite.internal.processors.cache.GridCacheIoManager.COMMON_MESSAGE_HANDLER_ID; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; @@ -134,9 +135,6 @@ import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; * Isolated logic to process cache messages. */ public class IgniteTxHandler { - /** */ - private static final int TX_MSG_HND_ID = 0; - /** Logger. 
*/ private IgniteLogger log; @@ -222,80 +220,68 @@ public class IgniteTxHandler { txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY); txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY); - ctx.io().addCacheHandler(TX_MSG_HND_ID, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg); } }); - ctx.io().addCacheHandler(TX_MSG_HND_ID, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg); } }); - ctx.io().addCacheHandler(TX_MSG_HND_ID, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg); } }); - ctx.io().addCacheHandler(TX_MSG_HND_ID, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg); } }); - ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg); } }); - ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxPrepareResponse.class, new 
CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg); } }); - ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg); } }); - ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg); } }); - ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg); } }); - ctx.io().addCacheHandler(TX_MSG_HND_ID, GridCacheTxRecoveryRequest.class, - new CI2<UUID, GridCacheTxRecoveryRequest>() { - @Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) { - processCheckPreparedTxRequest(nodeId, req); - } - }); + ctx.io().addCacheHandler(GridCacheTxRecoveryRequest.class, + (CI2<UUID, GridCacheTxRecoveryRequest>)this::processCheckPreparedTxRequest); - ctx.io().addCacheHandler(TX_MSG_HND_ID, GridCacheTxRecoveryResponse.class, - new CI2<UUID, GridCacheTxRecoveryResponse>() { - @Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) { - processCheckPreparedTxResponse(nodeId, res); - } - }); + 
ctx.io().addCacheHandler(GridCacheTxRecoveryResponse.class, + (CI2<UUID, GridCacheTxRecoveryResponse>)this::processCheckPreparedTxResponse); - ctx.io().addCacheHandler(TX_MSG_HND_ID, IncrementalSnapshotAwareMessage.class, - new CI2<UUID, IncrementalSnapshotAwareMessage>() { - @Override public void apply(UUID nodeId, IncrementalSnapshotAwareMessage msg) { - processIncrementalSnapshotAwareMessage(nodeId, msg); - } - }); + ctx.io().addCacheHandler(IncrementalSnapshotAwareMessage.class, + (CI2<UUID, IncrementalSnapshotAwareMessage>)this::processIncrementalSnapshotAwareMessage); } /** */ @@ -310,7 +296,7 @@ public class IgniteTxHandler { GridCacheMessage cacheMsg = msg.payload(); ctx.io() - .cacheHandler(TX_MSG_HND_ID, cacheMsg.getClass()) + .cacheHandler(COMMON_MESSAGE_HANDLER_ID, cacheMsg.getClass()) .apply(nodeId, cacheMsg); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 14ac86ef782..da485b02294 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -892,7 +892,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I sharedCtx = cacheProc.context(); sharedCtx.io().addCacheHandler( - 0, GridChangeGlobalStateMessageResponse.class, + GridChangeGlobalStateMessageResponse.class, this::processChangeGlobalStateResponse ); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheRecreateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheRecreateTest.java new file mode 100755 index 00000000000..aca4448f420 --- /dev/null +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheRecreateTest.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import javax.cache.CacheException; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import 
org.apache.ignite.internal.processors.cache.GridCacheIdMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.junit.Test; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * Tests handling of pending cache messages/operations when the required cache was re-created. + * */ +public class IgniteCacheRecreateTest extends GridCommonAbstractTest { + /** Cache name to be used in tests.
*/ + private static final String CACHE_NAME = "test-recreate-cache"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(1); + + startClientGrid(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + grid(0).destroyCache(CACHE_NAME); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testAtomicPutAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + ATOMIC, + GridNearAtomicAbstractUpdateRequest.class, + (cache, keys) -> cache.put(keys.get(0), 42)); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testAtomicGetAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + ATOMIC, + GridNearSingleGetRequest.class, + (cache, keys) -> cache.get(keys.get(0))); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testAtomicPutAllAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + ATOMIC, + GridNearAtomicFullUpdateRequest.class, + (cache, keys) -> { + Map<Integer, Integer> vals = new TreeMap<>(); + vals.put(keys.get(0), 24); + vals.put(keys.get(1), 42); + + cache.putAll(vals); + }); + } + + /** + * @throws Exception If failed. 
+ */ + @Test + public void testAtomicGetAllAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + ATOMIC, + GridNearGetRequest.class, + (cache, keys) -> { + Set<Integer> vals = new TreeSet<>(); + vals.add(keys.get(0)); + vals.add(keys.get(1)); + + cache.getAll(vals); + }); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testImplicitInvokeAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + ATOMIC, + GridNearAtomicAbstractUpdateRequest.class, + (cache, keys) -> { + cache.invoke(keys.get(0), new CacheEntryProcessor<Integer, Integer, Integer>() { + @Override public Integer process( + MutableEntry<Integer, Integer> entry, + Object... arguments + ) throws EntryProcessorException { + if (entry.exists()) + return entry.getValue(); + + entry.setValue(123); + + return entry.getValue(); + } + }); + }); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testImplicitOptimisticTxPutAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + TRANSACTIONAL, + GridNearTxPrepareRequest.class, + (cache, keys) -> cache.put(keys.get(0), 42)); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testImplicitOptimisticTxGetAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + TRANSACTIONAL, + GridNearSingleGetRequest.class, + (cache, keys) -> cache.get(keys.get(0))); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testImplicitOptimisticTxPutAllAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + TRANSACTIONAL, + GridNearTxPrepareRequest.class, + (cache, keys) -> { + Map<Integer, Integer> vals = new TreeMap<>(); + vals.put(keys.get(0), 24); + vals.put(keys.get(1), 42); + + cache.putAll(vals); + }); + } + + /** + * @throws Exception If failed. 
+ */ + @Test + public void testImplicitTxInvokeAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + TRANSACTIONAL, + GridNearTxPrepareRequest.class, + (cache, keys) -> { + cache.invoke(keys.get(0), new CacheEntryProcessor<Integer, Integer, Integer>() { + @Override public Integer process( + MutableEntry<Integer, Integer> entry, + Object... arguments + ) throws EntryProcessorException { + if (entry.exists()) + return entry.getValue(); + + entry.setValue(123); + + return entry.getValue(); + } + }); + }); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPessimisticTxPutAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + TRANSACTIONAL, + GridNearLockRequest.class, + (cache, keys) -> { + try (Transaction tx = grid(1).transactions().txStart(PESSIMISTIC, SERIALIZABLE)) { + cache.put(keys.get(0), 42); + + tx.commit(); + } + }); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPessimisticTxPutAllAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + TRANSACTIONAL, + GridNearLockRequest.class, + (cache, keys) -> { + try (Transaction tx = grid(1).transactions().txStart(PESSIMISTIC, SERIALIZABLE)) { + Map<Integer, Integer> vals = new TreeMap<>(); + vals.put(keys.get(0), 24); + vals.put(keys.get(1), 42); + + cache.putAll(vals); + + tx.commit(); + } + }); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPessimisticTxGetAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + TRANSACTIONAL, + GridNearLockRequest.class, + (cache, keys) -> { + try (Transaction tx = grid(1).transactions().txStart(PESSIMISTIC, SERIALIZABLE)) { + cache.get(keys.get(0)); + + tx.commit(); + } + }); + } + + /** + * @throws Exception If failed. 
+ */ + @Test + public void testAtomicScanAndCacheRecreate() throws Exception { + testCacheOperationAndCacheRecreate( + ATOMIC, + GridCacheQueryRequest.class, + (cache, keys) -> { + ScanQuery<Integer, Integer> scanQuery = new ScanQuery<>(); + scanQuery.setPageSize(1); + + try (QueryCursor qry = cache.query(scanQuery)) { + for (Object o : qry.getAll()) { + IgniteBiTuple<Integer, Integer> tuple = (IgniteBiTuple<Integer, Integer>)o; + + throw new RuntimeException("Succesfully read unexpected value [k=" + tuple.getKey() + ", v=" + tuple.getValue()); + } + } + }); + } + + /** + * + * @param mode Cache atomicity mode. + * @param clazz Cache message type to be blocked before re-creating a cache. + * @param cacheOp Cache operation. + * @throws Exception If failed. + */ + private void testCacheOperationAndCacheRecreate( + CacheAtomicityMode mode, + Class<? extends GridCacheIdMessage> clazz, + IgniteBiInClosure<IgniteCache<Integer, Integer>, List<Integer>> cacheOp + ) throws Exception { + IgniteEx g0 = grid(0); + IgniteEx client = grid(1); + + // Initial loading. + IgniteCache<Integer, Integer> clientCache = createCache(client, mode); + for (int i = 0; i < 100; i++) + clientCache.put(i, i); + + TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(client); + TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(g0); + + // Block cache operation. + clientSpi.blockMessages((node, msg) -> { + if (clazz.isAssignableFrom(msg.getClass())) { + GridCacheIdMessage msg0 = (GridCacheIdMessage)msg; + + if (msg0.cacheId() == 0 || msg0.cacheId() == CU.cacheId(CACHE_NAME)) + return true; + } + + return false; + }); + + // Block notifying the client node about upcoming changes. + crdSpi.blockMessages((node, msg) -> { + if (msg instanceof GridDhtPartitionsFullMessage) + return true; + + return false; + }); + + List<Integer> primaryKeys = primaryKeys(g0.cache(CACHE_NAME), 2, 1); + + // Initiate cache operation. 
+ IgniteInternalFuture<?> updFut = runAsync(() -> cacheOp.apply(clientCache, primaryKeys)); + + // Wait until the operation is initiated on the client node. + clientSpi.waitForBlocked(); + + // Destroy the existing cache and re-create it once again in order to deliver the blocked cache message to the server node + // when the required cache is destroyed and new cache handlers are registered. + g0.destroyCache(clientCache.getName()); + + // Create a new cache with the same name. + IgniteCache newCache = createCache(g0, mode); + + // Upload new values. + for (int i = 0; i < 100; i++) + newCache.put(i, i + 1_000); + + // Unblock cache operation. + clientSpi.stopBlock(); + + try { + updFut.get(10, TimeUnit.SECONDS); + + fail("Exception was not thrown."); + } + catch (Exception e) { + assertTrue("Unexpected exception [err=" + e + ']', X.hasCause(e, CacheException.class)); + } + finally { + crdSpi.stopBlock(); + } + } + + /** + * Creates a cache using the given node as initiator node and the given atomicity mode. + * + * @param ignite Node to be used to initiate creating a new cache. + * @param mode Cache atomicity mode. + * @return Ignite cache.
+ */ + private IgniteCache<Integer, Integer> createCache(IgniteEx ignite, CacheAtomicityMode mode) { + CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>(CACHE_NAME); + + cfg.setBackups(1) + .setReadFromBackup(false) + .setAtomicityMode(mode) + .setAffinity(new RendezvousAffinityFunction(false, 32)); + + return ignite.getOrCreateCache(cfg); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java index b6c14912874..09dc5e841b5 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java @@ -121,14 +121,16 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { IgniteEx ignite1 = grid(1); ignite0.context().cache().context().io().addCacheHandler( - 0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() { + TestBadMessage.class, + new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { throw new RuntimeException("Test bad message exception"); } }); ignite1.context().cache().context().io().addCacheHandler( - 0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() { + TestBadMessage.class, + new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { throw new RuntimeException("Test bad message exception"); } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java index 354c26c048d..2a9c2c6208f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java @@ -23,6 +23,7 @@ import 
org.apache.ignite.internal.processors.cache.IgniteCachePutAllRestartTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteBinaryMetadataUpdateNodeRestartTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicNodeRestartTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheGetRestartTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheRecreateTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheAtomicReplicatedNodeRestartSelfTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -41,6 +42,8 @@ import org.junit.runners.Suite; IgniteBinaryMetadataUpdateNodeRestartTest.class, + IgniteCacheRecreateTest.class, + IgniteCacheGetRestartTest.class }) public class IgniteCacheRestartTestSuite2 {