Continuous query compatibility fix (topVer can be null for old CacheContinuousQueryEntry).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dee61900 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dee61900 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dee61900 Branch: refs/heads/ignite-2407 Commit: dee61900c26b1f2a0a84d5e400001fecad545ada Parents: 10214cc Author: sboikov <sboi...@gridgain.com> Authored: Thu Feb 25 12:54:11 2016 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Feb 25 12:54:11 2016 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryHandler.java | 43 +++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dee61900/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 4397f69..1938edb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -737,6 +737,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) { assert entry != null; + if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. + assert entry.updateCounter() == 0L : entry; + + return F.asList(entry); + } + List<CacheContinuousQueryEntry> entries; synchronized (pendingEvts) { @@ -991,28 +997,25 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler routineId, t.get1()); - Collection<ClusterNode> nodes = new HashSet<>(); - - for (AffinityTopologyVersion topVer : t.get2()) - nodes.addAll(ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)); - - for (ClusterNode node : nodes) { - if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) { - try { - cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL); - } - catch (ClusterTopologyCheckedException e) { - IgniteLogger log = ctx.log(getClass()); + for (AffinityTopologyVersion topVer : t.get2()) { + for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) { + if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) { + try { + cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + IgniteLogger log = ctx.log(getClass()); - if (log.isDebugEnabled()) - log.debug("Failed to send acknowledge message, node left " + - "[msg=" + msg + ", node=" + node + ']'); - } - catch (IgniteCheckedException e) { - IgniteLogger log = ctx.log(getClass()); + if (log.isDebugEnabled()) + log.debug("Failed to send acknowledge message, node left " + + "[msg=" + msg + ", node=" + node + ']'); + } + catch (IgniteCheckedException e) { + IgniteLogger log = ctx.log(getClass()); - U.error(log, "Failed to send acknowledge message " + - "[msg=" + msg + ", node=" + node + ']', e); + U.error(log, "Failed to send acknowledge message " + + "[msg=" + msg + ", node=" + node + ']', e); + } } } }