Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 0f8c69aa4 -> c54051399


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c5405139
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c5405139
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c5405139

Branch: refs/heads/ignite-5075
Commit: c540513998fcb51a403d99469626215f974cf4ae
Parents: 0f8c69a
Author: sboikov <[email protected]>
Authored: Thu May 25 18:22:26 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu May 25 18:22:26 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheGroupInfrastructure.java         | 39 ++++++++++++++
 .../processors/cache/GridCacheMapEntry.java     | 26 +++++----
 .../distributed/dht/GridDhtCacheEntry.java      |  4 +-
 .../distributed/dht/GridDhtLocalPartition.java  |  8 +--
 .../continuous/CacheContinuousQueryManager.java | 14 +++++
 .../query/continuous/CounterSkipContext.java    | 56 ++++++++++++++++++++
 6 files changed, 133 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index a38a643..aed96e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -46,6 +46,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import 
org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -667,6 +668,44 @@ public class CacheGroupInfrastructure {
     }
 
     /**
+     * @param cacheId ID of the cache that initiated the counter update; the cache with this ID is skipped in the loop.
+     * @param part Partition number, passed to {@code skipUpdateCounter} of every other cache in the group.
+     * @param cntr Counter value, passed to {@code skipUpdateCounter} of every other cache in the group.
+     * @param topVer Topology version for current operation.
+     */
+    public void onPartitionCounterUpdate(int cacheId,
+        int part,
+        long cntr,
+        AffinityTopologyVersion topVer) {
+        assert sharedGroup();
+
+        if (isLocal())
+            return;
+
+        List<GridCacheContext> caches = this.caches;
+
+        CounterSkipContext skipCtx = null;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            if (cacheId != cctx.cacheId())
+                skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, 
part, cntr, topVer);
+        }
+
+        final List<Runnable> entriesC = skipCtx != null ? 
skipCtx.readyEntries() : null;
+
+        if (entriesC != null) {
+            ctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    for (Runnable c : entriesC)
+                        c.run();
+                }
+            });
+        }
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     public void start() throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 95abd86..272de55 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -982,7 +982,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && 
!detached())
                 deletedUnlocked(false);
 
-            updateCntr0 = nextPartitionCounter();
+            updateCntr0 = nextPartitionCounter(topVer);
 
             if (updateCntr != null && updateCntr != 0)
                 updateCntr0 = updateCntr;
@@ -1161,7 +1161,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 }
             }
 
-            updateCntr0 = nextPartitionCounter();
+            updateCntr0 = nextPartitionCounter(topVer);
 
             if (updateCntr != null && updateCntr != 0)
                 updateCntr0 = updateCntr;
@@ -1563,7 +1563,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 updateMetrics(op, metrics);
 
             if (lsnrCol != null) {
-                long updateCntr = nextPartitionCounter();
+                long updateCntr = 
nextPartitionCounter(AffinityTopologyVersion.NONE);
 
                 cctx.continuousQueries().onEntryUpdated(
                     lsnrCol,
@@ -1652,6 +1652,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 (op == GridCacheOperation.TRANSFORM || 
cctx.loadPreviousValue()));
 
             c = new AtomicCacheUpdateClosure(this,
+                topVer,
                 newVer,
                 op,
                 writeObj,
@@ -1723,7 +1724,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         else
                             evtVal = (CacheObject)writeObj;
 
-                        long updateCntr0 = nextPartitionCounter();
+                        long updateCntr0 = nextPartitionCounter(topVer);
 
                         if (updateCntr != null)
                             updateCntr0 = updateCntr;
@@ -2613,7 +2614,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 long updateCntr = 0;
 
                 if (!preload)
-                    updateCntr = nextPartitionCounter();
+                    updateCntr = nextPartitionCounter(topVer);
 
                 if (walEnabled) {
                     cctx.shared().wal().log(new DataRecord(new DataEntry(
@@ -2669,9 +2670,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     }
 
     /**
+     * @param topVer Topology version for current operation (not used by this base implementation).
      * @return Update counter; this base implementation always returns {@code 0}.
      */
-    protected long nextPartitionCounter() {
+    protected long nextPartitionCounter(AffinityTopologyVersion topVer) {
         return 0;
     }
 
@@ -3937,6 +3939,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         private final GridCacheMapEntry entry;
 
         /** */
+        private final AffinityTopologyVersion topVer;
+
+        /** */
         private GridCacheVersion newVer;
 
         /** */
@@ -3999,7 +4004,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         /** */
         private CacheDataRow oldRow;
 
-        AtomicCacheUpdateClosure(GridCacheMapEntry entry,
+        AtomicCacheUpdateClosure(
+            GridCacheMapEntry entry,
+            AffinityTopologyVersion topVer,
             GridCacheVersion newVer,
             GridCacheOperation op,
             Object writeObj,
@@ -4020,6 +4027,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             assert op == UPDATE || op == DELETE || op == TRANSFORM : op;
 
             this.entry = entry;
+            this.topVer = topVer;
             this.newVer = newVer;
             this.op = op;
             this.writeObj = writeObj;
@@ -4388,7 +4396,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     ", locNodeId=" + cctx.localNodeId() + ']';
             }
 
-            long updateCntr0 = entry.nextPartitionCounter();
+            long updateCntr0 = entry.nextPartitionCounter(topVer);
 
             if (updateCntr != null)
                 updateCntr0 = updateCntr;
@@ -4472,7 +4480,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 // Must persist inside synchronization in non-tx mode.
                 cctx.store().remove(null, entry.key);
 
-            long updateCntr0 = entry.nextPartitionCounter();
+            long updateCntr0 = entry.nextPartitionCounter(topVer);
 
             if (updateCntr != null)
                 updateCntr0 = updateCntr;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 299ef3d..abda6f2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -93,8 +93,8 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} Takes the counter from the local partition, passing the cache ID and the topology version. */
-    @Override protected long nextPartitionCounter() {
-        return locPart.nextUpdateCounter();
+    @Override protected long nextPartitionCounter(AffinityTopologyVersion 
topVer) {
+        return locPart.nextUpdateCounter(cctx.cacheId(), topVer);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 90fbfa4..f2c0206 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -919,13 +919,15 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @param cacheId ID of the cache that initiated the counter update; forwarded to the group if it is shared.
+     * @param topVer Topology version for current operation; forwarded to the group if it is shared.
      * @return Next update counter obtained from the partition store.
      */
-    public long nextUpdateCounter() {
+    long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer) {
         long nextCntr = store.nextUpdateCounter();
 
-//        if (grp.sharedGroup())
-//            grp.onPartitionCounterUpdate(cacheId, nextCntr);
+        if (grp.sharedGroup())
+            grp.onPartitionCounterUpdate(cacheId, id, nextCntr, topVer);
 
         return nextCntr;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index f913aeb..fc39b6d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -203,6 +203,20 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
     }
 
     /**
+     * @param ctx Skip context to continue with, or {@code null} if there is none yet.
+     * @param part Partition number.
+     * @param cntr Update counter.
+     * @param topVer Topology version.
+     * @return Skip context; this implementation always returns {@code null}.
+     */
+    @Nullable public CounterSkipContext skipUpdateCounter(@Nullable 
CounterSkipContext ctx,
+        int part,
+        long cntr,
+        AffinityTopologyVersion topVer) {
+        return null;
+    }
+
+    /**
      * @param internal Internal entry flag (internal key or not user cache).
      * @param preload Whether update happened during preloading.
      * @return Registered listeners.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5405139/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
new file mode 100644
index 0000000..342b9d7
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.List;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Holds a lazily created {@link CacheContinuousQueryEntry} and a list of closures exposed via {@link #readyEntries()}.
+ */
+public class CounterSkipContext {
+    /** Entry created by the first {@code entry(part, cntr, topVer)} call; later calls return it unchanged. */
+    private CacheContinuousQueryEntry entry;
+
+    /** Closures returned by {@link #readyEntries()}; nothing in this class assigns the field. */
+    private List<Runnable> readySendC;
+
+    CacheContinuousQueryEntry entry(int part, long cntr, 
AffinityTopologyVersion topVer) {
+        if (entry == null) {
+            entry = new CacheContinuousQueryEntry(0,
+                null,
+                null,
+                null,
+                null,
+                false,
+                part,
+                cntr,
+                topVer);
+        }
+
+        return entry;
+    }
+
+    /**
+     * @return Current value of {@code readySendC}, which may be {@code null}.
+     */
+    @Nullable public List<Runnable> readyEntries() {
+        return readySendC;
+    }
+}

Reply via email to