IGNITE-2822 Fixed not effective approach.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da47901f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da47901f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da47901f Branch: refs/heads/ignite-2630 Commit: da47901f07fd8c7fa47b3238bb37e083c93dfdc4 Parents: 21f5d0f Author: Tikhonov Nikolay <tikhonovnico...@gmail.com> Authored: Wed Apr 6 19:29:24 2016 +0300 Committer: Tikhonov Nikolay <tikhonovnico...@gmail.com> Committed: Wed Apr 6 19:29:24 2016 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryHandler.java | 84 ++++++++------------ 1 file changed, 33 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/da47901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 767697a..3576424 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -363,27 +363,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (primary || skipPrimaryCheck) { if (loc) { if (!locCache) { - Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx, entry); + Collection<CacheEntryEvent<? extends K, ? extends V>> entries = handleEvent(ctx, entry); if (!entries.isEmpty()) { - final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); - - Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries, - new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { - @Override public CacheEntryEvent<? extends K, ? extends V> apply( - CacheContinuousQueryEntry e) { - return new CacheContinuousQueryEvent<>(cache, cctx, e); - } - }, - new IgnitePredicate<CacheContinuousQueryEntry>() { - @Override public boolean apply(CacheContinuousQueryEntry entry) { - return !entry.isFiltered(); - } - } - ); - - if (!F.isEmpty(evts)) - locLsnr.onUpdated(evts); + locLsnr.onUpdated(entries); if (!internal && !skipPrimaryCheck) sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); @@ -607,7 +590,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler final GridCacheContext cctx = cacheContext(ctx); - Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>(); + Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>(); for (CacheContinuousQueryEntry e : entries) { GridCacheDeploymentManager depMgr = cctx.deploy(); @@ -636,24 +619,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); - - if (!entries0.isEmpty()) { - Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, - new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { - @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { - return new CacheContinuousQueryEvent<>(cache, cctx, e); - } - }, - new IgnitePredicate<CacheContinuousQueryEntry>() { - @Override public boolean apply(CacheContinuousQueryEntry entry) { - return !entry.isFiltered(); - } - } - ); - - locLsnr.onUpdated(evts); - } + if (!entries0.isEmpty()) + locLsnr.onUpdated(entries0); } /** @@ -661,24 +628,30 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param e entry. * @return Entry collection. */ - private Collection<CacheContinuousQueryEntry> handleEvent(GridKernalContext ctx, + private Collection<CacheEntryEvent<? extends K, ? extends V>> handleEvent(GridKernalContext ctx, CacheContinuousQueryEntry e) { assert e != null; + GridCacheContext<K, V> cctx = cacheContext(ctx); + + final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); + if (internal) { if (e.isFiltered()) return Collections.emptyList(); else - return F.asList(e); + return F.<CacheEntryEvent<? extends K, ? extends V>>asList( + new CacheContinuousQueryEvent<K, V>(cache, cctx, e)); } // Initial query entry or evicted entry. These events should be fired immediately. if (e.updateCounter() == -1L) - return F.asList(e); + return F.<CacheEntryEvent<? extends K, ? extends V>>asList( + new CacheContinuousQueryEvent<K, V>(cache, cctx, e)); PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition()); - return rec.collectEntries(e); + return rec.collectEntries(cctx, cache, e); } /** @@ -802,19 +775,24 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** * Add continuous entry. * + * @param cctx Cache context. + * @param cache Cache. * @param entry Cache continuous query entry. - * @return Collection entries which will be fired. + * @return Collection entries which will be fired. This collection should contains only non-filtered events. */ - public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) { + public <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(GridCacheContext cctx, + IgniteCache cache, + CacheContinuousQueryEntry entry) { assert entry != null; if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. assert entry.updateCounter() == 0L : entry; - return F.asList(entry); + return F.<CacheEntryEvent<? extends K, ? extends V>> + asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); } - List<CacheContinuousQueryEntry> entries; + List<CacheEntryEvent<? extends K, ? extends V>> entries; synchronized (pendingEvts) { // Received first event. @@ -823,7 +801,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler curTop = entry.topologyVersion(); - return F.asList(entry); + return !entry.isFiltered() ? + F.<CacheEntryEvent<? extends K, ? extends V>> + asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) : + Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); } if (curTop.compareTo(entry.topologyVersion()) < 0) { @@ -832,7 +813,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler for (CacheContinuousQueryEntry evt : pendingEvts.values()) { if (evt != HOLE && !evt.isFiltered()) - entries.add(evt); + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt)); } pendingEvts.clear(); @@ -841,7 +822,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler lastFiredEvt = entry.updateCounter(); - entries.add(entry); + if (!entry.isFiltered()) + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); return entries; } @@ -880,7 +862,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); if (e.getValue() != HOLE && !e.getValue().isFiltered()) - entries.add(e.getValue()); + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); lastFiredEvt = e.getKey(); @@ -896,7 +878,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler ++lastFiredEvt; if (e.getValue() != HOLE && !e.getValue().isFiltered()) - entries.add(e.getValue()); + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); iter.remove(); }