Repository: ignite Updated Branches: refs/heads/ignite-5075-cc1 [created] 13d378534
cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/13d37853 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/13d37853 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/13d37853 Branch: refs/heads/ignite-5075-cc1 Commit: 13d378534013b84a55656b8873bd7092544e302c Parents: 01f45c1 Author: sboikov <[email protected]> Authored: Fri May 26 09:31:12 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri May 26 09:31:12 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 12 +++- .../dht/GridClientPartitionTopology.java | 5 ++ .../distributed/dht/GridDhtLocalPartition.java | 8 ++- .../dht/GridDhtPartitionTopology.java | 2 + .../dht/GridDhtPartitionTopologyImpl.java | 62 +++++++++++------ .../GridNearAtomicSingleUpdateFuture.java | 2 + .../GridDhtPartitionsExchangeFuture.java | 27 ++++++-- .../CacheContinuousQueryEventBuffer.java | 70 ++++++++++++-------- .../CacheContinuousQueryPartitionRecovery.java | 5 +- 9 files changed, 134 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 80f872c..ba21964 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -28,7 +28,6 @@ import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; - import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -61,8 +60,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; -import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; +import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.lang.GridTuple; @@ -75,7 +74,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED; @@ -1726,6 +1724,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long updateCntr0 = nextPartCounter(); + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(partition(), updateCntr0, "set new counter conflict on " + (primary ? "primary" : "backup")); + if (updateCntr != null) updateCntr0 = updateCntr; @@ -2615,6 +2615,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long updateCntr = 0; + assert preload; + if (!preload) updateCntr = nextPartCounter(topVer); @@ -4410,6 +4412,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long updateCntr0 = entry.nextPartCounter(); + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(entry.partition(), updateCntr0, "new counter for update on " + (primary ? "primary" : "backup")); + if (updateCntr != null) updateCntr0 = updateCntr; @@ -4490,6 +4494,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Must persist inside synchronization in non-tx mode. cctx.store().remove(null, entry.key); + System.exit(11); + long updateCntr0 = entry.nextPartCounter(); if (updateCntr != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 1de64c5..2eebda3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -649,6 +649,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } } + @Override + public void updateCounters(Map<Integer, T2<Long, Long>> cntrMap) { + throw new UnsupportedOperationException(); + } + /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update( http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 6fb557a..fd76d3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -799,7 +799,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @return Current update index. */ public long updateCounter() { - return store.updateCounter(); + long cntr0 = store.updateCounter(); + + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(id, cntr0, "provide counter " + cctx.shared().exchange().lastTopologyFuture().topologyVersion()); + + return cntr0; } /** @@ -813,6 +817,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @param val Update index value. */ public void updateCounter(long val) { + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(id, val, "set new counter"); + store.updateCounter(val); } http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index f9fd852..91289a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -241,6 +241,8 @@ public interface GridDhtPartitionTopology { GridDhtPartitionMap parts, @Nullable Map<Integer, T2<Long, Long>> cntrMap); + public void updateCounters(Map<Integer, T2<Long, Long>> cntrMap); + /** * Checks if there is at least one owner for each partition in the cache topology. * If not, marks such a partition as LOST. http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 8e79eda..d93fedf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1122,8 +1122,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { T2<Long, Long> cntr = cntrMap.get(part.id()); - if (cntr != null) + if (cntr != null) { + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(part.id(), cntr.get2(), "set new counter from full " + exchId.topologyVersion()); + part.updateCounter(cntr.get2()); + } } } @@ -1255,29 +1258,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - /** {@inheritDoc} */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap update( - @Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts, - @Nullable Map<Integer, T2<Long, Long>> cntrMap - ) { - if (log.isDebugEnabled()) - log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); - - if (!cctx.discovery().alive(parts.nodeId())) { - if (log.isDebugEnabled()) - log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + - ", parts=" + parts + ']'); - - return null; - } - + @Override + public void updateCounters(Map<Integer, T2<Long, Long>> cntrMap) { lock.writeLock().lock(); try { if (stopping) - return null; + return; if (cntrMap != null) { for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { @@ -1295,10 +1282,43 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { T2<Long, Long> cntr = cntrMap.get(part.id()); - if (cntr != null && cntr.get2() > part.updateCounter()) + if (cntr != null && cntr.get2() > part.updateCounter()) { + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(part.id(), cntr.get2(), "set new counter from single"); + part.updateCounter(cntr.get2()); + } } } + } + finally { + lock.writeLock().unlock(); + + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + @Nullable @Override public GridDhtPartitionMap update( + @Nullable GridDhtPartitionExchangeId exchId, + GridDhtPartitionMap parts, + @Nullable Map<Integer, T2<Long, Long>> cntrMap + ) { + if (log.isDebugEnabled()) + log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); + + if (!cctx.discovery().alive(parts.nodeId())) { + if (log.isDebugEnabled()) + log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + + ", parts=" + parts + ']'); + + return null; + } + + lock.writeLock().lock(); + + try { + if (stopping) + return null; if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 6ffa373..16301e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -558,6 +558,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda ClusterNode primary = nodes.get(0); + //TestDebugLog.addEntryMessage(cacheKey.partition(), primary.id(), "mapped primary"); + boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1; GridNearAtomicAbstractUpdateRequest req; http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 8b8c87c..4b25ac4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.PartitionLossPolicy; @@ -47,18 +46,19 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; -import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -494,6 +494,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT crd = srvNodes.isEmpty() ? null : srvNodes.get(0); + //org.apache.ignite.spi.collision.TestDebugLog.addMessage("Start exchange topVer=" + topologyVersion() + ", crd=" + crd.id()); + boolean crdNode = crd != null && crd.isLocal(); skipPreload = cctx.kernalContext().clientNode(); @@ -1106,6 +1108,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) { boolean realExchange = !dummy && !forcePreload; +// if (realExchange) +// org.apache.ignite.spi.collision.TestDebugLog.addMessage("End exchange topVer=" + topologyVersion() + ", crd=" + (crd != null ? crd.id() : null)); + if (realExchange && !cctx.kernalContext().clientNode() && (serverNotDiscoveryEvent() || affChangeMsg != null)) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) @@ -1564,6 +1569,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } + for (GridDhtPartitionsAbstractMessage msg0 : msgs.values()) { + if (msg0 instanceof GridDhtPartitionsSingleMessage) { + for (Map.Entry<Integer, GridDhtPartitionMap> entry : ((GridDhtPartitionsSingleMessage)msg0).partitions().entrySet()) { + Integer cacheId = entry.getKey(); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : + cctx.exchange().clientTopology(cacheId, this); + + top.updateCounters(msg0.partitionUpdateCounters(cacheId)); + } + } + } + if (discoEvt.type() == EVT_NODE_JOINED) { if (cctx.kernalContext().state().active()) assignPartitionsStates(); @@ -1795,7 +1814,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : cctx.exchange().clientTopology(cacheId, this); - top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId)); + top.update(exchId, entry.getValue(), null); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index c59b851..65b7d2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -143,6 +143,8 @@ public class CacheContinuousQueryEventBuffer { * @return Collected entries to pass to listener (single entry or entries list). */ @Nullable Object processEntry(CacheContinuousQueryEntry e, boolean backup) { + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(e.partition(), e.updateCounter(), "processEntry on " + (backup ? "backup" : "primary")); + return process0(e.updateCounter(), e, backup); } @@ -235,11 +237,46 @@ public class CacheContinuousQueryEventBuffer { for (Map.Entry<Long, CacheContinuousQueryEntry> p : pending.headMap(batch.endCntr, true).entrySet()) { long cntr = p.getKey(); - assert cntr >= batch.startCntr && cntr <= batch.endCntr : cntr; + assert cntr <= batch.endCntr; + + if (pending.remove(p.getKey()) != null) { + if (cntr < batch.startCntr) + res = addResult(res, p.getValue(), backup); + else + res = batch.processEntry0(res, p.getKey(), p.getValue(), backup); + } + } + } + + return res; + } + + private Object addResult(Object res, CacheContinuousQueryEntry entry, boolean backup) { + if (res == null) { + if (backup) + backupQ.add(entry); + else + res = entry; + } + else { + assert !backup; + + List<CacheContinuousQueryEntry> resList; + + if (res instanceof CacheContinuousQueryEntry) { + resList = new ArrayList<>(); + + resList.add((CacheContinuousQueryEntry)res); + } + else { + assert res instanceof List : res; - if (pending.remove(p.getKey()) != null) - res = batch.processEntry0(res, p.getKey(), p.getValue(), backup); + resList = (List<CacheContinuousQueryEntry>)res; } + + resList.add(entry); + + res = resList; } return res; @@ -399,32 +436,7 @@ public class CacheContinuousQueryEventBuffer { filtered = 0; - if (res == null) { - if (backup) - backupQ.add(entry0); - else - res = entry0; - } - else { - assert !backup; - - List<CacheContinuousQueryEntry> resList; - - if (res instanceof CacheContinuousQueryEntry) { - resList = new ArrayList<>(); - - resList.add((CacheContinuousQueryEntry)res); - } - else { - assert res instanceof List : res; - - resList = (List<CacheContinuousQueryEntry>)res; - } - - resList.add(entry0); - - res = resList; - } + res = addResult(res, entry0, backup); } else filtered++; http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java index 59252d2..e1bc49a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java @@ -243,8 +243,11 @@ class CacheContinuousQueryPartitionRecovery { else { if (pending.isFiltered()) skippedFiltered = true; - else + else { + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(pending.partition(), pending.updateCounter(), " stop process last=" + lastFiredEvt); + break; + } } }
