ignite-1.5 Fixed CacheContinuousQueryFailoverAbstractSelfTest so that it does not hang in case of errors.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4291edca Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4291edca Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4291edca Branch: refs/heads/ignite-2100 Commit: 4291edca326be4eafde331001238a9181333fbfc Parents: 72e5b9a Author: sboikov <[email protected]> Authored: Mon Dec 14 12:01:51 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 14 12:01:51 2015 +0300 ---------------------------------------------------------------------- ...ContinuousQueryFailoverAbstractSelfTest.java | 186 ++++++++++--------- 1 file changed, 100 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4291edca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 08e8adb..5a4ba14 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -333,7 +333,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC List<Integer> keys = testKeys(grid(0).cache(null), 10); for (Integer key : keys) { - IgniteCache cache = null; + IgniteCache<Object, Object> cache = null; if (rnd.nextBoolean()) cache = qryClient.cache(null); @@ -462,7 +462,7 @@ public 
abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC assert lsnr.evts.isEmpty(); - QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry); + QueryCursor<Cache.Entry<Object, Object>> qryCur = clnCache.query(qry); Map<Object, T2<Object, Object>> updates = new HashMap<>(); @@ -505,7 +505,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC checkEvents(expEvts, lsnr, false); - query.close(); + qryCur.close(); } } @@ -538,7 +538,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC IgniteCache<Object, Object> clnCache = qryClient.cache(null); - QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry); + QueryCursor<Cache.Entry<Object, Object>> qryCur = clnCache.query(qry); Ignite igniteSrv = ignite(0); @@ -663,7 +663,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC checkEvents(expEvts, lsnr, false); - query.close(); + qryCur.close(); } /** @@ -992,8 +992,10 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC * @param expEvts Expected events. * @param lsnr Listener. * @param lostAllow If {@code true} than won't assert on lost events. + * @throws Exception If failed. */ - private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr, + private void checkEvents(final List<T3<Object, Object, Object>> expEvts, + final CacheEventListener2 lsnr, boolean lostAllow) throws Exception { checkEvents(expEvts, lsnr, lostAllow, true); } @@ -1002,6 +1004,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC * @param expEvts Expected events. * @param lsnr Listener. * @param lostAllow If {@code true} than won't assert on lost events. + * @param wait Wait flag. + * @throws Exception If failed. 
*/ private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr, boolean lostAllow, boolean wait) throws Exception { @@ -1017,7 +1021,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet()) prevMap.put(e.getKey(), new ArrayList<>(e.getValue())); - List<T3<Object, Object, Object>> lostEvents = new ArrayList<>(); + List<T3<Object, Object, Object>> lostEvts = new ArrayList<>(); for (T3<Object, Object, Object> exp : expEvts) { List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1()); @@ -1026,7 +1030,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC continue; if (rcvdEvts == null || rcvdEvts.isEmpty()) { - lostEvents.add(exp); + lostEvts.add(exp); continue; } @@ -1050,7 +1054,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC // Lost event is acceptable. if (!found) - lostEvents.add(exp); + lostEvts.add(exp); } boolean dup = false; @@ -1062,11 +1066,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC for (CacheEntryEvent<?, ?> e : evts) { boolean found = false; - for (T3<Object, Object, Object> lostEvt : lostEvents) { + for (T3<Object, Object, Object> lostEvt : lostEvts) { if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) { found = true; - lostEvents.remove(lostEvt); + lostEvts.remove(lostEvt); break; } @@ -1091,16 +1095,16 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC } } - if (!lostAllow && lostEvents.size() > 100) { - log.error("Lost event cnt: " + lostEvents.size()); + if (!lostAllow && lostEvts.size() > 100) { + log.error("Lost event cnt: " + lostEvts.size()); - for (T3<Object, Object, Object> e : lostEvents) + for (T3<Object, Object, Object> e : lostEvts) log.error("Lost event: " + e); fail("Lose events, see log for details."); } - 
log.error("Lost event cnt: " + lostEvents.size()); + log.error("Lost event cnt: " + lostEvts.size()); expEvts.clear(); @@ -1126,8 +1130,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC * @param lsnr Listener. */ private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener3 lsnr, - boolean allowLoseEvent) throws Exception { - if (!allowLoseEvent) + boolean allowLoseEvt) throws Exception { + if (!allowLoseEvt) assert GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { return lsnr.evts.size() == expEvts.size(); @@ -1140,11 +1144,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC assertNotNull("No event for key: " + exp.get1(), e); assertEquals("Unexpected value: " + e, exp.get2(), e.getValue()); - if (allowLoseEvent) + if (allowLoseEvt) lsnr.evts.remove(exp.get1()); } - if (allowLoseEvent) + if (allowLoseEvt) assert lsnr.evts.isEmpty(); expEvts.clear(); @@ -1385,17 +1389,17 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC awaitPartitionMapExchange(); - List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>(); + List<T3<Object, Object, Object>> afterRestEvts = new ArrayList<>(); for (int j = 0; j < aff.partitions(); j++) { Integer oldVal = (Integer)qryClnCache.get(j); qryClnCache.put(j, i); - afterRestEvents.add(new T3<>((Object)j, (Object)i, (Object)oldVal)); + afterRestEvts.add(new T3<>((Object)j, (Object)i, (Object)oldVal)); } - checkEvents(new ArrayList<>(afterRestEvents), lsnr, false); + checkEvents(new ArrayList<>(afterRestEvts), lsnr, false); log.info("Start node: " + idx); @@ -1406,9 +1410,10 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC } /** + * @param backups Number of backups. * @throws Exception If failed. 
*/ - public void failoverStartStopFilter(int backups) throws Exception { + private void failoverStartStopFilter(int backups) throws Exception { this.backups = backups; final int SRV_NODES = 4; @@ -1629,22 +1634,22 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC dinLsnr.vals.clear(); } - List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>(); + List<T3<Object, Object, Object>> afterRestEvts = new ArrayList<>(); for (int i = 0; i < qryClient.affinity(null).partitions(); i++) { Integer oldVal = (Integer)qryClnCache.get(i); qryClnCache.put(i, i); - afterRestEvents.add(new T3<>((Object)i, (Object)i, (Object)oldVal)); + afterRestEvts.add(new T3<>((Object)i, (Object)i, (Object)oldVal)); } - checkEvents(new ArrayList<>(afterRestEvents), lsnr, false); + checkEvents(new ArrayList<>(afterRestEvts), lsnr, false); cur.close(); if (dinQry != null) { - checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false); + checkEvents(new ArrayList<>(afterRestEvts), dinLsnr, false); dinQry.close(); } @@ -1695,81 +1700,90 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { - while (!stop.get() && !err) { - final int idx = rnd.nextInt(SRV_NODES); - - log.info("Stop node: " + idx); - - stopGrid(idx); - - Thread.sleep(300); - - GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return qryCln.cluster().nodes().size() == SRV_NODES; - } - }, 5000L); + try { + while (!stop.get() && !err) { + final int idx = rnd.nextInt(SRV_NODES); - try { - log.info("Start node: " + idx); + log.info("Stop node: " + idx); - startGrid(idx); + stopGrid(idx); Thread.sleep(300); GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { - return qryCln.cluster().nodes().size() == SRV_NODES + 1; + return qryCln.cluster().nodes().size() == SRV_NODES; } }, 
5000L); - } - catch (Exception e) { - log.warning("Failed to stop nodes.", e); - } - CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() { - @Override public void run() { - try { - int size0 = 0; + try { + log.info("Start node: " + idx); - for (List<T3<Object, Object, Object>> evt : expEvts) - size0 += evt.size(); + startGrid(idx); - final int size = size0; + Thread.sleep(300); - GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return lsnr.size() <= size; - } - }, 2000L); + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return qryCln.cluster().nodes().size() == SRV_NODES + 1; + } + }, 5000L); + } + catch (Exception e) { + log.warning("Failed to stop nodes.", e); + } - List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>(); + CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() { + @Override public void run() { + try { + int size0 = 0; - for (List<T3<Object, Object, Object>> evt : expEvts) - expEvts0.addAll(evt); + for (List<T3<Object, Object, Object>> evt : expEvts) + size0 += evt.size(); - checkEvents(expEvts0, lsnr, false, false); + final int size = size0; - for (List<T3<Object, Object, Object>> evt : expEvts) - evt.clear(); - } - catch (Exception e) { - log.error("Failed.", e); + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr.size() <= size; + } + }, 2000L); - err = true; + List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>(); - stop.set(true); - } - finally { - checkBarrier.set(null); + for (List<T3<Object, Object, Object>> evt : expEvts) + expEvts0.addAll(evt); + + checkEvents(expEvts0, lsnr, false, false); + + for (List<T3<Object, Object, Object>> evt : expEvts) + evt.clear(); + } + catch (Exception e) { + log.error("Failed.", e); + + err = true; + + stop.set(true); + } + finally { + checkBarrier.set(null); + } } - } - }); + }); - 
assertTrue(checkBarrier.compareAndSet(null, bar)); + assertTrue(checkBarrier.compareAndSet(null, bar)); + + if (!stop.get() && !err) + bar.await(1, MINUTES); + } + } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + err = true; - if (!stop.get() && !err) - bar.await(1, MINUTES); + throw e; } return null; @@ -1803,7 +1817,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC CyclicBarrier bar = checkBarrier.get(); if (bar != null) - bar.await(); + bar.await(1, MINUTES); } } catch (Exception e){ @@ -2159,13 +2173,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC private final ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>(); /** {@inheritDoc} */ - @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> events) throws CacheEntryListenerException { - for (CacheEntryEvent<?, ?> e : events) { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) throws CacheEntryListenerException { + for (CacheEntryEvent<?, ?> e : evts) { Integer key = (Integer)e.getKey(); keys.add(key); - assert evts.put(key, e) == null; + assert this.evts.put(key, e) == null; } } @@ -2180,8 +2194,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC */ public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> { /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException { - return ((Integer)event.getValue()) >= 0; + @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException { + return ((Integer)evt.getValue()) >= 0; } }
