Repository: ignite Updated Branches: refs/heads/ignite-426-2-reb 805997714 -> a184d8412
IGNITE-426 Fixed tests. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a184d841 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a184d841 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a184d841 Branch: refs/heads/ignite-426-2-reb Commit: a184d8412679c635ae6b92742db52126f3425d7d Parents: 8059977 Author: Tikhonov Nikolay <[email protected]> Authored: Wed Nov 4 16:28:13 2015 +0300 Committer: Tikhonov Nikolay <[email protected]> Committed: Wed Nov 4 16:28:13 2015 +0300 ---------------------------------------------------------------------- ...ContinuousQueryFailoverAbstractSelfTest.java | 241 +------------------ 1 file changed, 10 insertions(+), 231 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a184d841/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index c0d22e3..2c71bc2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -1251,225 +1251,6 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * @throws Exception If failed. */ - public void testFailover() throws Exception { - this.backups = 3; - - final int SRV_NODES = 4; - - startGridsMultiThreaded(SRV_NODES); - - client = true; - - final Ignite qryCln = startGrid(SRV_NODES); - - client = false; - - final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null); - - final CacheEventListener2 lsnr = new CacheEventListener2(); - - ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - - qry.setLocalListener(lsnr); - - QueryCursor<?> cur = qryClnCache.query(qry); - - final AtomicBoolean stop = new AtomicBoolean(); - - final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>(); - - boolean processorPut = false; - - IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - final int idx = SRV_NODES + 1; - - while (!stop.get() && !err) { - log.info("Start node: " + idx); - - startGrid(idx); - - awaitPartitionMapExchange(); - - Thread.sleep(200); - - log.info("Stop node: " + idx); - - try { - stopGrid(idx); - - awaitPartitionMapExchange(); - - Thread.sleep(200); - } - catch (Exception e) { - log.warning("Failed to stop nodes.", e); - } - - CountDownLatch latch = new CountDownLatch(1); - - assertTrue(checkLatch.compareAndSet(null, latch)); - - if (!stop.get()) { - log.info("Wait for event check."); - - assertTrue(latch.await(1, MINUTES)); - } - } - - return null; - } - }); - - final Map<Integer, Integer> vals = new HashMap<>(); - - final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>(); - - try { - long stopTime = System.currentTimeMillis() + 60_000; - - final int PARTS = qryCln.affinity(null).partitions(); - - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - while (System.currentTimeMillis() < stopTime) { - Integer key = rnd.nextInt(PARTS); - - Integer prevVal = vals.get(key); - Integer val = vals.get(key); - - if (val == null) - val = 0; - else - val = val + 1; - - if (processorPut && prevVal != null) { - if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) { - try (Transaction tx = qryCln.transactions().txStart()) { - qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() { - @Override public Void process(MutableEntry<Object, Object> e, - Object... arg) throws EntryProcessorException { - e.setValue(arg[0]); - - return null; - } - }, val); - - tx.commit(); - } - catch (CacheException | ClusterTopologyException e) { - log.warning("Failed put. [Key=" + key + ", val=" + val + "]"); - - continue; - } - } - else - qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() { - @Override public Void process(MutableEntry<Object, Object> e, - Object... arg) throws EntryProcessorException { - e.setValue(arg[0]); - - return null; - } - }, val); - } - else { - if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) { - try (Transaction tx = qryCln.transactions().txStart()) { - qryClnCache.put(key, val); - - tx.commit(); - } - catch (CacheException | ClusterTopologyException e) { - log.warning("Failed put. [Key=" + key + ", val=" + val + "]"); - - continue; - } - } - else - qryClnCache.put(key, val); - } - - processorPut = !processorPut; - - vals.put(key, val); - - List<T2<Integer, Integer>> keyEvts = expEvts.get(key); - - if (keyEvts == null) { - keyEvts = new ArrayList<>(); - - expEvts.put(key, keyEvts); - } - - keyEvts.add(new T2<>(val, prevVal)); - - CountDownLatch latch = checkLatch.get(); - - if (latch != null) { - log.info("Check events."); - - checkLatch.set(null); - - boolean success = false; - - try { - if (err) - break; - - boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return checkEvents(false, expEvts, lsnr); - } - }, 10_000); - - if (!check) - assertTrue(checkEvents(true, expEvts, lsnr)); - - success = true; - - log.info("Events checked."); - } - finally { - if (!success) - err = true; - - latch.countDown(); - } - } - } - } - finally { - stop.set(true); - } - - CountDownLatch latch = checkLatch.get(); - - if (latch != null) - latch.countDown(); - - restartFut.get(); - - boolean check = true; - - if (!expEvts.isEmpty()) - check = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return checkEvents(false, expEvts, lsnr); - } - }, 10_000); - - if (!check) - assertTrue(checkEvents(true, expEvts, lsnr)); - - cur.close(); - - assertFalse("Unexpected error during test, see log for details.", err); - } - - /** - * @throws Exception If failed. - */ public void testFailoverStartStopBackup() throws Exception { failoverStartStopFilter(2); } @@ -1828,18 +1609,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC startGrid(idx); - awaitPartitionMapExchange(); - - Thread.sleep(100); + Thread.sleep(300); try { log.info("Stop node: " + idx); stopGrid(idx); - awaitPartitionMapExchange(); - - Thread.sleep(100); + Thread.sleep(300); } catch (Exception e) { log.warning("Failed to stop nodes.", e); @@ -1848,13 +1625,15 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() { @Override public void run() { try { - GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - int size = 0; + int size0 = 0; + + for (List<T3<Object, Object, Object>> evt : expEvts) + size0 += evt.size(); - for (List<T3<Object, Object, Object>> evt : expEvts) - size += evt.size(); + final int size = size0; + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { return lsnr.size() <= size; } }, 2000L); @@ -1885,7 +1664,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC assertTrue(checkBarrier.compareAndSet(null, bar)); if (!stop.get() && !err) - bar.await(5, MINUTES); + bar.await(1, MINUTES); } return null;
