IGNITE-143 - Continuous queries refactoring

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2168c303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2168c303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2168c303

Branch: refs/heads/ignite-143
Commit: 2168c303eab919a88ab6d7867fc1cb63d6c27480
Parents: 63845f4
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Thu Feb 12 13:49:49 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Thu Feb 12 13:49:49 2015 -0800

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 42 ++++++++++---
 .../continuous/CacheContinuousQueryManager.java | 14 ++---
 ...ridCacheContinuousQueryAbstractSelfTest.java | 63 --------------------
 3 files changed, 42 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2168c303/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 568f3fe..4dc32a0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -35,8 +35,10 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
+import sun.plugin.dom.exception.*;
 
 import javax.cache.*;
+import javax.cache.event.*;
 import javax.cache.expiry.*;
 import javax.cache.processor.*;
 import java.io.*;
@@ -1164,8 +1166,11 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                     subjId, null, taskName);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
+            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear())) {
+                EventType type = old != null || oldBytes != null ? 
EventType.UPDATED : EventType.CREATED;
+
+                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes, type);
+            }
 
             cctx.dataStructures().onEntryUpdated(key, false);
         }
@@ -1324,7 +1329,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 }
 
                 if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
-                    cctx.continuousQueries().onEntryUpdated(this, key, null, 
null, old, oldBytes);
+                    cctx.continuousQueries().onEntryUpdated(this, key, null, 
null, old, oldBytes, EventType.REMOVED);
 
                 cctx.dataStructures().onEntryUpdated(key, true);
             }
@@ -1633,7 +1638,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
             if (res)
                 updateMetrics(op, metrics);
 
-            cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
+            cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes, eventType(op));
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -1645,7 +1650,8 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
             }
         }
 
-        return new GridTuple3<>(res, cctx.<V>unwrapTemporary(interceptorRes != 
null ? interceptorRes.get2() : old), invokeRes);
+        return new GridTuple3<>(res, cctx.<V>unwrapTemporary(interceptorRes != 
null ? interceptorRes.get2() : old),
+            invokeRes);
     }
 
     /** {@inheritDoc} */
@@ -2205,7 +2211,8 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 updateMetrics(op, metrics);
 
             if (cctx.isReplicated() || primary)
-                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
+                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes,
+                    eventType(op));
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -3212,7 +3219,8 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 if (!skipQryNtf) {
                     if (!preload && (cctx.isLocal() || cctx.isReplicated() ||
                         cctx.affinity().primary(cctx.localNode(), key, 
topVer)))
-                        cctx.continuousQueries().onEntryUpdated(this, key, 
val, valueBytesUnlocked(), null, null);
+                        cctx.continuousQueries().onEntryUpdated(this, key, 
val, valueBytesUnlocked(), null, null,
+                            EventType.CREATED);
 
                     cctx.dataStructures().onEntryUpdated(key, false);
                 }
@@ -4376,6 +4384,26 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
         return cctx.marshaller().unmarshal(res, ldr);
     }
 
+    /**
+     * Maps a cache operation to the JCache event type passed to continuous query listeners.
+     *
+     * @param op Operation.
+     * @return Event type.
+     * @throws IllegalStateException If the operation has no corresponding event type.
+     */
+    private EventType eventType(GridCacheOperation op) {
+        switch (op) {
+            case CREATE:
+                return EventType.CREATED;
+            case UPDATE:
+                return EventType.UPDATED;
+            case DELETE:
+                return EventType.REMOVED;
+            default:
+                throw new IllegalStateException("Invalid operation: " + op);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
         // Identity comparison left on purpose.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2168c303/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 92416b6..ac113c9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -120,7 +120,7 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
      * @throws IgniteCheckedException In case of error.
      */
     public void onEntryUpdated(GridCacheEntryEx<K, V> e, K key, V newVal, 
GridCacheValueBytes newBytes,
-        V oldVal, GridCacheValueBytes oldBytes) throws IgniteCheckedException {
+        V oldVal, GridCacheValueBytes oldBytes, EventType type) throws 
IgniteCheckedException {
         assert e != null;
         assert key != null;
 
@@ -136,17 +136,14 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
 
         oldVal = cctx.unwrapTemporary(oldVal);
 
-        EventType evtType = newVal == null ? REMOVED :
-            ((oldVal != null || (oldBytes != null && !oldBytes.isNull()) ? 
UPDATED : CREATED));
-
         CacheContinuousQueryEntry<K, V> e0 = new 
CacheContinuousQueryEntry<>(key, newVal, newBytes, oldVal, oldBytes);
 
         e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader());
 
         CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>(
-            cctx.kernalContext().grid().jcache(cctx.name()), evtType, e0);
+            cctx.kernalContext().grid().jcache(cctx.name()), type, e0);
 
-        boolean primary = e.wrap(false).primary();
+        boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1);
         boolean recordIgniteEvt = !e.isInternal() && 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
         for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values())
@@ -177,8 +174,11 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
             CacheContinuousQueryEvent<K, V> evt = new 
CacheContinuousQueryEvent<>(
                 cctx.kernalContext().grid().jcache(cctx.name()), EXPIRED, e0);
 
+            boolean primary = cctx.affinity().primary(cctx.localNode(), key, 
-1);
+            boolean recordIgniteEvt = 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
             for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values())
-                lsnr.onEntryUpdated(evt, true, false);
+                lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2168c303/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 918f3b9..87d940a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.processors.datastructures.*;
-import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -43,7 +42,6 @@ import org.jetbrains.annotations.*;
 
 import javax.cache.*;
 import javax.cache.configuration.*;
-import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.*;
 import javax.cache.integration.*;
 import java.util.*;
@@ -838,67 +836,6 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
     /**
      * @throws Exception If failed.
      */
-    public void testCallbackForPreload() throws Exception {
-        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
-
-        if (cache.getConfiguration(CacheConfiguration.class).getCacheMode() == 
LOCAL)
-            return;
-
-        Map<Integer, Integer> map = new HashMap<>();
-
-        final int keysCnt = 1000;
-
-        for (int i = 0; i < keysCnt; i++)
-            map.put(i, i);
-
-        cache.putAll(map);
-
-        ContinuousQuery<Integer, Integer> qry = Query.continuous();
-
-        final CountDownLatch latch = new CountDownLatch(1);
-        final Collection<Integer> keys = new GridConcurrentHashSet<>();
-
-        qry.setInitialPredicate(Query.scan(new P2<Integer, Integer>() {
-            @Override public boolean apply(Integer k, Integer v) {
-                return true;
-            }
-        }));
-
-        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() 
{
-            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts) {
-                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt 
: evts) {
-                    keys.add(evt.getKey());
-
-                    if (keys.size() >= keysCnt)
-                        latch.countDown();
-                }
-            }
-        });
-
-        IgniteCache<Integer, Integer> cache0 = 
startGrid("anotherGrid").jcache(null);
-
-        boolean repl = 
cache0.getConfiguration(CacheConfiguration.class).getCacheMode() == REPLICATED;
-
-        try (QueryCursor<Cache.Entry<Integer, Integer>> cur = repl ? 
cache0.localQuery(qry) : cache0.query(qry)) {
-            for (Cache.Entry<Integer, Integer> evt : cur) {
-                keys.add(evt.getKey());
-
-                if (keys.size() >= keysCnt)
-                    latch.countDown();
-            }
-
-            assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : keys.size();
-
-            assertEquals(keysCnt, keys.size());
-        }
-        finally {
-            stopGrid("anotherGrid");
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testEvents() throws Exception {
         final AtomicInteger cnt = new AtomicInteger();
         final CountDownLatch latch = new CountDownLatch(50);

Reply via email to