Merge branch 'ignite-5075-cc' into ignite-5075-cc-debug # Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/529dec10 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/529dec10 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/529dec10 Branch: refs/heads/ignite-5075-cc-debug Commit: 529dec1018290fb9d05890150f7dd7ca410902e5 Parents: 8ee752d Author: sboikov <[email protected]> Authored: Thu May 25 13:21:12 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 25 13:21:12 2017 +0300 ---------------------------------------------------------------------- .../CacheContinuousQueryEventBuffer.java | 11 - .../continuous/CacheContinuousQueryHandler.java | 259 +------------------ .../CacheContinuousQueryPartitionRecovery.java | 48 +++- 3 files changed, 46 insertions(+), 272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/529dec10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index 6295b0b..59b92eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -87,17 +87,6 @@ public class CacheContinuousQueryEventBuffer { return null; } - /** */ - private final int part; - - public CacheContinuousQueryEventBuffer() { - part = 0; - } - - public CacheContinuousQueryEventBuffer(int part) { - this.part = part; - } - /** * @return Initial partition counter. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/529dec10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 2f88827..72fdd83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -947,264 +947,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler buf = oldBuf; } - return buf.processEntry(e); - } - - /** - * - */ - private static class PartitionRecovery { - /** Event which means hole in sequence. */ - private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry(); - - /** */ - private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE; - - /** */ - private IgniteLogger log; - - /** */ - private long lastFiredEvt; - - /** */ - private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE; - - /** */ - private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); - - /** - * @param log Logger. - * @param topVer Topology version. - * @param initCntr Update counters. - */ - PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) { - this.log = log; - - if (initCntr != null) { - assert topVer.topologyVersion() > 0 : topVer; - - this.lastFiredEvt = initCntr; - - curTop = topVer; - } - } - - /** - * Resets cached topology. - */ - void resetTopologyCache() { - curTop = AffinityTopologyVersion.NONE; - } - - /** - * Add continuous entry. - * - * @param cctx Cache context. - * @param cache Cache. - * @param entry Cache continuous query entry. - * @return Collection entries which will be fired. This collection should contains only non-filtered events. - */ - <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries( - CacheContinuousQueryEntry entry, - GridCacheContext cctx, - IgniteCache cache - ) { - assert entry != null; - - if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. - assert entry.updateCounter() == 0L : entry; - - return F.<CacheEntryEvent<? extends K, ? extends V>> - asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); - } - - List<CacheEntryEvent<? extends K, ? extends V>> entries; - - synchronized (pendingEvts) { - if (log.isDebugEnabled()) { - log.debug("Handling event [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - } - - // Received first event. - if (curTop == AffinityTopologyVersion.NONE) { - lastFiredEvt = entry.updateCounter(); - - TestDebugLog.addEntryMessage(entry.partition(), - entry.updateCounter(), - "collect first cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion()); - - curTop = entry.topologyVersion(); - - if (log.isDebugEnabled()) { - log.debug("First event [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + ']'); - } - - return !entry.isFiltered() ? - F.<CacheEntryEvent<? extends K, ? extends V>> - asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) : - Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); - } - - if (curTop.compareTo(entry.topologyVersion()) < 0) { - if (entry.updateCounter() == 1L && !entry.isBackup()) { - entries = new ArrayList<>(pendingEvts.size()); - - for (CacheContinuousQueryEntry evt : pendingEvts.values()) { - if (evt != HOLE && !evt.isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt)); - } - - pendingEvts.clear(); - - curTop = entry.topologyVersion(); - - lastFiredEvt = entry.updateCounter(); - - TestDebugLog.addEntryMessage(entry.partition(), - entry.updateCounter(), - "collect for lost topVer cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion()); - - if (!entry.isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); - - if (log.isDebugEnabled()) - log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - - return entries; - } - - curTop = entry.topologyVersion(); - } - - // Check duplicate. - if (entry.updateCounter() > lastFiredEvt) { - TestDebugLog.addEntryMessage(entry.partition(), - entry.updateCounter(), - "add event last=" + lastFiredEvt + - " cntr=" + entry.updateCounter() + - " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false)) + - " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false)) + - " topVer=" + entry.topologyVersion()); - - pendingEvts.put(entry.updateCounter(), entry); - } - else { - if (log.isDebugEnabled()) - log.debug("Skip duplicate continuous query message: " + entry); - - TestDebugLog.addEntryMessage(entry.partition(), - entry.updateCounter(), - "skip duplicate last=" + lastFiredEvt + - " cntr=" + entry.updateCounter() + - " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false)) + - " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false)) + - " topVer=" + entry.topologyVersion()); - - return Collections.emptyList(); - } - - if (pendingEvts.isEmpty()) { - if (log.isDebugEnabled()) { - log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + ']'); - } - - return Collections.emptyList(); - } - - Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator(); - - entries = new ArrayList<>(); - - if (pendingEvts.size() >= MAX_BUFF_SIZE) { - if (log.isDebugEnabled()) { - log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - } - - LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() + - ", bufSize=" + MAX_BUFF_SIZE + - ", partId=" + entry.partition() + ']'); - - for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) { - Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); - - if (e.getValue() != HOLE && !e.getValue().isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); - - lastFiredEvt = e.getKey(); - - iter.remove(); - } - } - else { - while (iter.hasNext()) { - Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); - - CacheContinuousQueryEntry pending = e.getValue(); - - long filtered = pending.filteredCount(); - - boolean fire = e.getKey() == lastFiredEvt + 1;; - - if (!fire && filtered > 0) - fire = e.getKey() - filtered <= lastFiredEvt + 1; - - if (fire) { - TestDebugLog.addEntryMessage(entry.partition(), - entry.updateCounter(), - "process last=" + lastFiredEvt + - " cntr=" + e.getKey() + - " key=" + (pending.isFiltered() ? "filtered" : pending.key().value(cctx.cacheObjectContext(), false)) + - " val=" + (pending.isFiltered() ? "filtered" : pending.value().value(cctx.cacheObjectContext(), false)) + - " topVer=" + e.getValue().topologyVersion() + - " f=" + pending.filteredCount()); - - lastFiredEvt = e.getKey(); - - if (e.getValue() != HOLE && !e.getValue().isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); - - iter.remove(); - } - else { - TestDebugLog.addEntryMessage(entry.partition(), - entry.updateCounter(), - "stop process last=" + lastFiredEvt + " cntr=" + e.getKey() + " topVer=" + e.getValue().topologyVersion() + " f=" + pending.filteredCount()); - - break; - } - } - } - } - - if (log.isDebugEnabled()) { - log.debug("Will send to listener the following events [entries=" + entries + - ", lastFiredEvt=" + lastFiredEvt + - ", curTop=" + curTop + - ", entUpdCnt=" + entry.updateCounter() + - ", partId=" + entry.partition() + - ", pendingEvts=" + pendingEvts + ']'); - } - - return entries; - } + return buf; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/529dec10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java index 534ce9c..12eaa20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java @@ -31,8 +31,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.spi.communication.tcp.TestDebugLog; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE; + /** * */ @@ -41,7 +44,7 @@ class CacheContinuousQueryPartitionRecovery { private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry(); /** */ - private final static int MAX_BUFF_SIZE = CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE; + private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE; /** */ private IgniteLogger log; @@ -116,6 +119,10 @@ class CacheContinuousQueryPartitionRecovery { if (curTop == AffinityTopologyVersion.NONE) { lastFiredEvt = entry.updateCounter(); + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "collect first cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion()); + curTop = entry.topologyVersion(); if (log.isDebugEnabled()) { @@ -146,6 +153,10 @@ class CacheContinuousQueryPartitionRecovery { lastFiredEvt = entry.updateCounter(); + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "collect for lost topVer cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion()); + if (!entry.isFiltered()) entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); @@ -163,12 +174,29 @@ class CacheContinuousQueryPartitionRecovery { } // Check duplicate. - if (entry.updateCounter() > lastFiredEvt) + if (entry.updateCounter() > lastFiredEvt) { + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "add event last=" + lastFiredEvt + + " cntr=" + entry.updateCounter() + + " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false)) + + " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false)) + + " topVer=" + entry.topologyVersion()); + pendingEvts.put(entry.updateCounter(), entry); + } else { if (log.isDebugEnabled()) log.debug("Skip duplicate continuous query message: " + entry); + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "skip duplicate last=" + lastFiredEvt + + " cntr=" + entry.updateCounter() + + " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false)) + + " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false)) + + " topVer=" + entry.topologyVersion()); + return Collections.emptyList(); } @@ -225,6 +253,15 @@ class CacheContinuousQueryPartitionRecovery { fire = e.getKey() - filtered <= lastFiredEvt + 1; if (fire) { + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "process last=" + lastFiredEvt + + " cntr=" + e.getKey() + + " key=" + (pending.isFiltered() ? "filtered" : pending.key().value(cctx.cacheObjectContext(), false)) + + " val=" + (pending.isFiltered() ? "filtered" : pending.value().value(cctx.cacheObjectContext(), false)) + + " topVer=" + e.getValue().topologyVersion() + + " f=" + pending.filteredCount()); + lastFiredEvt = e.getKey(); if (e.getValue() != HOLE && !e.getValue().isFiltered()) @@ -232,8 +269,13 @@ class CacheContinuousQueryPartitionRecovery { iter.remove(); } - else + else { + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "stop process last=" + lastFiredEvt + " cntr=" + e.getKey() + " topVer=" + e.getValue().topologyVersion() + " f=" + pending.filteredCount()); + break; + } } } }
