Repository: ignite Updated Branches: refs/heads/ignite-426-2-reb 9dd18c735 -> 0834da6de
IGNITE-426 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0834da6d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0834da6d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0834da6d Branch: refs/heads/ignite-426-2-reb Commit: 0834da6ded01135a4b50a143f76892bca69059e4 Parents: 9dd18c7 Author: nikolay_tikhonov <[email protected]> Authored: Mon Nov 2 21:03:23 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon Nov 2 21:03:23 2015 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryHandler.java | 87 ++++++++++------- ...acheContinuousQueryFailoverAbstractTest.java | 98 +++++++++++++++----- 2 files changed, 128 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0834da6d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index cb0ba5a..1df5963 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -29,12 +29,13 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; import java.util.TreeMap; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.event.EventType; @@ -54,7 +55,6 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; -import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; @@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; +import org.apache.ignite.internal.util.GridConcurrentSkipListSet; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; @@ -752,10 +753,27 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { */ private static class HoleBuffer { /** */ - private final TreeSet<Long> buf = new TreeSet<>(); + private final NavigableSet<Long> buf = new GridConcurrentSkipListSet<>(); /** */ - private long lastFiredEvt; + private AtomicLong lastFiredCntr = new AtomicLong(); + + /** + * @param newVal New value. + * @return Old value if previous value less than new value otherwise {@code -1}. + */ + private long setLastFiredCounter(long newVal) { + long prevVal = lastFiredCntr.get(); + + while (prevVal < newVal) { + if (lastFiredCntr.compareAndSet(prevVal, newVal)) + return prevVal; + else + prevVal = lastFiredCntr.get(); + } + + return prevVal >= newVal ? -1 : prevVal; + } /** * Add continuous entry. @@ -766,50 +784,51 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) { assert e != null; - synchronized (buf) { - // Handle filtered events. - if (e.isFiltered()) { - if (lastFiredEvt > e.updateIndex() || e.updateIndex() == 1) - return e; - + if (e.isFiltered()) { + if (lastFiredCntr.get() > e.updateIndex() || e.updateIndex() == 1) + return e; + else { buf.add(e.updateIndex()); - return null; + // Double check. If another thread sent a event with counter higher than this event. + if (lastFiredCntr.get() > e.updateIndex() && buf.contains(e.updateIndex())) { + buf.remove(e.updateIndex()); + + return e; + } + else + return null; } + } + else { + long prevVal = setLastFiredCounter(e.updateIndex()); + + if (prevVal == -1) + return e; else { - if (lastFiredEvt < e.updateIndex()) - lastFiredEvt = e.updateIndex(); + NavigableSet<Long> prevHoles = buf.subSet(prevVal, true, e.updateIndex(), true); - // Doesn't have filtered and delayed events. - if (buf.isEmpty() || buf.first() > e.updateIndex()) - return e; - else { - GridLongList filteredEvts = new GridLongList(buf.size()); + GridLongList filteredEvts = new GridLongList(10); - int size = 0; + int size = 0; - Iterator<Long> iter = buf.iterator(); + Iterator<Long> iter = prevHoles.iterator(); - while (iter.hasNext()) { - long idx = iter.next(); + while (iter.hasNext()) { + long idx = iter.next(); - if (idx < e.updateIndex()) { - filteredEvts.add(idx); + filteredEvts.add(idx); - iter.remove(); + iter.remove(); - ++size; - } - else - break; - } + ++size; + } - filteredEvts.truncate(size, true); + filteredEvts.truncate(size, true); - e.filteredEvents(filteredEvts); + e.filteredEvents(filteredEvts); - return e; - } + return e; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0834da6d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index b31b842..95781e0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -30,9 +30,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; @@ -91,7 +88,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; -import org.eclipse.jetty.util.ConcurrentHashSet; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -900,17 +896,25 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo */ private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr, boolean lostAllow) throws Exception { - boolean b = GridTestUtils.waitForCondition(new PA() { + GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { return expEvts.size() == lsnr.size(); } }, 2000L); + Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size()); + + for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet()) + prevMap.put(e.getKey(), new ArrayList<>(e.getValue())); + List<T3<Object, Object, Object>> lostEvents = new ArrayList<>(); for (T3<Object, Object, Object> exp : expEvts) { List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1()); + if (F.eq(exp.get2(), exp.get3())) + continue; + if (rcvdEvts == null || rcvdEvts.isEmpty()) { lostEvents.add(exp); @@ -949,8 +953,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo boolean found = false; for (T3<Object, Object, Object> lostEvt : lostEvents) { - if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2()) - /*&& equalOldValue(e, lostEvt)*/) { + if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) { found = true; lostEvents.remove(lostEvt); @@ -972,12 +975,20 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo for (T3<Object, Object, Object> e : lostEvents) log.error("Lost event: " + e); - for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) - if (!e.isEmpty()) - log.error("Duplicate event: " + e); - } + for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) { + if (!e.isEmpty()) { + for (CacheEntryEvent<?, ?> event : e) { + List<CacheEntryEvent<?, ?>> entries = new ArrayList<>(); - assertFalse("Received duplicate events, see log for details.", !lostEvents.isEmpty()); + for (CacheEntryEvent<?, ?> ev0 : prevMap.get(event.getKey())) { + if (F.eq(event.getValue(), ev0.getValue()) && F.eq(event.getOldValue(), + ev0.getOldValue())) + entries.add(ev0); + } + } + } + } + } } if (!lostAllow && !lostEvents.isEmpty()) { @@ -1736,19 +1747,23 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo log.info("Stop node: " + idx); + awaitPartitionMapExchange(); + + Thread.sleep(400); + stopGrid(idx); awaitPartitionMapExchange(); - Thread.sleep(200); + Thread.sleep(400); log.info("Start node: " + idx); startGrid(idx); - CountDownLatch latch = new CountDownLatch(1); + Thread.sleep(200); - awaitPartitionMapExchange(); + CountDownLatch latch = new CountDownLatch(1); assertTrue(checkLatch.compareAndSet(null, latch)); @@ -1968,7 +1983,10 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo final int PARTS = THREAD; - final List<T3<Object, Object, Object>> expEvts = new CopyOnWriteArrayList<>(); + final List<List<T3<Object, Object, Object>>> expEvts = new ArrayList<>(THREAD + 5); + + for (int i = 0; i < THREAD; i++) + expEvts.add(i, new ArrayList<T3<Object, Object, Object>>()); final AtomicReference<CyclicBarrier> checkBarrier = new AtomicReference<>(); @@ -2001,7 +2019,26 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() { @Override public void run() { try { - checkEvents(expEvts, lsnr, false); + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + int size = 0; + + for (List<T3<Object, Object, Object>> evt : expEvts) + size += evt.size(); + + return lsnr.size() <= size; + } + }, 2000L); + + List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>(); + + for (List<T3<Object, Object, Object>> evt : expEvts) + expEvts0.addAll(evt); + + checkEvents(expEvts0, lsnr, false); + + for (List<T3<Object, Object, Object>> evt : expEvts) + evt.clear(); } catch (Exception e) { log.error("Failed.", e); @@ -2018,8 +2055,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo assertTrue(checkBarrier.compareAndSet(null, bar)); - if (stop.get() && !err) - bar.await(5, SECONDS); + if (!stop.get() && !err) + bar.await(5, MINUTES); } return null; @@ -2030,11 +2067,17 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo final AtomicInteger valCntr = new AtomicInteger(0); - GridTestUtils.runMultiThreaded(new Runnable() { - final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + final AtomicInteger threadSeq = new AtomicInteger(0); + GridTestUtils.runMultiThreaded(new Runnable() { @Override public void run() { try { + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + final int threadId = threadSeq.getAndIncrement(); + + log.error("Thread id: " + threadId); + while (System.currentTimeMillis() < stopTime && !stop.get() && !err) { Integer key = rnd.nextInt(PARTS); @@ -2042,7 +2085,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo Integer prevVal = (Integer)qryClnCache.getAndPut(key, val); - expEvts.add(new T3<>((Object)key, (Object)val, (Object)prevVal)); + expEvts.get(threadId).add(new T3<>((Object)key, (Object)val, (Object)prevVal)); CyclicBarrier bar = checkBarrier.get(); @@ -2065,7 +2108,16 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo restartFut.get(); - checkEvents(expEvts, lsnr, true); + List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>(); + + for (List<T3<Object, Object, Object>> evt : expEvts) { + expEvts0.addAll(evt); + + evt.clear(); + } + + if (!expEvts0.isEmpty()) + checkEvents(expEvts0, lsnr, true); cur.close();
