This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 910b7d58837 IGNITE-19115 Fixed handling cache messages for recreated
cache (#10618)
910b7d58837 is described below
commit 910b7d5883711eb31224d03860f92e18470880b9
Author: Slava Koptilin <[email protected]>
AuthorDate: Fri Apr 14 11:29:55 2023 +0300
IGNITE-19115 Fixed handling cache messages for recreated cache (#10618)
---
.../processors/cache/GridCacheAdapter.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 125 ++++--
.../processors/cache/GridCacheMessage.java | 5 +-
.../cache/GridCachePartitionExchangeManager.java | 6 +-
.../processors/cache/GridCacheProcessor.java | 6 +
.../cache/distributed/dht/GridDhtCache.java | 2 -
.../cache/distributed/dht/GridDhtCacheAdapter.java | 8 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 117 ++----
.../distributed/dht/atomic/GridDhtAtomicCache.java | 74 ++--
.../dht/colocated/GridDhtColocatedCache.java | 44 ++-
.../distributed/near/GridNearAtomicCache.java | 16 +-
.../near/GridNearTransactionalCache.java | 30 +-
.../query/GridCacheDistributedQueryManager.java | 34 +-
.../cache/query/GridCacheQueryRequest.java | 6 +-
.../continuous/CacheContinuousQueryManager.java | 17 +-
.../cache/transactions/IgniteTxEntry.java | 33 +-
.../cache/transactions/IgniteTxHandler.java | 48 +--
.../cluster/GridClusterStateProcessor.java | 2 +-
.../distributed/dht/IgniteCacheRecreateTest.java | 421 +++++++++++++++++++++
.../communication/GridCacheMessageSelfTest.java | 6 +-
.../testsuites/IgniteCacheRestartTestSuite2.java | 3 +
21 files changed, 742 insertions(+), 263 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9a5e378ced9..55cd09b4e94 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -646,7 +646,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
*
* @throws IgniteCheckedException If callback failed.
*/
- protected void onKernalStart() throws IgniteCheckedException {
+ public void onKernalStart() throws IgniteCheckedException {
// No-op.
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index cb7bef39cf8..c9457994b16 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -102,6 +103,7 @@ import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.util.IgniteUtils.nl;
/**
@@ -117,6 +119,9 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
/** */
private static final int MAX_STORED_PENDING_MESSAGES = 100;
+ /** Common message handler identifier that does not correspond to any
particular cache. */
+ public static final int COMMON_MESSAGE_HANDLER_ID = 0;
+
/** Delay in milliseconds between retries. */
private long retryDelay;
@@ -334,19 +339,27 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
try {
int msgIdx = cacheMsg.lookupIndex();
+ AffinityTopologyVersion msgTopVer = cacheMsg.topologyVersion();
+
IgniteBiInClosure<UUID, GridCacheMessage> c = null;
if (msgIdx >= 0) {
- Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 =
msgHandlers.idxClsHandlers;
+ Map<Integer, IndexedClassHandler> idxClsHandlers0 =
msgHandlers.idxClsHandlers;
- IgniteBiInClosure[] cacheClsHandlers =
idxClsHandlers0.get(cacheMsg.handlerId());
+ IndexedClassHandler cacheClsHandlers =
idxClsHandlers0.get(cacheMsg.handlerId());
- if (cacheClsHandlers != null)
- c = cacheClsHandlers[msgIdx];
+ if (cacheClsHandlers != null &&
+ (NONE.equals(msgTopVer) ||
!msgTopVer.before(cacheClsHandlers.startTopVer)))
+ c = cacheClsHandlers.hndls[msgIdx];
}
- if (c == null)
- c = msgHandlers.clsHandlers.get(new
ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass()));
+ if (c == null) {
+ RegularClassHandler rHnd = msgHandlers.clsHandlers.get(
+ new ListenerKey(cacheMsg.handlerId(),
cacheMsg.getClass()));
+
+ if (rHnd != null && (NONE.equals(msgTopVer) ||
!msgTopVer.before(rHnd.startTopVer)))
+ c = rHnd.hnd;
+ }
if (c == null) {
if (processMissedHandler(nodeId, cacheMsg))
@@ -365,10 +378,10 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
msg0.append(nl()).append("Registered listeners:");
- Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 =
msgHandlers.idxClsHandlers;
+ Map<Integer, IndexedClassHandler> idxClsHandlers0 =
msgHandlers.idxClsHandlers;
- for (Map.Entry<Integer, IgniteBiInClosure[]> e :
idxClsHandlers0.entrySet())
-
msg0.append(nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue()));
+ for (Map.Entry<Integer, IndexedClassHandler> e :
idxClsHandlers0.entrySet())
+
msg0.append(nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue().hndls));
if (cctx.kernalContext().isStopping()) {
if (log.isDebugEnabled())
@@ -1402,12 +1415,30 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
*/
public <Msg extends GridCacheMessage> void addCacheHandler(
int hndId,
+ AffinityTopologyVersion startTopVer,
Class<Msg> type,
IgniteBiInClosure<UUID, ? super Msg> c
) {
assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type;
- addHandler(hndId, type, c, cacheHandlers);
+ addHandler(hndId, startTopVer, type, c, cacheHandlers);
+ }
+
+ /**
+ * Registers a new message handler for the given message {@code type}.
+ * This method is equivalent to {@link #addCacheHandler(int,
AffinityTopologyVersion, Class, IgniteBiInClosure)} where is
+ * the handler id equals to {@link #COMMON_MESSAGE_HANDLER_ID} and
deployment id is {@code null}.
+ *
+ * @param type Type of message.
+ * @param c Handler.
+ */
+ public <Msg extends GridCacheMessage> void addCacheHandler(
+ Class<Msg> type,
+ IgniteBiInClosure<UUID, ? super Msg> c
+ ) {
+ assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type;
+
+ addHandler(COMMON_MESSAGE_HANDLER_ID, NONE, type, c, cacheHandlers);
}
/**
@@ -1421,7 +1452,9 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
int hndId,
Class<? extends GridCacheMessage> msgCls
) {
- return cacheHandlers.clsHandlers.get(new ListenerKey(hndId, msgCls));
+ RegularClassHandler clsHnd = cacheHandlers.clsHandlers.get(new
ListenerKey(hndId, msgCls));
+
+ return (clsHnd != null) ? clsHnd.hnd : null;
}
/**
@@ -1436,7 +1469,7 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
) {
assert !type.isAssignableFrom(GridCacheIdMessage.class) : type;
- addHandler(hndId, type, c, grpHandlers);
+ addHandler(hndId, NONE, type, c, grpHandlers);
}
/**
@@ -1447,6 +1480,7 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
*/
private <Msg extends GridCacheMessage> void addHandler(
int hndId,
+ AffinityTopologyVersion startTopVer,
Class<Msg> type,
IgniteBiInClosure<UUID, ? super Msg> c,
MessageHandlers msgHandlers
@@ -1454,16 +1488,16 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
int msgIdx = messageIndex(type);
if (msgIdx != -1) {
- Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 =
msgHandlers.idxClsHandlers;
+ Map<Integer, IndexedClassHandler> idxClsHandlers0 =
msgHandlers.idxClsHandlers;
- IgniteBiInClosure[] cacheClsHandlers =
idxClsHandlers0.compute(hndId, (key, clsHandlers) -> {
+ IndexedClassHandler cacheClsHandlers =
idxClsHandlers0.compute(hndId, (key, clsHandlers) -> {
if (clsHandlers == null)
- clsHandlers = new
IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
+ clsHandlers = new IndexedClassHandler(startTopVer);
- if (clsHandlers[msgIdx] != null)
+ if (clsHandlers.hndls[msgIdx] != null ||
!Objects.equals(clsHandlers.startTopVer, startTopVer))
return null;
- clsHandlers[msgIdx] = c;
+ clsHandlers.hndls[msgIdx] = c;
return clsHandlers;
});
@@ -1476,11 +1510,12 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
}
else {
ListenerKey key = new ListenerKey(hndId, type);
+ RegularClassHandler regHnd = new RegularClassHandler(startTopVer,
(IgniteBiInClosure<UUID, GridCacheMessage>)c);
- if (msgHandlers.clsHandlers.putIfAbsent(key,
- (IgniteBiInClosure<UUID, GridCacheMessage>)c) != null)
+ if (msgHandlers.clsHandlers.putIfAbsent(key, regHnd) != null) {
assert false : "Handler for class already registered [hndId="
+ hndId + ", cls=" + type +
", old=" + msgHandlers.clsHandlers.get(key) + ", new=" + c
+ ']';
+ }
}
IgniteLogger log0 = log;
@@ -1694,16 +1729,62 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
X.println(">>> cacheGrpOrderedHandlersSize: " +
grpHandlers.orderedHandlers.size());
}
+ /** */
+ abstract static class MessageHandler {
+ /** Start topology version. */
+ final AffinityTopologyVersion startTopVer;
+
+ /**
+ * Creates a new message handler descriptor.
+ *
+ * @param startTopVer Start affinity topology version.
+ */
+ MessageHandler(AffinityTopologyVersion startTopVer) {
+ this.startTopVer = startTopVer;
+ }
+ }
+
+ /** */
+ static class IndexedClassHandler extends MessageHandler {
+ /** Actual handlers. */
+ final IgniteBiInClosure[] hndls = new
IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
+
+ /**
+ * Creates a new message handler descriptor.
+ *
+ * @param startTopVer Start affinity topology version.
+ */
+ IndexedClassHandler(AffinityTopologyVersion startTopVer) {
+ super(startTopVer);
+ }
+ }
+
+ /** */
+ static class RegularClassHandler extends MessageHandler {
+ /** Actual handler. */
+ IgniteBiInClosure<UUID, GridCacheMessage> hnd;
+
+ /**
+ * Creates a new message handler descriptor.
+ *
+ * @param startTopVer Start affinity topology version.
+ */
+ RegularClassHandler(AffinityTopologyVersion startTopVer,
IgniteBiInClosure<UUID, GridCacheMessage> hnd) {
+ super(startTopVer);
+
+ this.hnd = hnd;
+ }
+ }
+
/**
*
*/
static class MessageHandlers {
/** Indexed class handlers. */
- volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new
ConcurrentHashMap<>();
+ volatile Map<Integer, IndexedClassHandler> idxClsHandlers = new
ConcurrentHashMap<>();
/** Handler registry. */
- ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>
- clsHandlers = new ConcurrentHashMap<>();
+ ConcurrentMap<ListenerKey, RegularClassHandler> clsHandlers = new
ConcurrentHashMap<>();
/** Ordered handler registry. */
ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends
GridCacheMessage>> orderedHandlers =
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index a9685a1eb8d..c655dc487ef 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -442,8 +442,11 @@ public abstract class GridCacheMessage implements Message {
assert ctx != null;
if (txEntries != null) {
- for (IgniteTxEntry e : txEntries)
+ for (IgniteTxEntry e : txEntries) {
+ e.prepareUnmarshal(ctx, topologyVersion(), near);
+
e.unmarshal(ctx, near, ldr);
+ }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4115e56fa87..3957151db29 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -413,7 +413,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
cctx.gridEvents().addDiscoveryEventListener(discoLsnr,
EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
EVT_DISCOVERY_CUSTOM_EVT);
- cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class,
+ cctx.io().addCacheHandler(GridDhtPartitionsSingleMessage.class,
new MessageHandler<GridDhtPartitionsSingleMessage>() {
@Override public void onMessage(final ClusterNode node, final
GridDhtPartitionsSingleMessage msg) {
GridDhtPartitionExchangeId exchangeId = msg.exchangeId();
@@ -450,7 +450,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
}
});
- cctx.io().addCacheHandler(0, GridDhtPartitionsFullMessage.class,
+ cctx.io().addCacheHandler(GridDhtPartitionsFullMessage.class,
new MessageHandler<GridDhtPartitionsFullMessage>() {
@Override public void onMessage(ClusterNode node,
GridDhtPartitionsFullMessage msg) {
if (msg.exchangeId() == null) {
@@ -470,7 +470,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
}
});
- cctx.io().addCacheHandler(0, GridDhtPartitionsSingleRequest.class,
+ cctx.io().addCacheHandler(GridDhtPartitionsSingleRequest.class,
new MessageHandler<GridDhtPartitionsSingleRequest>() {
@Override public void onMessage(ClusterNode node,
GridDhtPartitionsSingleRequest msg) {
processSinglePartitionRequest(node, msg);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d8bbc50a7f7..3ad724da2c1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2074,6 +2074,12 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
cacheContext.finishRecovery(cacheStartVer, updatedDescriptor);
+ if (isNearEnabled(cacheContext)) {
+ GridDhtCacheAdapter dht = cacheContext.near().dht();
+
+ dht.context().finishRecovery(cacheStartVer, updatedDescriptor);
+ }
+
if (cacheContext.config().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT
&& groupContext.affinityNode())
sharedCtx.coordinators().ensureStarted();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
index 7889163f177..455224bdd51 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
@@ -76,8 +76,6 @@ public class GridDhtCache<K, V> extends
GridDhtTransactionalCacheAdapter<K, V> {
metrics.delegate(ctx.dht().near().metrics0());
ctx.dr().resetMetrics();
-
- super.start();
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 557153c0495..a785bb6bc80 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -387,8 +387,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends
GridDistributedCacheAdap
}
/** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- ctx.io().addCacheHandler(ctx.cacheId(),
GridCacheTtlUpdateRequest.class,
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
+
+ assert !ctx.isRecoveryMode() : "Registering message handlers in
recovery mode [cacheName=" + name() + ']';
+
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridCacheTtlUpdateRequest.class,
(CI2<UUID,
GridCacheTtlUpdateRequest>)this::processTtlUpdateRequest);
ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_LEFT,
EVT_NODE_FAILED);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index fcd7ab1d3be..5c2e241e247 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -141,125 +141,80 @@ public abstract class
GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
/** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- super.start();
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
- ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetRequest.class, new
CI2<UUID, GridNearGetRequest>() {
- @Override public void apply(UUID nodeId, GridNearGetRequest req) {
- processNearGetRequest(nodeId, req);
- }
- });
+ assert !ctx.isRecoveryMode() : "Registering message handlers in
recovery mode [cacheName=" + name() + ']';
- ctx.io().addCacheHandler(ctx.cacheId(),
GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
- @Override public void apply(UUID nodeId, GridNearSingleGetRequest
req) {
- processNearSingleGetRequest(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridNearGetRequest.class,
+ (CI2<UUID, GridNearGetRequest>)this::processNearGetRequest);
- ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockRequest.class, new
CI2<UUID, GridNearLockRequest>() {
- @Override public void apply(UUID nodeId, GridNearLockRequest req) {
- processNearLockRequest(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridNearSingleGetRequest.class,
+ (CI2<UUID,
GridNearSingleGetRequest>)this::processNearSingleGetRequest);
- ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockRequest.class, new
CI2<UUID, GridDhtLockRequest>() {
- @Override public void apply(UUID nodeId, GridDhtLockRequest req) {
- processDhtLockRequest(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridNearLockRequest.class,
+ (CI2<UUID, GridNearLockRequest>)this::processNearLockRequest);
- ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockResponse.class, new
CI2<UUID, GridDhtLockResponse>() {
- @Override public void apply(UUID nodeId, GridDhtLockResponse req) {
- processDhtLockResponse(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridDhtLockRequest.class,
+ (CI2<UUID, GridDhtLockRequest>)this::processDhtLockRequest);
- ctx.io().addCacheHandler(ctx.cacheId(), GridNearUnlockRequest.class,
new CI2<UUID, GridNearUnlockRequest>() {
- @Override public void apply(UUID nodeId, GridNearUnlockRequest
req) {
- processNearUnlockRequest(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridDhtLockResponse.class,
+ (CI2<UUID, GridDhtLockResponse>)this::processDhtLockResponse);
- ctx.io().addCacheHandler(ctx.cacheId(), GridDhtUnlockRequest.class,
new CI2<UUID, GridDhtUnlockRequest>() {
- @Override public void apply(UUID nodeId, GridDhtUnlockRequest req)
{
- processDhtUnlockRequest(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridNearUnlockRequest.class,
+ (CI2<UUID, GridNearUnlockRequest>)this::processNearUnlockRequest);
- ctx.io().addCacheHandler(ctx.cacheId(),
GridNearTxQueryEnlistRequest.class, new CI2<UUID,
GridNearTxQueryEnlistRequest>() {
- @Override public void apply(UUID nodeId,
GridNearTxQueryEnlistRequest req) {
- processNearTxQueryEnlistRequest(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridDhtUnlockRequest.class,
+ (CI2<UUID, GridDhtUnlockRequest>)this::processDhtUnlockRequest);
- ctx.io().addCacheHandler(ctx.cacheId(),
GridNearTxQueryEnlistResponse.class, new CI2<UUID,
GridNearTxQueryEnlistResponse>() {
- @Override public void apply(UUID nodeId,
GridNearTxQueryEnlistResponse req) {
- processNearTxQueryEnlistResponse(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridNearTxQueryEnlistRequest.class,
+ (CI2<UUID,
GridNearTxQueryEnlistRequest>)this::processNearTxQueryEnlistRequest);
+
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridNearTxQueryEnlistResponse.class,
+ (CI2<UUID,
GridNearTxQueryEnlistResponse>)this::processNearTxQueryEnlistResponse);
- ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysRequest.class,
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridDhtForceKeysRequest.class,
new MessageHandler<GridDhtForceKeysRequest>() {
@Override public void onMessage(ClusterNode node,
GridDhtForceKeysRequest msg) {
processForceKeysRequest(node, msg);
}
});
- ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysResponse.class,
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridDhtForceKeysResponse.class,
new MessageHandler<GridDhtForceKeysResponse>() {
@Override public void onMessage(ClusterNode node,
GridDhtForceKeysResponse msg) {
processForceKeyResponse(node, msg);
}
});
- ctx.io().addCacheHandler(ctx.cacheId(),
GridNearTxQueryResultsEnlistRequest.class,
- new CI2<UUID, GridNearTxQueryResultsEnlistRequest>() {
- @Override public void apply(UUID nodeId,
GridNearTxQueryResultsEnlistRequest req) {
- processNearTxQueryResultsEnlistRequest(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridNearTxQueryResultsEnlistRequest.class,
+ (CI2<UUID,
GridNearTxQueryResultsEnlistRequest>)this::processNearTxQueryResultsEnlistRequest);
- ctx.io().addCacheHandler(ctx.cacheId(),
GridNearTxQueryResultsEnlistResponse.class,
- new CI2<UUID, GridNearTxQueryResultsEnlistResponse>() {
- @Override public void apply(UUID nodeId,
GridNearTxQueryResultsEnlistResponse req) {
- processNearTxQueryResultsEnlistResponse(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridNearTxQueryResultsEnlistResponse.class,
+ (CI2<UUID,
GridNearTxQueryResultsEnlistResponse>)this::processNearTxQueryResultsEnlistResponse);
- ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistRequest.class,
- new CI2<UUID, GridNearTxEnlistRequest>() {
- @Override public void apply(UUID nodeId,
GridNearTxEnlistRequest req) {
- processNearTxEnlistRequest(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridNearTxEnlistRequest.class,
+ (CI2<UUID,
GridNearTxEnlistRequest>)this::processNearTxEnlistRequest);
- ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistResponse.class,
- new CI2<UUID, GridNearTxEnlistResponse>() {
- @Override public void apply(UUID nodeId,
GridNearTxEnlistResponse msg) {
- processNearTxEnlistResponse(nodeId, msg);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridNearTxEnlistResponse.class,
+ (CI2<UUID,
GridNearTxEnlistResponse>)this::processNearTxEnlistResponse);
- ctx.io().addCacheHandler(ctx.cacheId(),
GridDhtTxQueryEnlistRequest.class,
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridDhtTxQueryEnlistRequest.class,
new CI2<UUID, GridDhtTxQueryEnlistRequest>() {
@Override public void apply(UUID nodeId,
GridDhtTxQueryEnlistRequest msg) {
processDhtTxQueryEnlistRequest(nodeId, msg, false);
}
});
- ctx.io().addCacheHandler(ctx.cacheId(),
GridDhtTxQueryFirstEnlistRequest.class,
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridDhtTxQueryFirstEnlistRequest.class,
new CI2<UUID, GridDhtTxQueryEnlistRequest>() {
@Override public void apply(UUID nodeId,
GridDhtTxQueryEnlistRequest msg) {
processDhtTxQueryEnlistRequest(nodeId, msg, true);
}
});
- ctx.io().addCacheHandler(ctx.cacheId(),
GridDhtTxQueryEnlistResponse.class,
- new CI2<UUID, GridDhtTxQueryEnlistResponse>() {
- @Override public void apply(UUID nodeId,
GridDhtTxQueryEnlistResponse msg) {
- processDhtTxQueryEnlistResponse(nodeId, msg);
- }
- });
+ ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(),
GridDhtTxQueryEnlistResponse.class,
+ (CI2<UUID,
GridDhtTxQueryEnlistResponse>)this::processDhtTxQueryEnlistResponse);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6b00da0ecfa..c3b2da0a9b0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -241,44 +241,26 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- super.start();
-
- assert metrics != null : "Cache metrics instance isn't initialized.";
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
- if (ctx.dht().near() != null)
- metrics.delegate(ctx.dht().near().metrics0());
+ assert !ctx.isRecoveryMode() : "Registering message handlers in
recovery mode [cacheName=" + name() + ']';
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridNearGetRequest.class,
- new CI2<UUID, GridNearGetRequest>() {
- @Override public void apply(
- UUID nodeId,
- GridNearGetRequest req
- ) {
- processNearGetRequest(
- nodeId,
- req);
- }
- });
+ (CI2<UUID, GridNearGetRequest>)this::processNearGetRequest);
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridNearSingleGetRequest.class,
- new CI2<UUID, GridNearSingleGetRequest>() {
- @Override public void apply(
- UUID nodeId,
- GridNearSingleGetRequest req
- ) {
- processNearSingleGetRequest(
- nodeId,
- req);
- }
- });
+ (CI2<UUID,
GridNearSingleGetRequest>)this::processNearSingleGetRequest);
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridNearAtomicAbstractUpdateRequest.class,
new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() {
@Override public void apply(
@@ -298,6 +280,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridNearAtomicUpdateResponse.class,
new CI2<UUID, GridNearAtomicUpdateResponse>() {
@Override public void apply(
@@ -317,6 +300,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridDhtAtomicAbstractUpdateRequest.class,
new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() {
@Override public void apply(
@@ -336,6 +320,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridDhtAtomicUpdateResponse.class,
new CI2<UUID, GridDhtAtomicUpdateResponse>() {
@Override public void apply(
@@ -355,6 +340,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridDhtAtomicDeferredUpdateResponse.class,
new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() {
@Override public void apply(
@@ -374,6 +360,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridDhtAtomicNearResponse.class,
new CI2<UUID, GridDhtAtomicNearResponse>() {
@Override public void apply(UUID uuid,
GridDhtAtomicNearResponse msg) {
@@ -388,6 +375,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridNearAtomicCheckUpdateRequest.class,
new CI2<UUID, GridNearAtomicCheckUpdateRequest>() {
@Override public void apply(UUID uuid,
GridNearAtomicCheckUpdateRequest msg) {
@@ -402,6 +390,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridDhtForceKeysRequest.class,
new MessageHandler<GridDhtForceKeysRequest>() {
@Override public void onMessage(ClusterNode node,
GridDhtForceKeysRequest msg) {
@@ -411,6 +400,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridDhtForceKeysResponse.class,
new MessageHandler<GridDhtForceKeysResponse>() {
@Override public void onMessage(ClusterNode node,
GridDhtForceKeysResponse msg) {
@@ -421,34 +411,26 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
if (near == null) {
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridNearGetResponse.class,
- new CI2<UUID, GridNearGetResponse>() {
- @Override public void apply(
- UUID nodeId,
- GridNearGetResponse res
- ) {
- processNearGetResponse(
- nodeId,
- res);
- }
- });
+ (CI2<UUID, GridNearGetResponse>)this::processNearGetResponse);
ctx.io().addCacheHandler(
ctx.cacheId(),
+ ctx.startTopologyVersion(),
GridNearSingleGetResponse.class,
- new CI2<UUID, GridNearSingleGetResponse>() {
- @Override public void apply(
- UUID nodeId,
- GridNearSingleGetResponse res
- ) {
- processNearSingleGetResponse(
- nodeId,
- res);
- }
- });
+ (CI2<UUID,
GridNearSingleGetResponse>)this::processNearSingleGetResponse);
}
}
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ assert metrics != null : "Cache metrics instance isn't initialized.";
+
+ if (ctx.dht().near() != null)
+ metrics.delegate(ctx.dht().near().metrics0());
+ }
+
/**
* @param near Near cache.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index f6601a79e8e..4da88bef33a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -120,26 +120,32 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
}
/** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- super.start();
-
- ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new
CI2<UUID, GridNearGetResponse>() {
- @Override public void apply(UUID nodeId, GridNearGetResponse res) {
- processNearGetResponse(nodeId, res);
- }
- });
-
- ctx.io().addCacheHandler(ctx.cacheId(),
GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
- @Override public void apply(UUID nodeId, GridNearSingleGetResponse
res) {
- processNearSingleGetResponse(nodeId, res);
- }
- });
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
+
+ assert !ctx.isRecoveryMode() : "Registering message handlers in
recovery mode [cacheName=" + name() + ']';
+
+ ctx.io().addCacheHandler(
+ ctx.cacheId(),
+ ctx.startTopologyVersion(),
+ GridNearGetResponse.class,
+ (CI2<UUID, GridNearGetResponse>)this::processNearGetResponse);
+
+ ctx.io().addCacheHandler(
+ ctx.cacheId(),
+ ctx.startTopologyVersion(),
+ GridNearSingleGetResponse.class,
+ (CI2<UUID,
GridNearSingleGetResponse>)this::processNearSingleGetResponse);
+
+ ctx.io().addCacheHandler(
+ ctx.cacheId(),
+ ctx.startTopologyVersion(),
+ GridNearLockResponse.class,
+ (CI2<UUID, GridNearLockResponse>)this::processNearLockResponse);
+ }
- ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class,
new CI2<UUID, GridNearLockResponse>() {
- @Override public void apply(UUID nodeId, GridNearLockResponse res)
{
- processNearLockResponse(nodeId, res);
- }
- });
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 6068f0fe6e0..2a3cf58b139 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -104,14 +104,16 @@ public class GridNearAtomicCache<K, V> extends
GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- super.start();
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
- ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new
CI2<UUID, GridNearGetResponse>() {
- @Override public void apply(UUID nodeId, GridNearGetResponse res) {
- processGetResponse(nodeId, res);
- }
- });
+ assert !ctx.isRecoveryMode() : "Registering message handlers in
recovery mode [cacheName=" + name() + ']';
+
+ ctx.io().addCacheHandler(
+ ctx.cacheId(),
+ ctx.startTopologyVersion(),
+ GridNearGetResponse.class,
+ (CI2<UUID, GridNearGetResponse>)this::processGetResponse);
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 5887b8d95d8..26789218f92 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -86,20 +86,22 @@ public class GridNearTransactionalCache<K, V> extends
GridNearCacheAdapter<K, V>
}
/** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- super.start();
-
- ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new
CI2<UUID, GridNearGetResponse>() {
- @Override public void apply(UUID nodeId, GridNearGetResponse res) {
- processGetResponse(nodeId, res);
- }
- });
-
- ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class,
new CI2<UUID, GridNearLockResponse>() {
- @Override public void apply(UUID nodeId, GridNearLockResponse res)
{
- processLockResponse(nodeId, res);
- }
- });
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
+
+ assert !ctx.isRecoveryMode() : "Registering message handlers in
recovery mode [cacheName=" + name() + ']';
+
+ ctx.io().addCacheHandler(
+ ctx.cacheId(),
+ ctx.startTopologyVersion(),
+ GridNearGetResponse.class,
+ (CI2<UUID, GridNearGetResponse>)this::processGetResponse);
+
+ ctx.io().addCacheHandler(
+ ctx.cacheId(),
+ ctx.startTopologyVersion(),
+ GridNearLockResponse.class,
+ (CI2<UUID, GridNearLockResponse>)this::processLockResponse);
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 0c6dbe3e1b9..f7b3f7a8b8f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -54,6 +54,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteReducer;
import org.jetbrains.annotations.Nullable;
+
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
@@ -98,26 +99,27 @@ public class GridCacheDistributedQueryManager<K, V> extends
GridCacheQueryManage
};
/** Event listener. */
- private GridLocalEventListener lsnr;
+ private GridLocalEventListener lsnr = new GridLocalEventListener() {
+ /** {@inheritDoc} */
+ @Override public void onEvent(Event evt) {
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
- /** {@inheritDoc} */
- @Override public void start0() throws IgniteCheckedException {
- super.start0();
+ for (GridCacheDistributedQueryFuture fut : futs.values())
+ fut.onNodeLeft(discoEvt.eventNode().id());
+ }
+ };
- cctx.io().addCacheHandler(cctx.cacheId(), GridCacheQueryRequest.class,
new CI2<UUID, GridCacheQueryRequest>() {
- @Override public void apply(UUID nodeId, GridCacheQueryRequest
req) {
- processQueryRequest(nodeId, req);
- }
- });
+ /** {@inheritDoc} */
+ @Override public void onKernalStart0() throws IgniteCheckedException {
+ super.onKernalStart0();
- lsnr = new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+ assert !cctx.isRecoveryMode() : "Registering message handlers in
recovery mode [cacheName=" + cctx.name() + ']';
- for (GridCacheDistributedQueryFuture fut : futs.values())
- fut.onNodeLeft(discoEvt.eventNode().id());
- }
- };
+ cctx.io().addCacheHandler(
+ cctx.cacheId(),
+ cctx.startTopologyVersion(),
+ GridCacheQueryRequest.class,
+ (CI2<UUID, GridCacheQueryRequest>)this::processQueryRequest);
cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 0912db3c876..55e86fe9d24 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -195,7 +195,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
qry.includeMetadata(),
qry.keepBinary(),
qry.taskHash(),
- cctx.startTopologyVersion(),
+ cctx.affinity().affinityTopologyVersion(),
qry.mvccSnapshot(),
// Force deployment anyway if scan query is used.
cctx.deploymentEnabled() || deployFilterOrTransformer,
@@ -220,7 +220,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
false,
qry.keepBinary(),
qry.taskHash(),
- cctx.startTopologyVersion(),
+ cctx.affinity().affinityTopologyVersion(),
// Force deployment anyway if scan query is used.
cctx.deploymentEnabled() || (qry.scanFilter() != null &&
cctx.gridDeploy().enabled()),
qry.isDataPageScanEnabled());
@@ -236,7 +236,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
return new GridCacheQueryRequest(cctx.cacheId(),
reqId,
fieldsQry,
- cctx.startTopologyVersion(),
+ cctx.affinity().affinityTopologyVersion(),
cctx.deploymentEnabled());
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 6d5253c2e90..8fbd4c979ce 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -158,11 +158,22 @@ public class CacheContinuousQueryManager<K, V> extends
GridCacheManagerAdapter<K
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
+ super.start0();
+
// Append cache name to the topic.
topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" +
cctx.name());
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected void onKernalStart0() throws IgniteCheckedException {
+ assert !cctx.isRecoveryMode() : "Registering message handlers in
recovery mode [cacheName=" + cctx.name() + ']';
if (cctx.affinityNode()) {
- cctx.io().addCacheHandler(cctx.cacheId(),
CacheContinuousQueryBatchAck.class,
+ cctx.io().addCacheHandler(
+ cctx.cacheId(),
+ cctx.startTopologyVersion(),
+ CacheContinuousQueryBatchAck.class,
new CI2<UUID, CacheContinuousQueryBatchAck>() {
@Override public void apply(UUID uuid,
CacheContinuousQueryBatchAck msg) {
CacheContinuousQueryListener lsnr =
lsnrs.get(msg.routineId());
@@ -174,11 +185,7 @@ public class CacheContinuousQueryManager<K, V> extends
GridCacheManagerAdapter<K
cancelableTask = cctx.time().schedule(new BackupCleaner(lsnrs,
cctx.kernalContext()), BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
}
- }
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override protected void onKernalStart0() throws IgniteCheckedException {
Iterable<CacheEntryListenerConfiguration> cfgs =
cctx.config().getCacheEntryListenerConfigurations();
if (cfgs != null) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 3780d842fc0..14647675e3d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
@@ -951,23 +952,23 @@ public class IgniteTxEntry implements
GridPeerDeployAware, Message {
}
/**
- * Unmarshalls entry.
+ * Prepares this entry to unmarshall. In particular, this method
initialize a cache context.
*
* @param ctx Cache context.
+ * @param topVer Topology version that is used to validate a cache context.
+ * If this parameter is {@code null} then validation will be
skipped.
* @param near Near flag.
- * @param clsLdr Class loader.
* @throws IgniteCheckedException If un-marshalling failed.
*/
- public void unmarshal(
+ public void prepareUnmarshal(
GridCacheSharedContext<?, ?> ctx,
- boolean near,
- ClassLoader clsLdr
+ AffinityTopologyVersion topVer,
+ boolean near
) throws IgniteCheckedException {
-
if (this.ctx == null) {
GridCacheContext<?, ?> cacheCtx = ctx.cacheContext(cacheId);
- if (cacheCtx == null)
+ if (cacheCtx == null || (topVer != null &&
topVer.before(cacheCtx.startTopologyVersion())))
throw new CacheInvalidStateException(
"Failed to perform cache operation (cache is stopped),
cacheId=" + cacheId);
@@ -978,6 +979,24 @@ public class IgniteTxEntry implements GridPeerDeployAware,
Message {
this.ctx = cacheCtx;
}
+ }
+
+ /**
+ * Unmarshalls entry.
+ *
+ * @param ctx Cache context.
+ * @param near Near flag.
+ * @param clsLdr Class loader.
+ * @throws IgniteCheckedException If un-marshalling failed.
+ */
+ public void unmarshal(
+ GridCacheSharedContext<?, ?> ctx,
+ boolean near,
+ ClassLoader clsLdr
+ ) throws IgniteCheckedException {
+
+ if (this.ctx == null)
+ prepareUnmarshal(ctx, null, near);
CacheObjectValueContext coctx = this.ctx.cacheObjectContext();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 506b589b8e4..4f8ef80316e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -108,6 +108,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
+import static
org.apache.ignite.internal.processors.cache.GridCacheIoManager.COMMON_MESSAGE_HANDLER_ID;
import static
org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
import static
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
@@ -134,9 +135,6 @@ import static
org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
* Isolated logic to process cache messages.
*/
public class IgniteTxHandler {
- /** */
- private static final int TX_MSG_HND_ID = 0;
-
/** Logger. */
private IgniteLogger log;
@@ -222,80 +220,68 @@ public class IgniteTxHandler {
txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY);
txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY);
- ctx.io().addCacheHandler(TX_MSG_HND_ID,
GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
+ ctx.io().addCacheHandler(GridNearTxPrepareRequest.class, new CI2<UUID,
GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processNearTxPrepareRequest(nodeId,
(GridNearTxPrepareRequest)msg);
}
});
- ctx.io().addCacheHandler(TX_MSG_HND_ID,
GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
+ ctx.io().addCacheHandler(GridNearTxPrepareResponse.class, new
CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processNearTxPrepareResponse(nodeId,
(GridNearTxPrepareResponse)msg);
}
});
- ctx.io().addCacheHandler(TX_MSG_HND_ID, GridNearTxFinishRequest.class,
new CI2<UUID, GridCacheMessage>() {
+ ctx.io().addCacheHandler(GridNearTxFinishRequest.class, new CI2<UUID,
GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processNearTxFinishRequest(nodeId,
(GridNearTxFinishRequest)msg);
}
});
- ctx.io().addCacheHandler(TX_MSG_HND_ID,
GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
+ ctx.io().addCacheHandler(GridNearTxFinishResponse.class, new CI2<UUID,
GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processNearTxFinishResponse(nodeId,
(GridNearTxFinishResponse)msg);
}
});
- ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxPrepareRequest.class,
new CI2<UUID, GridCacheMessage>() {
+ ctx.io().addCacheHandler(GridDhtTxPrepareRequest.class, new CI2<UUID,
GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxPrepareRequest(nodeId,
(GridDhtTxPrepareRequest)msg);
}
});
- ctx.io().addCacheHandler(TX_MSG_HND_ID,
GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
+ ctx.io().addCacheHandler(GridDhtTxPrepareResponse.class, new CI2<UUID,
GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxPrepareResponse(nodeId,
(GridDhtTxPrepareResponse)msg);
}
});
- ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxFinishRequest.class,
new CI2<UUID, GridCacheMessage>() {
+ ctx.io().addCacheHandler(GridDhtTxFinishRequest.class, new CI2<UUID,
GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg);
}
});
- ctx.io().addCacheHandler(TX_MSG_HND_ID,
GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
+ ctx.io().addCacheHandler(GridDhtTxOnePhaseCommitAckRequest.class, new
CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxOnePhaseCommitAckRequest(nodeId,
(GridDhtTxOnePhaseCommitAckRequest)msg);
}
});
- ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxFinishResponse.class,
new CI2<UUID, GridCacheMessage>() {
+ ctx.io().addCacheHandler(GridDhtTxFinishResponse.class, new CI2<UUID,
GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxFinishResponse(nodeId,
(GridDhtTxFinishResponse)msg);
}
});
- ctx.io().addCacheHandler(TX_MSG_HND_ID,
GridCacheTxRecoveryRequest.class,
- new CI2<UUID, GridCacheTxRecoveryRequest>() {
- @Override public void apply(UUID nodeId,
GridCacheTxRecoveryRequest req) {
- processCheckPreparedTxRequest(nodeId, req);
- }
- });
+ ctx.io().addCacheHandler(GridCacheTxRecoveryRequest.class,
+ (CI2<UUID,
GridCacheTxRecoveryRequest>)this::processCheckPreparedTxRequest);
- ctx.io().addCacheHandler(TX_MSG_HND_ID,
GridCacheTxRecoveryResponse.class,
- new CI2<UUID, GridCacheTxRecoveryResponse>() {
- @Override public void apply(UUID nodeId,
GridCacheTxRecoveryResponse res) {
- processCheckPreparedTxResponse(nodeId, res);
- }
- });
+ ctx.io().addCacheHandler(GridCacheTxRecoveryResponse.class,
+ (CI2<UUID,
GridCacheTxRecoveryResponse>)this::processCheckPreparedTxResponse);
- ctx.io().addCacheHandler(TX_MSG_HND_ID,
IncrementalSnapshotAwareMessage.class,
- new CI2<UUID, IncrementalSnapshotAwareMessage>() {
- @Override public void apply(UUID nodeId,
IncrementalSnapshotAwareMessage msg) {
- processIncrementalSnapshotAwareMessage(nodeId, msg);
- }
- });
+ ctx.io().addCacheHandler(IncrementalSnapshotAwareMessage.class,
+ (CI2<UUID,
IncrementalSnapshotAwareMessage>)this::processIncrementalSnapshotAwareMessage);
}
/** */
@@ -310,7 +296,7 @@ public class IgniteTxHandler {
GridCacheMessage cacheMsg = msg.payload();
ctx.io()
- .cacheHandler(TX_MSG_HND_ID, cacheMsg.getClass())
+ .cacheHandler(COMMON_MESSAGE_HANDLER_ID, cacheMsg.getClass())
.apply(nodeId, cacheMsg);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 14ac86ef782..da485b02294 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -892,7 +892,7 @@ public class GridClusterStateProcessor extends
GridProcessorAdapter implements I
sharedCtx = cacheProc.context();
sharedCtx.io().addCacheHandler(
- 0, GridChangeGlobalStateMessageResponse.class,
+ GridChangeGlobalStateMessageResponse.class,
this::processChangeGlobalStateResponse
);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheRecreateTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheRecreateTest.java
new file mode 100755
index 00000000000..aca4448f420
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheRecreateTest.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
+import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Tests handling of ppending cache messages/operations when the required
cache was re created.
+ * */
+public class IgniteCacheRecreateTest extends GridCommonAbstractTest {
+ /** Cache name to be used in tests. */
+ private static final String CACHE_NAME = "test-recreate-cache";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(1);
+
+ startClientGrid(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ grid(0).destroyCache(CACHE_NAME);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAtomicPutAndCacheRecreate() throws Exception {
+ testCacheOperationAndCacheRecreate(
+ ATOMIC,
+ GridNearAtomicAbstractUpdateRequest.class,
+ (cache, keys) -> cache.put(keys.get(0), 42));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAtomicGetAndCacheRecreate() throws Exception {
+ testCacheOperationAndCacheRecreate(
+ ATOMIC,
+ GridNearSingleGetRequest.class,
+ (cache, keys) -> cache.get(keys.get(0)));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAtomicPutAllAndCacheRecreate() throws Exception {
+ testCacheOperationAndCacheRecreate(
+ ATOMIC,
+ GridNearAtomicFullUpdateRequest.class,
+ (cache, keys) -> {
+ Map<Integer, Integer> vals = new TreeMap<>();
+ vals.put(keys.get(0), 24);
+ vals.put(keys.get(1), 42);
+
+ cache.putAll(vals);
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAtomicGetAllAndCacheRecreate() throws Exception {
+ testCacheOperationAndCacheRecreate(
+ ATOMIC,
+ GridNearGetRequest.class,
+ (cache, keys) -> {
+ Set<Integer> vals = new TreeSet<>();
+ vals.add(keys.get(0));
+ vals.add(keys.get(1));
+
+ cache.getAll(vals);
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testImplicitInvokeAndCacheRecreate() throws Exception {
+ testCacheOperationAndCacheRecreate(
+ ATOMIC,
+ GridNearAtomicAbstractUpdateRequest.class,
+ (cache, keys) -> {
+ cache.invoke(keys.get(0), new CacheEntryProcessor<Integer,
Integer, Integer>() {
+ @Override public Integer process(
+ MutableEntry<Integer, Integer> entry,
+ Object... arguments
+ ) throws EntryProcessorException {
+ if (entry.exists())
+ return entry.getValue();
+
+ entry.setValue(123);
+
+ return entry.getValue();
+ }
+ });
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testImplicitOptimisticTxPutAndCacheRecreate() throws Exception
{
+ testCacheOperationAndCacheRecreate(
+ TRANSACTIONAL,
+ GridNearTxPrepareRequest.class,
+ (cache, keys) -> cache.put(keys.get(0), 42));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testImplicitOptimisticTxGetAndCacheRecreate() throws Exception
{
+ testCacheOperationAndCacheRecreate(
+ TRANSACTIONAL,
+ GridNearSingleGetRequest.class,
+ (cache, keys) -> cache.get(keys.get(0)));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testImplicitOptimisticTxPutAllAndCacheRecreate() throws
Exception {
+ testCacheOperationAndCacheRecreate(
+ TRANSACTIONAL,
+ GridNearTxPrepareRequest.class,
+ (cache, keys) -> {
+ Map<Integer, Integer> vals = new TreeMap<>();
+ vals.put(keys.get(0), 24);
+ vals.put(keys.get(1), 42);
+
+ cache.putAll(vals);
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testImplicitTxInvokeAndCacheRecreate() throws Exception {
+ testCacheOperationAndCacheRecreate(
+ TRANSACTIONAL,
+ GridNearTxPrepareRequest.class,
+ (cache, keys) -> {
+ cache.invoke(keys.get(0), new CacheEntryProcessor<Integer,
Integer, Integer>() {
+ @Override public Integer process(
+ MutableEntry<Integer, Integer> entry,
+ Object... arguments
+ ) throws EntryProcessorException {
+ if (entry.exists())
+ return entry.getValue();
+
+ entry.setValue(123);
+
+ return entry.getValue();
+ }
+ });
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPessimisticTxPutAndCacheRecreate() throws Exception {
+ testCacheOperationAndCacheRecreate(
+ TRANSACTIONAL,
+ GridNearLockRequest.class,
+ (cache, keys) -> {
+ try (Transaction tx =
grid(1).transactions().txStart(PESSIMISTIC, SERIALIZABLE)) {
+ cache.put(keys.get(0), 42);
+
+ tx.commit();
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPessimisticTxPutAllAndCacheRecreate() throws Exception {
+ testCacheOperationAndCacheRecreate(
+ TRANSACTIONAL,
+ GridNearLockRequest.class,
+ (cache, keys) -> {
+ try (Transaction tx =
grid(1).transactions().txStart(PESSIMISTIC, SERIALIZABLE)) {
+ Map<Integer, Integer> vals = new TreeMap<>();
+ vals.put(keys.get(0), 24);
+ vals.put(keys.get(1), 42);
+
+ cache.putAll(vals);
+
+ tx.commit();
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPessimisticTxGetAndCacheRecreate() throws Exception {
+ testCacheOperationAndCacheRecreate(
+ TRANSACTIONAL,
+ GridNearLockRequest.class,
+ (cache, keys) -> {
+ try (Transaction tx =
grid(1).transactions().txStart(PESSIMISTIC, SERIALIZABLE)) {
+ cache.get(keys.get(0));
+
+ tx.commit();
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAtomicScanAndCacheRecreate() throws Exception {
+ testCacheOperationAndCacheRecreate(
+ ATOMIC,
+ GridCacheQueryRequest.class,
+ (cache, keys) -> {
+ ScanQuery<Integer, Integer> scanQuery = new ScanQuery<>();
+ scanQuery.setPageSize(1);
+
+ try (QueryCursor qry = cache.query(scanQuery)) {
+ for (Object o : qry.getAll()) {
+ IgniteBiTuple<Integer, Integer> tuple =
(IgniteBiTuple<Integer, Integer>)o;
+
+ throw new RuntimeException("Succesfully read
unexpected value [k=" + tuple.getKey() + ", v=" + tuple.getValue());
+ }
+ }
+ });
+ }
+
+ /**
+ *
+ * @param mode Cache atomicity mode.
+ * @param clazz Cache message type to be blocked before re-creating a
cache.
+ * @param cacheOp Cache operation.
+ * @throws Exception If failed.
+ */
+ private void testCacheOperationAndCacheRecreate(
+ CacheAtomicityMode mode,
+ Class<? extends GridCacheIdMessage> clazz,
+ IgniteBiInClosure<IgniteCache<Integer, Integer>, List<Integer>> cacheOp
+ ) throws Exception {
+ IgniteEx g0 = grid(0);
+ IgniteEx client = grid(1);
+
+ // Initial loading.
+ IgniteCache<Integer, Integer> clientCache = createCache(client, mode);
+ for (int i = 0; i < 100; i++)
+ clientCache.put(i, i);
+
+ TestRecordingCommunicationSpi clientSpi =
TestRecordingCommunicationSpi.spi(client);
+ TestRecordingCommunicationSpi crdSpi =
TestRecordingCommunicationSpi.spi(g0);
+
+ // Block cache operation.
+ clientSpi.blockMessages((node, msg) -> {
+ if (clazz.isAssignableFrom(msg.getClass())) {
+ GridCacheIdMessage msg0 = (GridCacheIdMessage)msg;
+
+ if (msg0.cacheId() == 0 || msg0.cacheId() ==
CU.cacheId(CACHE_NAME))
+ return true;
+ }
+
+ return false;
+ });
+
+ // Block notifying the client node about upcoming changes.
+ crdSpi.blockMessages((node, msg) -> {
+ if (msg instanceof GridDhtPartitionsFullMessage)
+ return true;
+
+ return false;
+ });
+
+ List<Integer> primaryKeys = primaryKeys(g0.cache(CACHE_NAME), 2, 1);
+
+ // Initiate cache operation.
+ IgniteInternalFuture<?> updFut = runAsync(() ->
cacheOp.apply(clientCache, primaryKeys));
+
+ // Wait for operation is initiated on the client node.
+ clientSpi.waitForBlocked();
+
+ // Destoy the existing cache and re-create it once again in order to
deliver the blocked cache message to the server node
+ // when the reqired cache is destroyed and new cache handlers are
registered.
+ g0.destroyCache(clientCache.getName());
+
+ // Create a new cache with the same name.
+ IgniteCache newCache = createCache(g0, mode);
+
+ // Upload new values.
+ for (int i = 0; i < 100; i++)
+ newCache.put(i, i + 1_000);
+
+ // Unblock cache operation.
+ clientSpi.stopBlock();
+
+ try {
+ updFut.get(10, TimeUnit.SECONDS);
+
+ fail("Exception was not thrown.");
+ }
+ catch (Exception e) {
+ assertTrue("Unexpected exception [err=" + e + ']', X.hasCause(e,
CacheException.class));
+ }
+ finally {
+ crdSpi.stopBlock();
+ }
+ }
+
+ /**
+ * Creates a cache using the given node as initiator node and the given
atomicity mode.
+ *
+ * @param ignite Node to be used to initiate creating a new cache.
+ * @param mode Cache atomicity mode.
+ * @return Ignite cache.
+ */
+ private IgniteCache<Integer, Integer> createCache(IgniteEx ignite,
CacheAtomicityMode mode) {
+ CacheConfiguration<Integer, Integer> cfg = new
CacheConfiguration<>(CACHE_NAME);
+
+ cfg.setBackups(1)
+ .setReadFromBackup(false)
+ .setAtomicityMode(mode)
+ .setAffinity(new RendezvousAffinityFunction(false, 32));
+
+ return ignite.getOrCreateCache(cfg);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index b6c14912874..09dc5e841b5 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -121,14 +121,16 @@ public class GridCacheMessageSelfTest extends
GridCommonAbstractTest {
IgniteEx ignite1 = grid(1);
ignite0.context().cache().context().io().addCacheHandler(
- 0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
+ TestBadMessage.class,
+ new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage
msg) {
throw new RuntimeException("Test bad message
exception");
}
});
ignite1.context().cache().context().io().addCacheHandler(
- 0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
+ TestBadMessage.class,
+ new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage
msg) {
throw new RuntimeException("Test bad message
exception");
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
index 354c26c048d..2a9c2c6208f 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
@@ -23,6 +23,7 @@ import
org.apache.ignite.internal.processors.cache.IgniteCachePutAllRestartTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgniteBinaryMetadataUpdateNodeRestartTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicNodeRestartTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheGetRestartTest;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheRecreateTest;
import
org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheAtomicReplicatedNodeRestartSelfTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -41,6 +42,8 @@ import org.junit.runners.Suite;
IgniteBinaryMetadataUpdateNodeRestartTest.class,
+ IgniteCacheRecreateTest.class,
+
IgniteCacheGetRestartTest.class
})
public class IgniteCacheRestartTestSuite2 {