Repository: ignite Updated Branches: refs/heads/ignite-2791 fa97a07c0 -> 716cce428
IGNITE-2791 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/716cce42 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/716cce42 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/716cce42 Branch: refs/heads/ignite-2791 Commit: 716cce4289d066fad9a534e27942fb61167132da Parents: fa97a07 Author: nikolay_tikhonov <[email protected]> Authored: Mon Mar 21 15:50:10 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon Mar 21 15:50:10 2016 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 2 +- .../internal/GridMessageListenHandler.java | 2 +- .../continuous/CacheContinuousQueryHandler.java | 68 +++++--- .../continuous/CacheContinuousQueryManager.java | 10 ++ .../continuous/GridContinuousHandler.java | 2 +- .../continuous/GridContinuousProcessor.java | 39 +---- .../StartRoutineAckDiscoveryMessage.java | 22 ++- .../StartRoutineDiscoveryMessage.java | 20 ++- .../GridCacheContinuousQueryConcurrentTest.java | 164 ++++++++++++++++++- 9 files changed, 259 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index e2b1184..bc43195 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -136,7 +136,7 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 402365c..089091b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -125,7 +125,7 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 10fbd89..23cd48c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; @@ -72,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgnitePredicate; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedDeque8; @@ -146,10 +148,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private transient int cacheId; /** */ - private Map<Integer, Long> initUpdCntrs; + private transient Map<UUID, Map<Integer, Long>> initUpdCntrs; /** */ - private AffinityTopologyVersion initTopVer; + private transient AffinityTopologyVersion initTopVer; /** */ private transient boolean ignoreClsNotFound; @@ -264,7 +266,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** {@inheritDoc} */ - @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs) { this.initTopVer = topVer; this.initUpdCntrs = cntrs; } @@ -296,20 +298,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert !skipPrimaryCheck || loc; - final GridCacheContext<K, V> cctx = cacheContext(ctx); - - if (!internal && cctx != null && initUpdCntrs != null) { - Map<Integer, Long> map = cctx.topology().updateCounters(); - - for (Map.Entry<Integer, Long> e : map.entrySet()) { - Long cntr0 = initUpdCntrs.get(e.getKey()); - Long cntr1 = e.getValue(); - - if (cntr0 == null || cntr1 > cntr0) - initUpdCntrs.put(e.getKey(), cntr1); - } - } - CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { @Override public void onExecution() { if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -561,6 +549,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler entry.prepareMarshal(cctx); } + /** + * Wait topology. + */ + public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException { + cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get(); + + for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++) + getOrCreatePartitionRecovery(ctx, partId); + } + /** {@inheritDoc} */ @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { // No-op. @@ -668,19 +666,45 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (e.updateCounter() == -1L) return F.asList(e); - PartitionRecovery rec = rcvs.get(e.partition()); + PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition()); + + return rec.collectEntries(e); + } + + /** + * @param ctx Context. + * @param partId Partition id. + * @return Partition recovery. + */ + @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) { + PartitionRecovery rec = rcvs.get(partId); if (rec == null) { - rec = new PartitionRecovery(ctx.log(getClass()), initTopVer, - initUpdCntrs == null ? null : initUpdCntrs.get(e.partition())); + Long partCntr = null; - PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec); + if (initTopVer != null && !initTopVer.equals(AffinityTopologyVersion.NONE)) { + GridCacheAffinityManager aff = cacheContext(ctx).affinity(); + + for (ClusterNode node : aff.nodes(partId, initTopVer)) { + Map<Integer, Long> map = initUpdCntrs.get(node.id()); + + if (map != null) { + partCntr = map.get(partId); + + break; + } + } + } + + rec = new PartitionRecovery(ctx.log(getClass()), initTopVer, partCntr); + + PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec); if (oldRec != null) rec = oldRec; } - return rec.collectEntries(e); + return rec; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 353043f..2847063 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -649,6 +649,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { autoUnsubscribe, pred).get(); + try { + if (hnd.isQuery() && cctx.userCache()) + hnd.waitTopologyFuture(cctx.kernalContext()); + } + catch (IgniteCheckedException e) { + log.warning("Failed to start continuous query.", e); + + cctx.kernalContext().continuous().stopRoutine(id); + } + if (notifyExisting) { final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator(); http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 8cd30a8..2ab75d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -157,5 +157,5 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { * @param cntrs Init state for partition counters. * @param topVer Topology version. */ - public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs); + public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index c1f4d22..f29d413 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -219,32 +219,19 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Update partition counters. if (routine != null && routine.handler().isQuery()) { - Map<Integer, Long> cntrs = msg.updateCounters(); + Map<UUID, Map<Integer, Long>> cnrtsPerNode = msg.updateCountersPerNode(); GridCacheAdapter<Object, Object> interCache = ctx.cache().internalCache(routine.handler().cacheName()); - if (interCache != null && cntrs != null && interCache.context() != null + if (interCache != null && cnrtsPerNode != null && interCache.context() != null && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) { GridCacheContext<Object, Object> cctx = interCache.context(); - Map<Integer, Long> map = cctx.topology().updateCounters(); - - for (Map.Entry<Integer, Long> e : map.entrySet()) { - if (!ctx.cache().context().exchange().hasPendingExchange() && - !cctx.affinity().primary(cctx.localNode(), e.getKey(), - cctx.affinity().affinityTopologyVersion())) - continue; - - Long cntr0 = cntrs.get(e.getKey()); - Long cntr1 = e.getValue(); - - if (cntr0 == null || cntr1 > cntr0) - cntrs.put(e.getKey(), cntr1); - } + cnrtsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters()); } - routine.handler().updateCounters(topVer, cntrs); + routine.handler().updateCounters(topVer, cnrtsPerNode); } fut.onRemoteRegistered(); @@ -929,22 +916,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (proc != null) { GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); - if (cache != null && !cache.isLocal()) { - GridCacheContext cctx = cache.context(); - - Map<Integer, Long> cntrs = cache.context().topology().updateCounters(); - - AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - - if (!ctx.cache().context().exchange().hasPendingExchange()) { - for (Integer partId : new ArrayList<>(cntrs.keySet())) { - if (!cctx.affinity().primary(cctx.localNode(), partId, topVer)) - cntrs.remove(partId); - } - } - - req.addUpdateCounters(cntrs); - } + if (cache != null && !cache.isLocal()) + req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java index 9644372..ca34b27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -36,18 +37,28 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { private final Map<UUID, IgniteCheckedException> errs; /** */ + @GridToStringExclude private final Map<Integer, Long> updateCntrs; + /** */ + @GridToStringExclude + private final Map<UUID, Map<Integer, Long>> updateCntrsPerNode; + /** * @param routineId Routine id. * @param errs Errs. + * @param cntrs Partition counters. + * @param cntrsPerNode Partition counters per node. */ - public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs, - Map<Integer, Long> cntrs) { + public StartRoutineAckDiscoveryMessage(UUID routineId, + Map<UUID, IgniteCheckedException> errs, + Map<Integer, Long> cntrs, + Map<UUID, Map<Integer, Long>> cntrsPerNode) { super(routineId); this.errs = new HashMap<>(errs); this.updateCntrs = cntrs; + this.updateCntrsPerNode = cntrsPerNode; } /** {@inheritDoc} */ @@ -63,6 +74,13 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { } /** + * @return Update counters for partitions per each node. + */ + public Map<UUID, Map<Integer, Long>> updateCountersPerNode() { + return updateCntrsPerNode; + } + + /** * @return Errs. */ public Map<UUID, IgniteCheckedException> errs() { http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index ff037d4..4df9599 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -40,6 +40,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** */ private Map<Integer, Long> updateCntrs; + /** */ + private Map<UUID, Map<Integer, Long>> updateCntrsPerNode; + /** Keep binary flag. */ private boolean keepBinary; @@ -72,7 +75,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** * @param cntrs Update counters. */ - public void addUpdateCounters(Map<Integer, Long> cntrs) { + private void addUpdateCounters(Map<Integer, Long> cntrs) { if (updateCntrs == null) updateCntrs = new HashMap<>(); @@ -86,6 +89,19 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { } /** + * @param nodeId Local node ID. + * @param cntrs Update counters. + */ + public void addUpdateCounters(UUID nodeId, Map<Integer, Long> cntrs) { + //addUpdateCounters(cntrs); + + if (updateCntrsPerNode == null) + updateCntrsPerNode = new HashMap<>(); + + assert updateCntrsPerNode.put(nodeId, cntrs) == null; + } + + /** * @return Errs. */ public Map<UUID, IgniteCheckedException> errs() { @@ -106,7 +122,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** {@inheritDoc} */ @Override public DiscoveryCustomMessage ackMessage() { - return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs); + return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java index 8803e8e..4995c6f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.configuration.CacheEntryListenerConfiguration; @@ -38,17 +39,22 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static java.util.concurrent.TimeUnit.MINUTES; import static javax.cache.configuration.FactoryBuilder.factoryOf; /** @@ -59,7 +65,7 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** */ - private static final int NODES = 2; + private static final int NODES = 3; /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { @@ -81,6 +87,11 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + cfg.setPeerClassLoadingEnabled(false); + + if (gridName.endsWith(String.valueOf(NODES))) + cfg.setClientMode(ThreadLocalRandom.current().nextBoolean()); + return cfg; } @@ -94,15 +105,43 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe /** * @throws Exception If failed. */ + public void testRestartReplicatedTx() throws Exception { + testRestartRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, 2)); + } + + /** + * @throws Exception If failed. + */ + public void testRestartReplicated() throws Exception { + testRestartRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2)); + } + + /** + * @throws Exception If failed. + */ + public void testRestartPartition() throws Exception { + testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2)); + } + + /** + * @throws Exception If failed. + */ + public void testRestartPartitionTx() throws Exception { + testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2)); + } + + /** + * @throws Exception If failed. + */ public void testReplicatedAtomic() throws Exception { - testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 1)); + testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2)); } /** * @throws Exception If failed. */ public void testPartitionTx() throws Exception { - testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 1)); + testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2)); } /** @@ -135,7 +174,7 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe try { final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(ccfg); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 30; i++) { log.info("Start iteration: " + i); final int i0 = i; @@ -155,7 +194,97 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe if (log.isDebugEnabled()) log.debug("Started cont query count: " + count); - if (++count == conQryCnt) + if (++count >= conQryCnt) + latch.countDown(); + } + + return futures; + } + }); + + assert U.await(latch, 1, MINUTES); + + cache.put(i, "v"); + + stop.set(true); + + List<IgniteFuture<String>> contQries = fut.get(); + + for (IgniteFuture<String> contQry : contQries) + contQry.get(2, TimeUnit.SECONDS); + } + } + finally { + execSrv.shutdownNow(); + + grid(0).destroyCache(ccfg.getName()); + } + } + + /** + * @throws Exception If failed. + */ + public void testRestartRegistration(CacheConfiguration ccfg) throws Exception { + ExecutorService execSrv = newSingleThreadExecutor(); + + final AtomicBoolean stopRes = new AtomicBoolean(false); + + IgniteInternalFuture<?> restartFut = null; + + try { + final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(ccfg); + + restartFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!stopRes.get()) { + startGrid(NODES); + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return grid(0).cluster().nodes().size() == NODES + 1; + } + }, 5000L); + + Thread.sleep(300); + + stopGrid(NODES); + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return grid(0).cluster().nodes().size() == NODES; + } + }, 5000L); + + Thread.sleep(300); + } + + return null; + } + }); + + U.sleep(100); + + for (int i = 0; i < 30; i++) { + log.info("Start iteration: " + i); + + final int i0 = i; + final AtomicBoolean stop = new AtomicBoolean(false); + final CountDownLatch latch = new CountDownLatch(1); + final int conQryCnt = 50; + + Future<List<IgniteFuture<String>>> fut = execSrv.submit( + new Callable<List<IgniteFuture<String>>>() { + @Override public List<IgniteFuture<String>> call() throws Exception { + int count = 0; + List<IgniteFuture<String>> futures = new ArrayList<>(); + + while (!stop.get()) { + futures.add(waitForKey(i0, cache, count)); + + if (log.isDebugEnabled()) + log.debug("Started cont query count: " + count); + + if (++count >= conQryCnt) latch.countDown(); } @@ -167,16 +296,28 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe cache.put(i, "v"); + assertEquals("v", cache.get(i)); + stop.set(true); List<IgniteFuture<String>> contQries = fut.get(); - for (int j = 0; j < contQries.size(); j++) - contQries.get(j).get(2, TimeUnit.SECONDS); + for (IgniteFuture<String> contQry : contQries) + contQry.get(5, TimeUnit.SECONDS); } } finally { execSrv.shutdownNow(); + + grid(0).destroyCache(ccfg.getName()); + + if (restartFut != null) { + stopRes.set(true); + + restartFut.get(); + + stopGrid(NODES); + } } } @@ -201,7 +342,13 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe promise.listen(new IgniteInClosure<IgniteFuture<String>>() { @Override public void apply(IgniteFuture<String> future) { - cache.deregisterCacheEntryListener(cfg); + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + cache.deregisterCacheEntryListener(cfg); + + return null; + } + }); } }); @@ -262,6 +409,7 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe cfg.setAtomicityMode(atomicMode); cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); cfg.setBackups(backups); + cfg.setReadFromBackup(false); return cfg; }
