Repository: ignite Updated Branches: refs/heads/ignite-5075-cc e36163fc4 -> 01f45c1b8
cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01f45c1b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01f45c1b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01f45c1b Branch: refs/heads/ignite-5075-cc Commit: 01f45c1b8e2dcb2a5eea0596a55c434f682cc60b Parents: e36163f Author: sboikov <[email protected]> Authored: Fri May 26 07:01:40 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri May 26 07:01:40 2017 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 16 +- .../CacheContinuousQueryEventBuffer.java | 150 ++++++++++++------- .../continuous/CacheContinuousQueryHandler.java | 4 +- 3 files changed, 107 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/01f45c1b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 544f847..8b8c87c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -789,14 +789,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null; - //todo check for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId())) continue; if (topChanged) { - cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion()); - // Partition release future is done so we can flush the write-behind store. cacheCtx.store().forceFlush(); } @@ -1101,10 +1098,23 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } + private boolean serverNotDiscoveryEvent() { + return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode()); + } + /** {@inheritDoc} */ @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) { boolean realExchange = !dummy && !forcePreload; + if (realExchange && !cctx.kernalContext().clientNode() && (serverNotDiscoveryEvent() || affChangeMsg != null)) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion()); + } + } + if (err == null && realExchange) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) http://git-wip-us.apache.org/repos/asf/ignite/blob/01f45c1b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index fd4029c..c59b851 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -40,6 +40,9 @@ public class CacheContinuousQueryEventBuffer { IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 5); /** */ + private static final Object RETRY = new Object(); + + /** */ protected final int part; /** */ @@ -75,32 +78,42 @@ public class CacheContinuousQueryEventBuffer { /** * @return Backup entries. */ - @Nullable Collection<CacheContinuousQueryEntry> resetBackupQueue() { - Collection<CacheContinuousQueryEntry> ret; + @Nullable Collection<CacheContinuousQueryEntry> flushOnExchange() { + Collection<CacheContinuousQueryEntry> ret = null; - List<CacheContinuousQueryEntry> entries = null; + for (;;) { + Batch batch = curBatch.get(); - Batch batch = curBatch.get(); + if (batch != null) { + Collection<CacheContinuousQueryEntry> ret0 = batch.flushAndReset(); - if (batch != null) - entries = batch.backupFlushEntries(); + if (ret0 != null) { + if (ret == null) + ret = ret0; + else + ret.addAll(ret0); + } + } - if (!backupQ.isEmpty()) { - if (entries != null) - backupQ.addAll(entries); + if (!backupQ.isEmpty()) { + if (ret == null) + ret = new ArrayList<>(); - ret = this.backupQ; + CacheContinuousQueryEntry e; - backupQ = new ConcurrentLinkedDeque<>(); - } - else - ret = entries; + while ((e = backupQ.pollFirst()) != null) + ret.add(e); + } - if (!pending.isEmpty()) { - if (ret == null) - ret = new ArrayList<>(); + if (!pending.isEmpty()) { + if (ret == null) + ret = new ArrayList<>(); - ret.addAll(pending.values()); + ret.addAll(pending.values()); + } + + if (curBatch.compareAndSet(batch, null)) + break; } return ret; @@ -142,21 +155,30 @@ public class CacheContinuousQueryEventBuffer { private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) { assert cntr >= 0 : cntr; - Batch batch = initBatch(entry.topologyVersion()); + Batch batch; + Object res = null; - if (batch == null || cntr < batch.startCntr) { - if (backup) - backupQ.add(entry); + for (;;) { + batch = initBatch(entry.topologyVersion()); - return entry; - } + if (batch == null || cntr < batch.startCntr) { + if (backup) + backupQ.add(entry); - Object res = null; + return entry; + } + + if (cntr <= batch.endCntr) { + res = batch.processEntry0(null, cntr, entry, backup); + + if (res == RETRY) + continue; + } + else + pending.put(cntr, entry); - if (cntr <= batch.endCntr) - res = batch.processEvent0(null, cntr, entry, backup); - else - pending.put(cntr, entry); + break; + } Batch batch0 = curBatch.get(); @@ -166,7 +188,7 @@ public class CacheContinuousQueryEventBuffer { res = processPending(res, batch, backup); - batch0 = curBatch.get(); + batch0 = initBatch(entry.topologyVersion()); } while (batch != batch0); } @@ -184,17 +206,22 @@ public class CacheContinuousQueryEventBuffer { if (batch != null) return batch; - long curCntr = currentPartitionCounter(); + for (;;) { + long curCntr = currentPartitionCounter(); - if (curCntr == -1) - return null; + if (curCntr == -1) + return null; - batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer); + batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer); - if (curBatch.compareAndSet(null, batch)) - return batch; + if (curBatch.compareAndSet(null, batch)) + return batch; + + batch = curBatch.get(); - return curBatch.get(); + if (batch != null) + return batch; + } } /** @@ -211,7 +238,7 @@ public class CacheContinuousQueryEventBuffer { assert cntr >= batch.startCntr && cntr <= batch.endCntr : cntr; if (pending.remove(p.getKey()) != null) - res = batch.processEvent0(res, p.getKey(), p.getValue(), backup); + res = batch.processEntry0(res, p.getKey(), p.getValue(), backup); } } @@ -235,7 +262,7 @@ public class CacheContinuousQueryEventBuffer { private int lastProc = -1; /** */ - private final CacheContinuousQueryEntry[] entries; + private CacheContinuousQueryEntry[] entries; /** */ private final AffinityTopologyVersion topVer; @@ -261,7 +288,10 @@ public class CacheContinuousQueryEventBuffer { /** * @return Entries to send as part of backup queue. */ - @Nullable synchronized List<CacheContinuousQueryEntry> backupFlushEntries() { + @Nullable synchronized List<CacheContinuousQueryEntry> flushAndReset() { + if (entries == null) + return null; + List<CacheContinuousQueryEntry> res = null; long filtered = this.filtered; @@ -283,15 +313,7 @@ public class CacheContinuousQueryEventBuffer { if (e.isFiltered()) filtered++; else { - flushEntry = new CacheContinuousQueryEntry(e.cacheId(), - e.eventType(), - e.key(), - e.value(), - e.oldValue(), - e.isKeepBinary(), - e.partition(), - e.updateCounter(), - e.topologyVersion()); + flushEntry = e; flushEntry.filteredCount(filtered); @@ -316,6 +338,8 @@ public class CacheContinuousQueryEventBuffer { res.add(filteredEntry(cntr - 1, filtered - 1)); } + entries = null; + return res; } @@ -350,7 +374,7 @@ public class CacheContinuousQueryEventBuffer { * @return New result. */ @SuppressWarnings("unchecked") - @Nullable private Object processEvent0( + @Nullable private Object processEntry0( @Nullable Object res, long cntr, CacheContinuousQueryEntry entry, @@ -358,6 +382,9 @@ public class CacheContinuousQueryEventBuffer { int pos = (int)(cntr - startCntr); synchronized (this) { + if (entries == null) + return RETRY; + entries[pos] = entry; int next = lastProc + 1; @@ -409,17 +436,24 @@ public class CacheContinuousQueryEventBuffer { } lastProc = pos; - } - else - return res; - } - if (pos == entries.length -1) { - Arrays.fill(entries, null); + if (pos == entries.length - 1) { + Arrays.fill(entries, null); + + Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, + filtered, + entries, + entry.topologyVersion()); + + entries = null; - Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, filtered, entries, entry.topologyVersion()); + assert curBatch.get() == this; - curBatch.set(nextBatch); + curBatch.set(nextBatch); + } + } + else + return res; } return res; http://git-wip-us.apache.org/repos/asf/ignite/blob/01f45c1b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index b4f2dbd..ebfbe4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -452,7 +452,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) { CacheContinuousQueryEventBuffer buf = bufE.getValue(); - Collection<CacheContinuousQueryEntry> backupQueue = buf.resetBackupQueue(); + Collection<CacheContinuousQueryEntry> backupQueue = buf.flushOnExchange(); if (backupQueue != null && node != null) { for (CacheContinuousQueryEntry e : backupQueue) { @@ -958,7 +958,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) { CacheContinuousQueryEventBuffer buf = bufE.getValue(); - buf.resetBackupQueue(); + buf.flushOnExchange(); } }
