Repository: ignite Updated Branches: refs/heads/ignite-5075 0f8c69aa4 -> c54051399
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c5405139 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c5405139 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c5405139 Branch: refs/heads/ignite-5075 Commit: c540513998fcb51a403d99469626215f974cf4ae Parents: 0f8c69a Author: sboikov <[email protected]> Authored: Thu May 25 18:22:26 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 25 18:22:26 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheGroupInfrastructure.java | 39 ++++++++++++++ .../processors/cache/GridCacheMapEntry.java | 26 +++++---- .../distributed/dht/GridDhtCacheEntry.java | 4 +- .../distributed/dht/GridDhtLocalPartition.java | 8 +-- .../continuous/CacheContinuousQueryManager.java | 14 +++++ .../query/continuous/CounterSkipContext.java | 56 ++++++++++++++++++++ 6 files changed, 133 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index a38a643..aed96e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -667,6 +668,44 @@ public class CacheGroupInfrastructure { } /** + * @param cacheId ID of cache initiated counter update. + * @param part Partition number. + * @param cntr Counter. + * @param topVer Topology version for current operation. + */ + public void onPartitionCounterUpdate(int cacheId, + int part, + long cntr, + AffinityTopologyVersion topVer) { + assert sharedGroup(); + + if (isLocal()) + return; + + List<GridCacheContext> caches = this.caches; + + CounterSkipContext skipCtx = null; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + if (cacheId != cctx.cacheId()) + skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer); + } + + final List<Runnable> entriesC = skipCtx != null ? skipCtx.readyEntries() : null; + + if (entriesC != null) { + ctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + for (Runnable c : entriesC) + c.run(); + } + }); + } + } + + /** * @throws IgniteCheckedException If failed. */ public void start() throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 95abd86..272de55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -982,7 +982,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) deletedUnlocked(false); - updateCntr0 = nextPartitionCounter(); + updateCntr0 = nextPartitionCounter(topVer); if (updateCntr != null && updateCntr != 0) updateCntr0 = updateCntr; @@ -1161,7 +1161,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - updateCntr0 = nextPartitionCounter(); + updateCntr0 = nextPartitionCounter(topVer); if (updateCntr != null && updateCntr != 0) updateCntr0 = updateCntr; @@ -1563,7 +1563,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updateMetrics(op, metrics); if (lsnrCol != null) { - long updateCntr = nextPartitionCounter(); + long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE); cctx.continuousQueries().onEntryUpdated( lsnrCol, @@ -1652,6 +1652,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue())); c = new AtomicCacheUpdateClosure(this, + topVer, newVer, op, writeObj, @@ -1723,7 +1724,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else evtVal = (CacheObject)writeObj; - long updateCntr0 = nextPartitionCounter(); + long updateCntr0 = nextPartitionCounter(topVer); if (updateCntr != null) updateCntr0 = updateCntr; @@ -2613,7 +2614,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long updateCntr = 0; if (!preload) - updateCntr = nextPartitionCounter(); + updateCntr = nextPartitionCounter(topVer); if (walEnabled) { cctx.shared().wal().log(new DataRecord(new DataEntry( @@ -2669,9 +2670,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** + * @param topVer Topology version for current operation. * @return Update counter. */ - protected long nextPartitionCounter() { + protected long nextPartitionCounter(AffinityTopologyVersion topVer) { return 0; } @@ -3937,6 +3939,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private final GridCacheMapEntry entry; /** */ + private final AffinityTopologyVersion topVer; + + /** */ private GridCacheVersion newVer; /** */ @@ -3999,7 +4004,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** */ private CacheDataRow oldRow; - AtomicCacheUpdateClosure(GridCacheMapEntry entry, + AtomicCacheUpdateClosure( + GridCacheMapEntry entry, + AffinityTopologyVersion topVer, GridCacheVersion newVer, GridCacheOperation op, Object writeObj, @@ -4020,6 +4027,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert op == UPDATE || op == DELETE || op == TRANSFORM : op; this.entry = entry; + this.topVer = topVer; this.newVer = newVer; this.op = op; this.writeObj = writeObj; @@ -4388,7 +4396,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ", locNodeId=" + cctx.localNodeId() + ']'; } - long updateCntr0 = entry.nextPartitionCounter(); + long updateCntr0 = entry.nextPartitionCounter(topVer); if (updateCntr != null) updateCntr0 = updateCntr; @@ -4472,7 +4480,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Must persist inside synchronization in non-tx mode. cctx.store().remove(null, entry.key); - long updateCntr0 = entry.nextPartitionCounter(); + long updateCntr0 = entry.nextPartitionCounter(topVer); if (updateCntr != null) updateCntr0 = updateCntr; http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 299ef3d..abda6f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -93,8 +93,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected long nextPartitionCounter() { - return locPart.nextUpdateCounter(); + @Override protected long nextPartitionCounter(AffinityTopologyVersion topVer) { + return locPart.nextUpdateCounter(cctx.cacheId(), topVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 90fbfa4..f2c0206 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -919,13 +919,15 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * @param cacheId ID of cache initiated counter update. + * @param topVer Topology version for current operation. * @return Next update index. */ - public long nextUpdateCounter() { + long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer) { long nextCntr = store.nextUpdateCounter(); -// if (grp.sharedGroup()) -// grp.onPartitionCounterUpdate(cacheId, nextCntr); + if (grp.sharedGroup()) + grp.onPartitionCounterUpdate(cacheId, id, nextCntr, topVer); return nextCntr; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index f913aeb..fc39b6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -203,6 +203,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * @param ctx Context. + * @param part Partition number. + * @param cntr Update counter. + * @param topVer Topology version. + * @return Context. + */ + @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext ctx, + int part, + long cntr, + AffinityTopologyVersion topVer) { + return null; + } + + /** * @param internal Internal entry flag (internal key or not user cache). * @param preload Whether update happened during preloading. * @return Registered listeners. http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java new file mode 100644 index 0000000..342b9d7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.List; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class CounterSkipContext { + /** */ + private CacheContinuousQueryEntry entry; + + /** */ + private List<Runnable> readySendC; + + CacheContinuousQueryEntry entry(int part, long cntr, AffinityTopologyVersion topVer) { + if (entry == null) { + entry = new CacheContinuousQueryEntry(0, + null, + null, + null, + null, + false, + part, + cntr, + topVer); + } + + return entry; + } + + /** + * @return Entries + */ + @Nullable public List<Runnable> readyEntries() { + return readySendC; + } +}
