http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index befd1d7..2fb7fcb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; @@ -400,7 +401,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * @param ignite Ignite. - * @param topVer Topology version. + * @param topVer Major topology version. + * @param minorVer Minor topology version. * @throws Exception If failed. */ private void waitRebalanceFinished(Ignite ignite, long topVer, int minorVer) throws Exception { @@ -511,9 +513,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC * @param nodes Count nodes. * @param killedNodeIdx Killed node index. * @param updCntrs Update counters. - * @return {@code True} if counters matches. */ - private boolean checkPartCounter(int nodes, int killedNodeIdx, Map<Integer, Long> updCntrs) { + private void checkPartCounter(int nodes, int killedNodeIdx, Map<Integer, Long> updCntrs) { for (int i = 0; i < nodes; i++) { if (i == killedNodeIdx) continue; @@ -527,8 +528,6 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC assertEquals(e.getValue(), act.get(e.getKey()).get2()); } } - - return true; } /** @@ -753,8 +752,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC assert GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { // (SRV_NODES + 1 client node) - 1 primary - backup nodes. - return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node */) - - 1 /** Primary node */ - backups; + return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /* client node */) + - 1 /* Primary node */ - backups; } }, 5000L); @@ -1253,6 +1252,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * @param expEvts Expected events. * @param lsnr Listener. + * @throws Exception If failed. */ private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener3 lsnr, boolean allowLoseEvt) throws Exception { @@ -1347,9 +1347,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC QueryCursor<?> cur = qryClient.cache(DEFAULT_CACHE_NAME).query(qry); - final Collection<Object> backupQueue = backupQueue(ignite(1)); - - assertEquals(0, backupQueue.size()); + assertEquals(0, backupQueue(ignite(1)).size()); IgniteCache<Object, Object> cache0 = ignite(0).cache(DEFAULT_CACHE_NAME); @@ -1367,11 +1365,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return backupQueue.isEmpty(); + return backupQueue(ignite(1)).isEmpty(); } }, 2000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)), + backupQueue(ignite(1)).size() < BACKUP_ACK_THRESHOLD); if (!latch.await(5, SECONDS)) fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); @@ -1389,11 +1388,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return backupQueue.isEmpty(); + return backupQueue(ignite(1)).isEmpty(); } }, ACK_FREQ + 2000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty()); + assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)), backupQueue(ignite(1)).isEmpty()); if (!latch.await(5, SECONDS)) fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); @@ -1421,9 +1420,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC QueryCursor<?> cur = qryClient.cache(DEFAULT_CACHE_NAME).query(qry); - final Collection<Object> backupQueue = backupQueue(ignite(0)); - - assertEquals(0, backupQueue.size()); + assertEquals(0, backupQueue(ignite(0)).size()); long ttl = 100; @@ -1433,9 +1430,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC final List<Integer> keys = primaryKeys(ignite(1).cache(DEFAULT_CACHE_NAME), BACKUP_ACK_THRESHOLD); - CountDownLatch latch = new CountDownLatch(keys.size()); - - lsnr.latch = latch; + lsnr.latch = new CountDownLatch(keys.size()); for (Integer key : keys) { log.info("Put: " + key); @@ -1445,11 +1440,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return backupQueue.isEmpty(); + return backupQueue(ignite(0)).isEmpty(); } }, 2000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + assertTrue("Backup queue is not cleared: " + backupQueue(ignite(0)), + backupQueue(ignite(0)).size() < BACKUP_ACK_THRESHOLD); boolean wait = waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { @@ -1461,14 +1457,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return backupQueue.isEmpty(); + return backupQueue(ignite(0)).isEmpty(); } }, 2000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + assertTrue("Backup queue is not cleared: " + backupQueue(ignite(0)), backupQueue(ignite(0)).size() < BACKUP_ACK_THRESHOLD); - if (backupQueue.size() != 0) { - for (Object o : backupQueue) { + if (backupQueue(ignite(0)).size() != 0) { + for (Object o : backupQueue(ignite(0))) { CacheContinuousQueryEntry e = (CacheContinuousQueryEntry)o; assertNotSame("Evicted entry added to backup queue.", -1L, e.updateCounter()); @@ -1494,9 +1490,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC QueryCursor<?> cur = cache.query(qry); - final Collection<Object> backupQueue = backupQueue(ignite(1)); - - assertEquals(0, backupQueue.size()); + assertEquals(0, backupQueue(ignite(1)).size()); List<Integer> keys = primaryKeys(cache, BACKUP_ACK_THRESHOLD); @@ -1512,11 +1506,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return backupQueue.isEmpty(); + return backupQueue(ignite(1)).isEmpty(); } }, 3000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)), + backupQueue(ignite(1)).size() < BACKUP_ACK_THRESHOLD); if (!latch.await(5, SECONDS)) fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); @@ -1533,20 +1528,25 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC ConcurrentMap<Object, Object> infos = GridTestUtils.getFieldValue(proc, "rmtInfos"); - Collection<Object> backupQueue = null; + Collection<Object> backupQueue = new ArrayList<>(); for (Object info : infos.values()) { GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd"); - if (hnd.isQuery() && DEFAULT_CACHE_NAME.equals(hnd.cacheName())) { - backupQueue = GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class, "backupQueue"); + if (hnd.isQuery() && hnd.cacheName().equals(DEFAULT_CACHE_NAME)) { + Map<Integer, CacheContinuousQueryEventBuffer> map = GridTestUtils.getFieldValue(hnd, + CacheContinuousQueryHandler.class, "entryBufs"); + + for (CacheContinuousQueryEventBuffer buf : map.values()) { + Collection<Object> q = GridTestUtils.getFieldValue(buf, + CacheContinuousQueryEventBuffer.class, "backupQ"); - break; + if (q != null) + backupQueue.addAll(q); + } } } - assertNotNull(backupQueue); - return backupQueue; } @@ -2422,7 +2422,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC private ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>(); /** */ - private List<CacheEntryEvent<?, ?>> allEvts; + private final CopyOnWriteArrayList<CacheEntryEvent<?, ?>> allEvts; /** */ @LoggerResource @@ -2432,8 +2432,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC * @param saveAll Save all events flag. */ CacheEventListener1(boolean saveAll) { - if (saveAll) - allEvts = new ArrayList<>(); + allEvts = saveAll ? new CopyOnWriteArrayList<CacheEntryEvent<?, ?>>() : null; } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java index 26c7d41..85d68d3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java @@ -262,11 +262,16 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd"); if (hnd.isQuery() && hnd.cacheName().equals(CACHE_NAME)) { - Collection<Object> q = GridTestUtils.getFieldValue(hnd, - CacheContinuousQueryHandler.class, "backupQueue"); + Map<Integer, CacheContinuousQueryEventBuffer> map = GridTestUtils.getFieldValue(hnd, + CacheContinuousQueryHandler.class, "entryBufs"); - if (q != null) - backupQueues.add(q); + for (CacheContinuousQueryEventBuffer buf : map.values()) { + Collection<Object> q = GridTestUtils.getFieldValue(buf, + CacheContinuousQueryEventBuffer.class, "backupQ"); + + if (q != null) + backupQueues.add(q); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java index b91217f..81a7515 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java @@ -35,7 +35,6 @@ import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -138,9 +137,9 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst true, 1, 1L, - new AffinityTopologyVersion(1L)); + new AffinityTopologyVersion(1L), + (byte)0); - e0.filteredEvents(new GridLongList(new long[]{1L, 2L})); e0.markFiltered(); ByteBuffer buf = ByteBuffer.allocate(4096); @@ -156,7 +155,6 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst assertEquals(e0.cacheId(), e1.cacheId()); assertEquals(e0.eventType(), e1.eventType()); assertEquals(e0.isFiltered(), e1.isFiltered()); - assertEquals(GridLongList.asList(e0.filteredEvents()), GridLongList.asList(e1.filteredEvents())); assertEquals(e0.isBackup(), e1.isBackup()); assertEquals(e0.isKeepBinary(), e1.isKeepBinary()); assertEquals(e0.partition(), e1.partition()); http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index 8dd273a..0084cdc 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -21,6 +21,8 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFilterListenerTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryConcurrentPartitionUpdateTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBufferTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryExecuteInPrimaryTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryAsyncFilterRandomOperationTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest; @@ -118,6 +120,9 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(ContinuousQueryPeerClassLoadingTest.class); suite.addTestSuite(ClientReconnectContinuousQueryTest.class); + suite.addTestSuite(CacheContinuousQueryConcurrentPartitionUpdateTest.class); + suite.addTestSuite(CacheContinuousQueryEventBufferTest.class); + suite.addTest(IgniteDistributedJoinTestSuite.suite()); return suite;
