IGNITE-143 - Continuous queries refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2168c303 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2168c303 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2168c303 Branch: refs/heads/ignite-143 Commit: 2168c303eab919a88ab6d7867fc1cb63d6c27480 Parents: 63845f4 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Thu Feb 12 13:49:49 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Thu Feb 12 13:49:49 2015 -0800 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 42 ++++++++++--- .../continuous/CacheContinuousQueryManager.java | 14 ++--- ...ridCacheContinuousQueryAbstractSelfTest.java | 63 -------------------- 3 files changed, 42 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2168c303/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 568f3fe..4dc32a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -35,8 +35,10 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; +import sun.plugin.dom.exception.*; import javax.cache.*; +import javax.cache.event.*; import javax.cache.expiry.*; import javax.cache.processor.*; import java.io.*; @@ -1164,8 +1166,11 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> subjId, null, taskName); } - if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); + if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) { + EventType type = old != null || oldBytes != null ? EventType.UPDATED : EventType.CREATED; + + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, type); + } cctx.dataStructures().onEntryUpdated(key, false); } @@ -1324,7 +1329,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, null, null, old, oldBytes); + cctx.continuousQueries().onEntryUpdated(this, key, null, null, old, oldBytes, EventType.REMOVED); cctx.dataStructures().onEntryUpdated(key, true); } @@ -1633,7 +1638,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (res) updateMetrics(op, metrics); - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, eventType(op)); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -1645,7 +1650,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } } - return new GridTuple3<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old), invokeRes); + return new GridTuple3<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old), + invokeRes); } /** {@inheritDoc} */ @@ -2205,7 +2211,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updateMetrics(op, metrics); if (cctx.isReplicated() || primary) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, + eventType(op)); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -3212,7 +3219,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (!skipQryNtf) { if (!preload && (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer))) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), null, null); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), null, null, + EventType.CREATED); cctx.dataStructures().onEntryUpdated(key, false); } @@ -4376,6 +4384,26 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> return cctx.marshaller().unmarshal(res, ldr); } + /** + * @param op Operation. + * @return Event type. + */ + private EventType eventType(GridCacheOperation op) { + switch (op) { + case CREATE: + return EventType.CREATED; + + case UPDATE: + return EventType.UPDATED; + + case DELETE: + return EventType.REMOVED; + + default: + throw new InvalidStateException("Invalid operation: " + op); + } + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { // Identity comparison left on purpose. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2168c303/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 92416b6..ac113c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -120,7 +120,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K * @throws IgniteCheckedException In case of error. */ public void onEntryUpdated(GridCacheEntryEx<K, V> e, K key, V newVal, GridCacheValueBytes newBytes, - V oldVal, GridCacheValueBytes oldBytes) throws IgniteCheckedException { + V oldVal, GridCacheValueBytes oldBytes, EventType type) throws IgniteCheckedException { assert e != null; assert key != null; @@ -136,17 +136,14 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K oldVal = cctx.unwrapTemporary(oldVal); - EventType evtType = newVal == null ? REMOVED : - ((oldVal != null || (oldBytes != null && !oldBytes.isNull()) ? UPDATED : CREATED)); - CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key, newVal, newBytes, oldVal, oldBytes); e0.initValue(cctx.marshaller(), cctx.deploy().globalLoader()); CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>( - cctx.kernalContext().grid().jcache(cctx.name()), evtType, e0); + cctx.kernalContext().grid().jcache(cctx.name()), type, e0); - boolean primary = e.wrap(false).primary(); + boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1); boolean recordIgniteEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) @@ -177,8 +174,11 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().grid().jcache(cctx.name()), EXPIRED, e0); + boolean primary = cctx.affinity().primary(cctx.localNode(), key, -1); + boolean recordIgniteEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) - lsnr.onEntryUpdated(evt, true, false); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2168c303/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 918f3b9..87d940a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -28,7 +28,6 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.processors.datastructures.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -43,7 +42,6 @@ import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.configuration.*; -import javax.cache.event.CacheEntryEvent; import javax.cache.event.*; import javax.cache.integration.*; import java.util.*; @@ -838,67 +836,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo /** * @throws Exception If failed. */ - public void testCallbackForPreload() throws Exception { - IgniteCache<Integer, Integer> cache = grid(0).jcache(null); - - if (cache.getConfiguration(CacheConfiguration.class).getCacheMode() == LOCAL) - return; - - Map<Integer, Integer> map = new HashMap<>(); - - final int keysCnt = 1000; - - for (int i = 0; i < keysCnt; i++) - map.put(i, i); - - cache.putAll(map); - - ContinuousQuery<Integer, Integer> qry = Query.continuous(); - - final CountDownLatch latch = new CountDownLatch(1); - final Collection<Integer> keys = new GridConcurrentHashSet<>(); - - qry.setInitialPredicate(Query.scan(new P2<Integer, Integer>() { - @Override public boolean apply(Integer k, Integer v) { - return true; - } - })); - - qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { - keys.add(evt.getKey()); - - if (keys.size() >= keysCnt) - latch.countDown(); - } - } - }); - - IgniteCache<Integer, Integer> cache0 = startGrid("anotherGrid").jcache(null); - - boolean repl = cache0.getConfiguration(CacheConfiguration.class).getCacheMode() == REPLICATED; - - try (QueryCursor<Cache.Entry<Integer, Integer>> cur = repl ? cache0.localQuery(qry) : cache0.query(qry)) { - for (Cache.Entry<Integer, Integer> evt : cur) { - keys.add(evt.getKey()); - - if (keys.size() >= keysCnt) - latch.countDown(); - } - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : keys.size(); - - assertEquals(keysCnt, keys.size()); - } - finally { - stopGrid("anotherGrid"); - } - } - - /** - * @throws Exception If failed. - */ public void testEvents() throws Exception { final AtomicInteger cnt = new AtomicInteger(); final CountDownLatch latch = new CountDownLatch(50);