cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/460ba11a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/460ba11a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/460ba11a Branch: refs/heads/ignite-5075-cc-debug Commit: 460ba11adfdc49f98ba8c87c7e1390f17f720d7d Parents: 8adfe7d Author: sboikov <[email protected]> Authored: Thu May 25 09:27:42 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 25 09:50:49 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 7 ++-- .../CacheContinuousQueryEventBuffer.java | 36 +++++++++++++++++++- .../continuous/CacheContinuousQueryHandler.java | 12 +++++-- ...ContinuousQueryFailoverAbstractSelfTest.java | 14 +++++--- 4 files changed, 59 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 569d638..8a7cb95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1845,9 +1845,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject evtOldVal = cctx.unwrapTemporary(oldVal); if (primary) - TestDebugLog.addEntryMessage(partition(), evtVal.value(cctx.cacheObjectContext(), false), "primary notify cntr=" + c.updateRes.updateCounter() + " k=" + key.value(null, false)); + TestDebugLog.addEntryMessage(partition(), topVer, // evtVal.value(cctx.cacheObjectContext(), false) + "primary notify cntr=" + c.updateRes.updateCounter() + + " k=" + key.value(null, false)); else - TestDebugLog.addEntryMessage(key.value(null, false), evtVal.value(cctx.cacheObjectContext(), false), "backup notify cntr=" + c.updateRes.updateCounter() + " k=" + key.value(null, false)); + TestDebugLog.addEntryMessage(key.value(null, false), topVer, + "backup notify cntr=" + c.updateRes.updateCounter() + " k=" + key.value(null, false)); cctx.continuousQueries().onEntryUpdated(lsnrs, key, http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index a308e39..b1bc7b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.spi.communication.tcp.TestDebugLog; import org.jetbrains.annotations.Nullable; /** @@ -41,6 +42,17 @@ public class CacheContinuousQueryEventBuffer { /** */ private ConcurrentSkipListMap<Long, Object> pending = new ConcurrentSkipListMap<>(); + /** */ + private final int part; + + public CacheContinuousQueryEventBuffer() { + part = 0; + } + + public CacheContinuousQueryEventBuffer(int part) { + this.part = part; + } + /** * @return Initial partition counter. */ @@ -88,6 +100,12 @@ public class CacheContinuousQueryEventBuffer { if (batch == null || cntr < batch.startCntr) { assert entry != null : cntr; + TestDebugLog.addEntryMessage(part, + cntr, + "buffer rcd small start=" + batch.startCntr + + " cntr=" + cntr + + " topVer=" + ((CacheContinuousQueryEntry)entry).topologyVersion()); + return entry; } @@ -95,8 +113,15 @@ public class CacheContinuousQueryEventBuffer { if (cntr <= batch.endCntr) res = batch.processEvent0(null, cntr, entry); - else + else { + TestDebugLog.addEntryMessage(part, + cntr, + "buffer add pending start=" + batch.startCntr + + " cntr=" + cntr + + " topVer=" + ((CacheContinuousQueryEntry)entry).topologyVersion()); + pending.put(cntr, entry); + } Batch batch0 = curBatch.get(); @@ -128,6 +153,8 @@ public class CacheContinuousQueryEventBuffer { if (curCntr == -1) return null; + TestDebugLog.addEntryMessage(part, curCntr, "created batch"); + batch = new Batch(curCntr + 1, 0L, new Object[BUF_SIZE]); if (curBatch.compareAndSet(null, batch)) @@ -205,6 +232,13 @@ public class CacheContinuousQueryEventBuffer { int pos = (int)(cntr - startCntr); synchronized (this) { + TestDebugLog.addEntryMessage(part, + cntr, + "buffer process start=" + startCntr + + ", lastProc=" + lastProc + + " pos=" + pos + + " topVer=" + ((CacheContinuousQueryEntry)evt).topologyVersion()); + evts[pos] = evt; int next = lastProc + 1; http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 01fe5e7..3982815 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -775,8 +775,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue; - if (backupQueue0 != null) + if (backupQueue0 != null) { + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "add backup " + + " topVer=" + entry.topologyVersion()); + backupQueue0.add(entry.forBackupQueue()); + } } return notify; @@ -948,7 +954,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (buf == null) { final int part = e.partition(); - buf = new CacheContinuousQueryEventBuffer() { + buf = new CacheContinuousQueryEventBuffer(part) { @Override protected long currentPartitionCounter() { GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false); @@ -1182,7 +1188,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler boolean fire = e.getKey() == lastFiredEvt + 1;; if (!fire && filtered > 0) - fire = e.getKey() - filtered <= lastFiredEvt; + fire = e.getKey() - filtered <= lastFiredEvt + 1; if (fire) { TestDebugLog.addEntryMessage(entry.partition(), http://git-wip-us.apache.org/repos/asf/ignite/blob/460ba11a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 050af5d..447a203 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -1220,15 +1220,15 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC } } - if (!lostAllow && lostEvts.size() > 100) { + if (!lostAllow && lostEvts.size() > 50) { log.error("Lost event cnt: " + lostEvts.size()); for (T3<Object, Object, Object> e : lostEvts) { - log.error("Lost event: " + ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()) + " " + e); + log.error("Lost event: " + ignite(4).affinity(DEFAULT_CACHE_NAME).partition(e.get1()) + " " + e); - TestDebugLog.addEntryMessage(ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()), e.get2(), "lost event " + e.get1() + " " + e.get2()); + TestDebugLog.addEntryMessage(ignite(4).affinity(DEFAULT_CACHE_NAME).partition(e.get1()), e.get2(), "lost event " + e.get1() + " " + e.get2()); - TestDebugLog.printKeyMessages(true, ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1())); + TestDebugLog.printKeyMessages(true, ignite(4).affinity(DEFAULT_CACHE_NAME).partition(e.get1())); System.exit(1); } @@ -1598,6 +1598,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC for (int i = 0; i < 10; i++) { final int idx = i % (SRV_NODES - 1); + TestDebugLog.addMessage("Stop node: " + idx); + log.info("Stop node: " + idx); stopGrid(idx); @@ -1609,6 +1611,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC for (int j = 0; j < aff.partitions(); j++) { Integer oldVal = (Integer)qryClnCache.get(j); + TestDebugLog.addEntryMessage(ignite(4).affinity(DEFAULT_CACHE_NAME).partition(j), i, "do put " + j + " " + i); + qryClnCache.put(j, i); afterRestEvts.add(new T3<>((Object)j, (Object)i, (Object)oldVal)); @@ -1616,6 +1620,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC checkEvents(new ArrayList<>(afterRestEvts), lsnr, false); + TestDebugLog.addMessage("Start node: " + idx); + log.info("Start node: " + idx); startGrid(idx);
