IGNITE-2384 Fixed "Notification missed in continuous query".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/252ba877 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/252ba877 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/252ba877 Branch: refs/heads/ignite-2324 Commit: 252ba877e2e8b357ca326ce23d8fcc83dd0138bd Parents: 9a99633 Author: nikolay_tikhonov <[email protected]> Authored: Mon Jan 18 20:41:53 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon Jan 18 20:42:16 2016 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 3 +- .../internal/GridMessageListenHandler.java | 6 +- .../managers/discovery/CustomEventListener.java | 4 +- .../discovery/GridDiscoveryManager.java | 8 +- .../continuous/CacheContinuousQueryHandler.java | 36 +-- .../continuous/GridContinuousHandler.java | 4 +- .../continuous/GridContinuousProcessor.java | 15 +- ...ContinuousQueryFailoverAbstractSelfTest.java | 80 ++++++ .../CacheContinuousQueryLostPartitionTest.java | 256 +++++++++++++++++++ .../IgniteBinaryCacheQueryTestSuite.java | 2 + 10 files changed, 383 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index be35ba4..69af6cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; @@ -135,7 +136,7 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void updateCounters(Map<Integer, Long> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 2edfda5..13aeb54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; @@ -108,12 +109,13 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void updateCounters(Map<Integer, Long> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) { // No-op. } /** {@inheritDoc} */ - @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { + @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) + throws IgniteCheckedException { ctx.io().addUserMessageListener(topic, pred); return RegisterStatus.REGISTERED; http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java index ab143fb..21fd842 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java @@ -18,14 +18,16 @@ package org.apache.ignite.internal.managers.discovery; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; /** * Listener interface. */ public interface CustomEventListener<T extends DiscoveryCustomMessage> { /** + * @param topVer Current topology version. * @param snd Sender. * @param msg Message. */ - public void onCustomEvent(ClusterNode snd, T msg); + public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, T msg); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 29e85dd..42f9b6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -507,14 +507,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { verChanged = true; } + } + + nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); + if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) { List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls); if (list != null) { for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) { try { - lsnr.onCustomEvent(node, customMsg); + lsnr.onCustomEvent(nextTopVer, node, customMsg); } catch (Exception e) { U.error(log, "Failed to notify direct custom event listener: " + customMsg, e); @@ -524,8 +528,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); - // Put topology snapshot into discovery history. // There is no race possible between history maintenance and concurrent discovery // event notifications, since SPI notifies manager about all events from this listener. http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index ad86d65..fa54a6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -148,6 +148,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** */ private Map<Integer, Long> initUpdCntrs; + /** */ + private AffinityTopologyVersion initTopVer; + /** * Required by {@link Externalizable}. */ @@ -170,6 +173,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. * @param taskHash Task name hash code. * @param locCache {@code True} if local cache. + * @param keepBinary Keep binary flag. */ public CacheContinuousQueryHandler( String cacheName, @@ -238,7 +242,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** {@inheritDoc} */ - @Override public void updateCounters(Map<Integer, Long> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) { + this.initTopVer = topVer; this.initUpdCntrs = cntrs; } @@ -382,9 +387,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } else { if (!internal) { - entry.markBackup(); + // Skip init query and expire entries. + if (entry.updateCounter() != -1L) { + entry.markBackup(); - backupQueue.add(entry); + backupQueue.add(entry); + } } } } @@ -483,7 +491,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler e = buf.skipEntry(e); - if (e != null) + if (e != null && !ctx.localNodeId().equals(nodeId)) ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true); } catch (ClusterTopologyCheckedException ex) { @@ -642,15 +650,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return F.asList(e); } - // Initial query entry or evicted entry. - // This events should be fired immediately. - if (e.updateCounter() == -1) + // Initial query entry or evicted entry. These events should be fired immediately. + if (e.updateCounter() == -1L) return F.asList(e); PartitionRecovery rec = rcvs.get(e.partition()); if (rec == null) { - rec = new PartitionRecovery(ctx.log(getClass()), cacheContext(ctx).topology().topologyVersion(), + rec = new PartitionRecovery(ctx.log(getClass()), initTopVer, initUpdCntrs == null ? null : initUpdCntrs.get(e.partition())); PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec); @@ -724,6 +731,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param initCntr Update counters. */ public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) { + assert topVer.topologyVersion() > 0 : topVer; + this.log = log; if (initCntr != null) { @@ -745,17 +754,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler List<CacheContinuousQueryEntry> entries; synchronized (pendingEvts) { - // Received first event. - if (curTop == AffinityTopologyVersion.NONE) { - lastFiredEvt = entry.updateCounter(); - - curTop = entry.topologyVersion(); - - return F.asList(entry); - } - if (curTop.compareTo(entry.topologyVersion()) < 0) { - if (entry.updateCounter() == 1 && !entry.isBackup()) { + if (entry.updateCounter() == 1L && !entry.isBackup()) { entries = new ArrayList<>(pendingEvts.size()); for (CacheContinuousQueryEntry evt : pendingEvts.values()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 900835a..8cd30a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.jetbrains.annotations.Nullable; /** @@ -154,6 +155,7 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { /** * @param cntrs Init state for partition counters. + * @param topVer Topology version. */ - public void updateCounters(Map<Integer, Long> cntrs); + public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index cb028f3..7c7e3e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.discovery.CustomEventListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; @@ -191,7 +192,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class, new CustomEventListener<StartRoutineDiscoveryMessage>() { - @Override public void onCustomEvent(ClusterNode snd, + @Override public void onCustomEvent(AffinityTopologyVersion topVer, + ClusterNode snd, StartRoutineDiscoveryMessage msg) { if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping()) processStartRequest(snd, msg); @@ -200,7 +202,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class, new CustomEventListener<StartRoutineAckDiscoveryMessage>() { - @Override public void onCustomEvent(ClusterNode snd, + @Override public void onCustomEvent(AffinityTopologyVersion topVer, + ClusterNode snd, StartRoutineAckDiscoveryMessage msg) { StartFuture fut = startFuts.remove(msg.routineId()); @@ -228,7 +231,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } - routine.handler().updateCounters(msg.updateCounters()); + routine.handler().updateCounters(topVer, msg.updateCounters()); } fut.onRemoteRegistered(); @@ -246,7 +249,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class, new CustomEventListener<StopRoutineDiscoveryMessage>() { - @Override public void onCustomEvent(ClusterNode snd, + @Override public void onCustomEvent(AffinityTopologyVersion topVer, + ClusterNode snd, StopRoutineDiscoveryMessage msg) { if (!snd.id().equals(ctx.localNodeId())) { UUID routineId = msg.routineId(); @@ -265,7 +269,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class, new CustomEventListener<StopRoutineAckDiscoveryMessage>() { - @Override public void onCustomEvent(ClusterNode snd, + @Override public void onCustomEvent(AffinityTopologyVersion topVer, + ClusterNode snd, StopRoutineAckDiscoveryMessage msg) { StopFuture fut = stopFuts.remove(msg.routineId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 283da80..5de3d0f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -41,6 +41,9 @@ import javax.cache.CacheException; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.expiry.TouchedExpiryPolicy; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; @@ -90,11 +93,13 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * @@ -1278,6 +1283,81 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * @throws Exception If failed. */ + public void testBackupQueueEvict() throws Exception { + startGridsMultiThreaded(2); + + client = true; + + Ignite qryClient = startGrid(2); + + CacheEventListener1 lsnr = new CacheEventListener1(false); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = qryClient.cache(null).query(qry); + + final Collection<Object> backupQueue = backupQueue(ignite(0)); + + assertEquals(0, backupQueue.size()); + + long ttl = 100; + + final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl)); + + final IgniteCache<Object, Object> cache0 = ignite(2).cache(null).withExpiryPolicy(expiry); + + final List<Integer> keys = primaryKeys(ignite(1).cache(null), BACKUP_ACK_THRESHOLD); + + CountDownLatch latch = new CountDownLatch(keys.size()); + + lsnr.latch = latch; + + for (Integer key : keys) { + log.info("Put: " + key); + + cache0.put(key, key); + } + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return backupQueue.isEmpty(); + } + }, 2000); + + assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + + boolean wait = waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return cache0.localPeek(keys.get(0)) == null; + } + }, ttl + 1000); + + assertTrue("Entry evicted.", wait); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return backupQueue.isEmpty(); + } + }, 2000); + + assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + + if (backupQueue.size() != 0) { + for (Object o : backupQueue) { + CacheContinuousQueryEntry e = (CacheContinuousQueryEntry)o; + + assertNotSame("Evicted entry added to backup queue.", -1L, e.updateCounter()); + } + } + + cur.close(); + } + + /** + * @throws Exception If failed. + */ public void testBackupQueueCleanupServerQuery() throws Exception { Ignite qryClient = startGridsMultiThreaded(2); http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java new file mode 100644 index 0000000..30613a4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryExpiredListener; +import javax.cache.event.CacheEntryRemovedListener; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static javax.cache.configuration.FactoryBuilder.factoryOf; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; + +/** + * Test from https://issues.apache.org/jira/browse/IGNITE-2384. + */ +public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTest { + /** */ + static public TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Cache name. */ + public static final String CACHE_NAME = "test_cache"; + + /** Cache name. */ + public static final String TX_CACHE_NAME = "tx_test_cache"; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGridsMultiThreaded(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testTxEvent() throws Exception { + testEvent(TX_CACHE_NAME, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicEvent() throws Exception { + testEvent(CACHE_NAME, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxClientEvent() throws Exception { + testEvent(TX_CACHE_NAME, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicClientEvent() throws Exception { + testEvent(CACHE_NAME, true); + } + + /** + * @param cacheName Cache name. + * @throws Exception If failed. + */ + public void testEvent(String cacheName, boolean client) throws Exception { + IgniteCache<Integer, String> cache1 = grid(0).getOrCreateCache(cacheName); + + final AllEventListener<Integer, String> lsnr1 = registerCacheListener(cache1); + + IgniteCache<Integer, String> cache2 = grid(1).getOrCreateCache(cacheName); + + Integer key = primaryKey(cache1); + + cache1.put(key, "1"); + + // Note the issue is only reproducible if the second registration is done right + // here, after the first put() above. + AllEventListener<Integer, String> lsnr20; + + if (client) { + IgniteCache<Integer, String> clnCache = startGrid(3).getOrCreateCache(cacheName); + + lsnr20 = registerCacheListener(clnCache); + } + else + lsnr20 = registerCacheListener(cache2); + + final AllEventListener<Integer, String> lsnr2 = lsnr20; + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr1.createdCnt.get() == 1; + } + }, 2000L) : "Unexpected number of events: " + lsnr1.createdCnt.get(); + + // Sanity check. + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr2.createdCnt.get() == 0; + } + }, 2000L) : "Expected no create events, but got: " + lsnr2.createdCnt.get(); + + // node2 now becomes the primary for the key. + grid(0).close(); + + cache2.put(key, "2"); + + // Sanity check. + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr1.createdCnt.get() == 1; + } + }, 2000L) : "Expected no change here, but got: " + lsnr1.createdCnt.get(); + + // Sanity check. + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr2.updatedCnt.get() == 0; + } + }, 2000L) : "Expected no update events, but got: " + lsnr2.updatedCnt.get(); + + System.out.println(">>>>> " + lsnr2.createdCnt.get()); + + // This assertion fails: 0 events get delivered. + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr2.createdCnt.get() == 1; + } + }, 2000L) : "Expected a single event due to '2', but got: " + lsnr2.createdCnt.get(); + } + + /** + * @param cache Cache. + * @return Event listener. + */ + private AllEventListener<Integer, String> registerCacheListener(IgniteCache<Integer, String> cache) { + AllEventListener<Integer, String> lsnr = new AllEventListener<>(); + + cache.registerCacheEntryListener( + new MutableCacheEntryListenerConfiguration<>(factoryOf(lsnr), null, true, false)); + + return lsnr; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + cfg.setCacheConfiguration(cache(TX_CACHE_NAME), cache(CACHE_NAME)); + + if (name.endsWith("3")) + cfg.setClientMode(true); + + return cfg; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + */ + protected CacheConfiguration<Integer, String> cache(String cacheName) { + CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>(cacheName); + + cfg.setCacheMode(PARTITIONED); + + if (cacheName.equals(CACHE_NAME)) + cfg.setAtomicityMode(ATOMIC); + else + cfg.setAtomicityMode(TRANSACTIONAL); + + cfg.setRebalanceMode(SYNC); + cfg.setWriteSynchronizationMode(PRIMARY_SYNC); + cfg.setBackups(0); + + return cfg; + } + + /** + * Event listener. + */ + public static class AllEventListener<K, V> implements CacheEntryCreatedListener<K, V>, + CacheEntryUpdatedListener<K, V>, CacheEntryRemovedListener<K, V>, CacheEntryExpiredListener<K, V>, + Serializable { + /** */ + final AtomicInteger createdCnt = new AtomicInteger(); + + /** */ + final AtomicInteger updatedCnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public void onCreated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { + createdCnt.incrementAndGet(); + + System.out.printf("onCreate: %s. \n", evts); + } + + /** {@inheritDoc} */ + @Override public void onExpired(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { + System.out.printf("onExpired: %s. \n", evts); + } + + /** {@inheritDoc} */ + @Override public void onRemoved(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { + System.out.printf("onRemoved: %s. \n", evts); + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { + updatedCnt.incrementAndGet(); + + System.out.printf("onUpdated: %s.", evts); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/252ba877/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java index eddfcf4..3bab1f9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java @@ -75,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartitionTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest; @@ -200,6 +201,7 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite { suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class); suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class); suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class); + suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class); // Reduce fields queries. suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);
