Repository: ignite Updated Branches: refs/heads/ignite-2791 4731a6bb4 -> 01d470f68
IGNITE-2791 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01d470f6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01d470f6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01d470f6 Branch: refs/heads/ignite-2791 Commit: 01d470f6808ede93b00c7a32929ba871006e6b91 Parents: 4731a6b Author: nikolay_tikhonov <[email protected]> Authored: Mon Mar 21 17:54:09 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon Mar 21 17:54:09 2016 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 3 +- .../internal/GridMessageListenHandler.java | 3 +- .../continuous/CacheContinuousQueryHandler.java | 40 ++++++++++++++------ .../continuous/CacheContinuousQueryManager.java | 2 + .../continuous/GridContinuousHandler.java | 4 +- .../continuous/GridContinuousProcessor.java | 9 +++-- 6 files changed, 42 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index bc43195..19bf1a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -136,7 +136,8 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode, + Map<Integer, Long> cntrs) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 089091b..0ac6877 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -125,7 +125,8 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode, + Map<Integer, Long> cntrs) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index e794f46..6243af7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -148,7 +148,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private transient int cacheId; /** */ - private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrs; + private transient volatile Map<Integer, Long> initUpdCntrs; + + /** */ + private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode; /** */ private transient volatile AffinityTopologyVersion initTopVer; @@ -266,7 +269,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** {@inheritDoc} */ - @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode, + Map<Integer, Long> cntrs) { + this.initUpdCntrsPerNode = cntrsPerNode; this.initUpdCntrs = cntrs; this.initTopVer = topVer; } @@ -553,10 +558,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * Wait topology. */ public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException { - cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get(); + GridCacheContext<K, V> cctx = cacheContext(ctx); - for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++) - getOrCreatePartitionRecovery(ctx, partId); + if (!cctx.isLocal()) { + cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get(); + + for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++) + getOrCreatePartitionRecovery(ctx, partId); + } } /** {@inheritDoc} */ @@ -684,18 +693,25 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler AffinityTopologyVersion initTopVer0 = initTopVer; - if (initTopVer0 != null && !initTopVer0.equals(AffinityTopologyVersion.NONE)) { - GridCacheAffinityManager aff = cacheContext(ctx).affinity(); + if (initTopVer0 != null) { + GridCacheContext<K, V> cctx = cacheContext(ctx); - for (ClusterNode node : aff.nodes(partId, initTopVer)) { - Map<Integer, Long> map = initUpdCntrs.get(node.id()); + GridCacheAffinityManager aff = cctx.affinity(); - if (map != null) { - partCntr = map.get(partId); + if (initUpdCntrsPerNode != null) { + for (ClusterNode node : aff.nodes(partId, initTopVer)) { + Map<Integer, Long> map = initUpdCntrsPerNode.get(node.id()); - break; + if (map != null) { + partCntr = map.get(partId); + + break; + } } } + else if (initUpdCntrs != null) { + partCntr = initUpdCntrs.get(partId); + } } rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntr); http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 2847063..869a51b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -657,6 +657,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { log.warning("Failed to start continuous query.", e); cctx.kernalContext().continuous().stopRoutine(id); + + throw new IgniteCheckedException("Failed to start continuous query.", e); } if (notifyExisting) { http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 2ab75d5..46e87af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -154,8 +154,10 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { public String cacheName(); /** + * @param cntrsPerNode Init state partition counters for node. * @param cntrs Init state for partition counters. * @param topVer Topology version. */ - public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs); + public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode, + Map<Integer, Long> cntrs); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 80ab092..f2d6e1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -219,17 +219,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Update partition counters. if (routine != null && routine.handler().isQuery()) { - Map<UUID, Map<Integer, Long>> cnrtsPerNode = msg.updateCountersPerNode(); + Map<UUID, Map<Integer, Long>> cntrsPerNode = msg.updateCountersPerNode(); + Map<Integer, Long> cntrs = msg.updateCounters(); GridCacheAdapter<Object, Object> interCache = ctx.cache().internalCache(routine.handler().cacheName()); GridCacheContext cctx = interCache != null ? interCache.context() : null; - if (cctx != null && cnrtsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) - cnrtsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters()); + if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) + cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters()); - routine.handler().updateCounters(topVer, cnrtsPerNode); + routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); } fut.onRemoteRegistered();
