IGNITE-143 - Continuous queries refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3520bfde Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3520bfde Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3520bfde Branch: refs/heads/ignite-143 Commit: 3520bfdeb3248f8305978bf35a804e9f2ac2827c Parents: 4759d02 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Thu Feb 12 13:03:24 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Thu Feb 12 13:03:24 2015 -0800 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 21 +- .../processors/cache/GridCacheProcessor.java | 11 +- .../processors/cache/GridCacheProjectionEx.java | 5 + .../processors/cache/IgniteCacheProxy.java | 76 +- .../CacheDataStructuresManager.java | 71 +- .../continuous/CacheContinuousQueryEntry.java | 8 - .../continuous/CacheContinuousQueryHandler.java | 94 +-- .../CacheContinuousQueryListener.java | 4 +- .../continuous/CacheContinuousQueryManager.java | 709 +++++++++---------- .../service/GridServiceProcessor.java | 37 +- .../hadoop/jobtracker/GridHadoopJobTracker.java | 53 +- 11 files changed, 504 insertions(+), 585 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 47a0d10..1783352 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1166,7 +1166,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); cctx.dataStructures().onEntryUpdated(key, false); } @@ -1325,7 +1325,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, null, null, old, oldBytes); cctx.dataStructures().onEntryUpdated(key, true); } @@ -1602,7 +1602,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (res) updateMetrics(op, metrics); - cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -2139,7 +2139,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updateMetrics(op, metrics); if (cctx.isReplicated() || primary) - cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -3137,16 +3137,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> drReplicate(drType, val, valBytes, ver); if (!skipQryNtf) { - if (cctx.isLocal() || cctx.isReplicated() || - cctx.affinity().primary(cctx.localNode(), key, topVer)) { - cctx.continuousQueries().onEntryUpdate(this, - key, - val, - valueBytesUnlocked(), - null, - null, - preload); - } + if (!preload && (cctx.isLocal() || cctx.isReplicated() || + cctx.affinity().primary(cctx.localNode(), key, topVer))) + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), null, null); cctx.dataStructures().onEntryUpdated(key, false); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e7fa211..e596605 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1589,15 +1589,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @return Utility cache. */ - public <K, V> GridCache<K, V> utilityCache() { - return cache(CU.UTILITY_CACHE_NAME); - } - - /** - * @return Utility cache. - */ - public <K, V> IgniteCache<K, V> utilityJCache() { - return jcache(CU.UTILITY_CACHE_NAME); + public <K, V> GridCacheAdapter<K, V> utilityCache() { + return internalCache(CU.UTILITY_CACHE_NAME); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java index 1d30458..d080ddf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java @@ -453,4 +453,9 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args); + + /** + * @return Context. + */ + public GridCacheContext<K, V> context(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 3e8d461..72e7e46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -24,8 +24,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.query.*; -import org.apache.ignite.internal.processors.cache.query.continuous.*; -import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; @@ -33,7 +31,6 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.mxbean.*; -import org.apache.ignite.plugin.security.*; import org.jetbrains.annotations.*; import javax.cache.*; @@ -46,8 +43,6 @@ import java.io.*; import java.util.*; import java.util.concurrent.locks.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; - /** * Cache proxy. */ @@ -337,7 +332,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * Executes continuous query. * * @param qry Query. - * @param loc Whether query is local. + * @param loc Local flag. * @return Initial iteration cursor. */ private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, boolean loc) { @@ -348,63 +343,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (qry.getLocalListener() == null) throw new IgniteException("Mandatory local listener is not set for the query: " + qry); - ctx.checkSecurity(GridSecurityPermission.CACHE_READ); - - IgniteEx ignite = ctx.kernalContext().grid(); - - ClusterGroup grp = (loc ? ignite.forLocal() : ignite).forCacheNodes(ctx.name()); - - Collection<ClusterNode> nodes = grp.nodes(); - - if (nodes.isEmpty()) - throw new ClusterTopologyException("Failed to execute continuous query (empty cluster group is " + - "provided): " + qry); - - boolean skipPrimaryCheck = false; - - switch (ctx.config().getCacheMode()) { - case LOCAL: - if (!nodes.contains(ctx.localNode())) - throw new ClusterTopologyException("Continuous query for LOCAL cache can be executed " + - "only locally (provided projection contains remote nodes only): " + qry); - else if (nodes.size() > 1) - U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " + - "ignored): " + this); - - grp = grp.forNode(ctx.localNode()); - - break; - - case REPLICATED: - if (nodes.size() == 1 && F.first(nodes).equals(ctx.localNode())) { - CacheDistributionMode distributionMode = ctx.config().getDistributionMode(); - - if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED) - skipPrimaryCheck = true; - } - - break; - } - - int taskNameHash = ctx.kernalContext().security().enabled() ? - ctx.kernalContext().job().currentTaskNameHash() : 0; - - GridContinuousHandler hnd = new CacheContinuousQueryHandler<>( - ctx.name(), - ctx.continuousQueries().topic(), - qry.getLocalListener(), - qry.getRemoteFilter(), - false, - false, - false, - true, - skipPrimaryCheck, - taskNameHash, - false); - try { - final UUID routineId = ctx.kernalContext().continuous().startRoutine(hnd, qry.getBufferSize(), - qry.getTimeInterval(), qry.isAutoUnsubscribe(), grp.predicate()).get(); + final UUID routineId = ctx.continuousQueries().executeQuery( + qry.getLocalListener(), + qry.getRemoteFilter(), + qry.getBufferSize(), + qry.getTimeInterval(), + qry.isAutoUnsubscribe(), + loc ? ctx.grid().forLocal() : null); final QueryCursor<Cache.Entry<K, V>> cur; @@ -1226,7 +1172,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - ctx.continuousQueries().registerCacheEntryListener(lsnrCfg, true); + ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false); } catch (IgniteCheckedException e) { throw cacheException(e); @@ -1237,11 +1183,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ - @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) { + @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - ctx.continuousQueries().deregisterCacheEntryListener(lsnrCfg); + ctx.continuousQueries().cancelJCacheQuery(lsnrCfg); } catch (IgniteCheckedException e) { throw cacheException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 5ee2b31..11c85ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.datastructures; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; @@ -32,8 +31,6 @@ import org.apache.ignite.resources.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; -import javax.cache.*; -import javax.cache.event.CacheEntryEvent; import javax.cache.event.*; import java.io.*; import java.util.*; @@ -60,8 +57,11 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, /** Queue header view. */ private CacheProjection<GridCacheQueueHeaderKey, GridCacheQueueHeader> queueHdrView; + /** System cache. */ + private GridCacheAdapter<Object, Object> sysCache; + /** Query notifying about queue update. */ - private QueryCursor<Cache.Entry<Object, Object>> queueQryCur; + private UUID queueQryId; /** Queue query creation guard. */ private final AtomicBoolean queueQryGuard = new AtomicBoolean(); @@ -89,6 +89,8 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, try { queueHdrView = cctx.cache().projection(GridCacheQueueHeaderKey.class, GridCacheQueueHeader.class); + sysCache = cctx.kernalContext().cache().utilityCache(); + initFlag = true; } finally { @@ -100,8 +102,8 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, @Override protected void onKernalStop0(boolean cancel) { busyLock.block(); - if (queueQryCur != null) - queueQryCur.close(); + if (queueQryId != null) + cctx.continuousQueries().cancelInternalQuery(queueQryId); for (GridCacheQueueProxy q : queuesMap.values()) q.delegate().onKernalStop(); @@ -184,48 +186,43 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, return null; if (queueQryGuard.compareAndSet(false, true)) { - ContinuousQuery<Object, Object> qry = Query.continuous(); - - qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { - if (!busyLock.enterBusy()) - return; + queueQryId = sysCache.context().continuousQueries().executeInternalQuery( + new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + if (!busyLock.enterBusy()) + return; - try { - for (CacheEntryEvent<?, ?> e : evts) { - GridCacheQueueHeaderKey key = (GridCacheQueueHeaderKey)e.getKey(); - GridCacheQueueHeader hdr = (GridCacheQueueHeader)e.getValue(); + try { + for (CacheEntryEvent<?, ?> e : evts) { + GridCacheQueueHeaderKey key = (GridCacheQueueHeaderKey)e.getKey(); + GridCacheQueueHeader hdr = (GridCacheQueueHeader)e.getValue(); - for (final GridCacheQueueProxy queue : queuesMap.values()) { - if (queue.name().equals(key.queueName())) { - if (hdr == null) { - GridCacheQueueHeader oldHdr = (GridCacheQueueHeader)e.getOldValue(); + for (final GridCacheQueueProxy queue : queuesMap.values()) { + if (queue.name().equals(key.queueName())) { + if (hdr == null) { + GridCacheQueueHeader oldHdr = (GridCacheQueueHeader)e.getOldValue(); - assert oldHdr != null; + assert oldHdr != null; - if (oldHdr.id().equals(queue.delegate().id())) { - queue.delegate().onRemoved(false); + if (oldHdr.id().equals(queue.delegate().id())) { + queue.delegate().onRemoved(false); - queuesMap.remove(queue.delegate().id()); + queuesMap.remove(queue.delegate().id()); + } } + else + queue.delegate().onHeaderChanged(hdr); } - else - queue.delegate().onHeaderChanged(hdr); } } } + finally { + busyLock.leaveBusy(); + } } - finally { - busyLock.leaveBusy(); - } - } - }); - - qry.setRemoteFilter(new QueueHeaderPredicate()); - - IgniteCache<Object, Object> jCache = cctx.kernalContext().cache().utilityJCache(); - - queueQryCur = cctx.isLocal() || cctx.isReplicated() ? jCache.localQuery(qry) : jCache.query(qry); + }, + new QueueHeaderPredicate(), + cctx.isLocal() || cctx.isReplicated()); } GridCacheQueueProxy queue = queuesMap.get(hdr.id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 92f0f0a..c8d9fec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -172,14 +172,6 @@ class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externaliz return oldVal; } - /** - * Nullifies old value. - */ - void nullifyOldValue() { - oldVal = null; - oldValBytes = null; - } - /** {@inheritDoc} */ @Override public void prepare(GridDeploymentInfo depInfo) { this.depInfo = depInfo; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 5c03488..ece5a01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -29,8 +29,8 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; -import javax.cache.event.CacheEntryEvent; import javax.cache.event.*; +import javax.cache.event.EventType; import java.io.*; import java.util.*; @@ -49,33 +49,30 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** Topic for ordered messages. */ private Object topic; - /** Local callback. */ - private CacheEntryUpdatedListener<K, V> locLsnr; + /** Local listener. */ + private transient CacheEntryUpdatedListener<K, V> locLsnr; - /** Filter. */ + /** Remote filter. */ private CacheEntryEventFilter<K, V> rmtFilter; /** Deployable object for filter. */ - private DeployableObject filterDep; + private DeployableObject rmtFilterDep; /** Internal flag. */ private boolean internal; - /** Entry listener flag. */ - private boolean entryLsnr; + /** Old value required flag. */ + private boolean oldValRequired; - /** Synchronous listener flag. */ + /** Synchronous flag. */ private boolean sync; - /** {@code True} if old value is required. */ - private boolean oldVal; + /** Ignore expired events flag. */ + private boolean ignoreExpired; /** Task name hash code. */ private int taskHash; - /** Keep portable flag. */ - private boolean keepPortable; - /** Whether to skip primary check for REPLICATED cache. */ private transient boolean skipPrimaryCheck; @@ -93,30 +90,36 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param topic Topic for ordered messages. * @param locLsnr Local listener. * @param rmtFilter Remote filter. - * @param internal If {@code true} then query is notified about internal entries updates. - * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. - * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. - * @param oldVal {@code True} if old value is required. + * @param internal Internal flag. + * @param oldValRequired Old value required flag. + * @param sync Synchronous flag. + * @param ignoreExpired Ignore expired events flag. * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. * @param taskHash Task name hash code. */ - public CacheContinuousQueryHandler(@Nullable String cacheName, Object topic, - CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, boolean internal, - boolean entryLsnr, boolean sync, boolean oldVal, boolean skipPrimaryCheck, int taskHash, boolean keepPortable) { + public CacheContinuousQueryHandler( + String cacheName, + Object topic, + CacheEntryUpdatedListener<K, V> locLsnr, + CacheEntryEventFilter<K, V> rmtFilter, + boolean internal, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + int taskHash, + boolean skipPrimaryCheck) { assert topic != null; assert locLsnr != null; - assert !sync || entryLsnr; this.cacheName = cacheName; this.topic = topic; this.locLsnr = locLsnr; this.rmtFilter = rmtFilter; this.internal = internal; - this.entryLsnr = entryLsnr; + this.oldValRequired = oldValRequired; this.sync = sync; - this.oldVal = oldVal; + this.ignoreExpired = ignoreExpired; this.taskHash = taskHash; - this.keepPortable = keepPortable; this.skipPrimaryCheck = skipPrimaryCheck; } @@ -170,7 +173,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - @Override public void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt) { + @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, + boolean recordIgniteEvt) { + if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) + return; + GridCacheContext<K, V> cctx = cacheContext(ctx); if (cctx.isReplicated() && !skipPrimaryCheck && !primary) @@ -190,9 +197,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } if (notify) { - if (!oldVal) - evt.entry().nullifyOldValue(); - if (loc) locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); else { @@ -218,7 +222,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - if (!entryLsnr && recordIgniteEvt) { + if (recordIgniteEvt) { ctx.event().record(new CacheQueryReadEvent<>( ctx.discovery().localNode(), "Continuous query executed.", @@ -242,16 +246,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } @Override public void onUnregister() { - if (rmtFilter != null && rmtFilter instanceof CacheContinuousQueryFilterEx) + if (rmtFilter instanceof CacheContinuousQueryFilterEx) ((CacheContinuousQueryFilterEx)rmtFilter).onQueryUnregister(); } - @Nullable private String taskName() { + @Override public boolean oldValueRequired() { + return oldValRequired; + } + + private String taskName() { return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; } }; - return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr); + return manager(ctx).registerListener(routineId, lsnr, internal); } /** {@inheritDoc} */ @@ -332,7 +340,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert ctx.config().isPeerClassLoadingEnabled(); if (rmtFilter != null && !U.isGrid(rmtFilter.getClass())) - filterDep = new DeployableObject(rmtFilter, ctx); + rmtFilterDep = new DeployableObject(rmtFilter, ctx); } /** {@inheritDoc} */ @@ -341,8 +349,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert ctx != null; assert ctx.config().isPeerClassLoadingEnabled(); - if (filterDep != null) - rmtFilter = filterDep.unmarshal(nodeId, ctx); + if (rmtFilterDep != null) + rmtFilter = rmtFilterDep.unmarshal(nodeId, ctx); } /** {@inheritDoc} */ @@ -355,21 +363,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler U.writeString(out, cacheName); out.writeObject(topic); - boolean b = filterDep != null; + boolean b = rmtFilterDep != null; out.writeBoolean(b); if (b) - out.writeObject(filterDep); + out.writeObject(rmtFilterDep); else out.writeObject(rmtFilter); out.writeBoolean(internal); - out.writeBoolean(entryLsnr); + out.writeBoolean(oldValRequired); out.writeBoolean(sync); - out.writeBoolean(oldVal); + out.writeBoolean(ignoreExpired); out.writeInt(taskHash); - out.writeBoolean(keepPortable); } /** {@inheritDoc} */ @@ -381,16 +388,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler boolean b = in.readBoolean(); if (b) - filterDep = (DeployableObject)in.readObject(); + rmtFilterDep = (DeployableObject)in.readObject(); else rmtFilter = (CacheEntryEventFilter<K, V>)in.readObject(); internal = in.readBoolean(); - entryLsnr = in.readBoolean(); + oldValRequired = in.readBoolean(); sync = in.readBoolean(); - oldVal = in.readBoolean(); + ignoreExpired = in.readBoolean(); taskHash = in.readInt(); - keepPortable = in.readBoolean(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 69e9523..21c45f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -32,10 +32,12 @@ interface CacheContinuousQueryListener<K, V> { * @param e Entry. * @param recordIgniteEvt Whether to record event. */ - public void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt); + public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt); /** * Listener unregistered callback. */ public void onUnregister(); + + public boolean oldValueRequired(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 52e915e..92416b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -18,13 +18,15 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; +import org.apache.ignite.plugin.security.*; import org.jdk8.backport.*; -import org.jetbrains.annotations.*; import javax.cache.configuration.*; import javax.cache.event.*; @@ -34,6 +36,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static javax.cache.event.EventType.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.GridTopic.*; @@ -41,8 +44,17 @@ import static org.apache.ignite.internal.GridTopic.*; * Continuous queries manager. */ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> { - /** Ordered topic prefix. */ - private String topicPrefix; + /** */ + private static final byte CREATED_FLAG = 0b0001; + + /** */ + private static final byte UPDATED_FLAG = 0b0010; + + /** */ + private static final byte REMOVED_FLAG = 0b0100; + + /** */ + private static final byte EXPIRED_FLAG = 0b1000; /** Listeners. */ private final ConcurrentMap<UUID, CacheContinuousQueryListener<K, V>> lsnrs = new ConcurrentHashMap8<>(); @@ -59,9 +71,12 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K /** Query sequence number for message topic. */ private final AtomicLong seq = new AtomicLong(); -// /** Continues queries created for cache event listeners. */ -// private final ConcurrentMap<CacheEntryListenerConfiguration, CacheContinuousQuery<K, V>> lsnrQrys = -// new ConcurrentHashMap8<>(); + /** JCache listeners. */ + private final ConcurrentMap<CacheEntryListenerConfiguration, JCacheQuery> jCacheLsnrs = + new ConcurrentHashMap8<>(); + + /** Ordered topic prefix. */ + private String topicPrefix; /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { @@ -72,11 +87,11 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected void onKernalStart0() throws IgniteCheckedException { - Iterable<CacheEntryListenerConfiguration<K, V>> lsnrCfgs = cctx.config().getCacheEntryListenerConfigurations(); + Iterable<CacheEntryListenerConfiguration<K, V>> cfgs = cctx.config().getCacheEntryListenerConfigurations(); - if (lsnrCfgs != null) { - for (CacheEntryListenerConfiguration<K, V> cfg : lsnrCfgs) - registerCacheEntryListener(cfg, false); + if (cfgs != null) { + for (CacheEntryListenerConfiguration<K, V> cfg : cfgs) + executeJCacheQuery(cfg, true); } } @@ -84,22 +99,15 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K @Override protected void onKernalStop0(boolean cancel) { super.onKernalStop0(cancel); -// for (CacheEntryListenerConfiguration lsnrCfg : lsnrQrys.keySet()) { -// try { -// deregisterCacheEntryListener(lsnrCfg); -// } -// catch (IgniteCheckedException e) { -// if (log.isDebugEnabled()) -// log.debug("Failed to remove cache entry listener: " + e); -// } -// } - } - - /** - * @return New topic. - */ - public Object topic() { - return TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()); + for (JCacheQuery lsnr : jCacheLsnrs.values()) { + try { + lsnr.cancel(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to stop JCache entry listener: " + e.getMessage()); + } + } } /** @@ -109,16 +117,10 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K * @param newBytes New value bytes. * @param oldVal Old value. * @param oldBytes Old value bytes. - * @param preload {@code True} if entry is updated during preloading. * @throws IgniteCheckedException In case of error. */ - public void onEntryUpdate(GridCacheEntryEx<K, V> e, - K key, - @Nullable V newVal, - @Nullable GridCacheValueBytes newBytes, - V oldVal, - @Nullable GridCacheValueBytes oldBytes, - boolean preload) throws IgniteCheckedException { + public void onEntryUpdated(GridCacheEntryEx<K, V> e, K key, V newVal, GridCacheValueBytes newBytes, + V oldVal, GridCacheValueBytes oldBytes) throws IgniteCheckedException { assert e != null; assert key != null; @@ -147,12 +149,8 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K boolean primary = e.wrap(false).primary(); boolean recordIgniteEvt = !e.isInternal() && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) { -// if (preload && lsnr.entryListener()) -// continue; - - lsnr.onEntryUpdate(evt, primary, recordIgniteEvt); - } + for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); } /** @@ -161,10 +159,10 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K * @param oldVal Old value. * @param oldBytes Old value bytes. */ - public void onEntryExpired(GridCacheEntryEx<K, V> e, - K key, - V oldVal, - @Nullable GridCacheValueBytes oldBytes) { + public void onEntryExpired(GridCacheEntryEx<K, V> e, K key, V oldVal, GridCacheValueBytes oldBytes) { + assert e != null; + assert key != null; + if (e.isInternal()) return; @@ -179,106 +177,185 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().grid().jcache(cctx.name()), EXPIRED, e0); - for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) { -// if (!lsnr.entryListener()) -// continue; + for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) + lsnr.onEntryUpdated(evt, true, false); + } + } - lsnr.onEntryUpdate(evt, true, false); - } + /** + * @param locLsnr Local listener. + * @param rmtFilter Remote filter. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param grp Cluster group. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. + */ + public UUID executeQuery(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, + int bufSize, long timeInterval, boolean autoUnsubscribe, ClusterGroup grp) throws IgniteCheckedException { + return executeQuery0( + locLsnr, + rmtFilter, + bufSize, + timeInterval, + autoUnsubscribe, + false, + true, + false, + true, + grp); + } + + /** + * @param locLsnr Local listener. + * @param rmtFilter Remote filter. + * @param loc Local flag. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. + */ + public UUID executeInternalQuery(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, + boolean loc) throws IgniteCheckedException { + return executeQuery0( + locLsnr, + rmtFilter, + ContinuousQuery.DFLT_BUF_SIZE, + ContinuousQuery.DFLT_TIME_INTERVAL, + ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, + true, + true, + false, + true, + loc ? cctx.grid().forLocal() : null); + } + + public void cancelInternalQuery(UUID routineId) { + try { + cctx.kernalContext().continuous().stopRoutine(routineId).get(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to stop internal continuous query: " + e.getMessage()); } } /** - * @param lsnrCfg Listener configuration. - * @param addToCfg If {@code true} adds listener configuration to cache configuration. - * @throws IgniteCheckedException If failed. + * @param cfg Listener configuration. + * @param onStart Whether listener is created on node start. + * @throws IgniteCheckedException */ - @SuppressWarnings("unchecked") - public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg, boolean addToCfg) + public void executeJCacheQuery(CacheEntryListenerConfiguration<K, V> cfg, boolean onStart) throws IgniteCheckedException { -// GridCacheContinuousQueryAdapter<K, V> qry = null; -// -// try { -// A.notNull(lsnrCfg, "lsnrCfg"); -// -// Factory<CacheEntryListener<? super K, ? super V>> factory = lsnrCfg.getCacheEntryListenerFactory(); -// -// A.notNull(factory, "cacheEntryListenerFactory"); -// -// CacheEntryListener lsnr = factory.create(); -// -// A.notNull(lsnr, "lsnr"); -// -// IgniteCacheProxy<K, V> cache= cctx.kernalContext().cache().jcache(cctx.name()); -// -// EntryListenerCallback cb = new EntryListenerCallback(cache, lsnr); -// -// if (!(cb.create() || cb.update() || cb.remove() || cb.expire())) -// throw new IllegalArgumentException("Listener must implement one of CacheEntryListener sub-interfaces."); -// -// qry = (GridCacheContinuousQueryAdapter<K, V>)cctx.cache().queries().createContinuousQuery(); -// -// CacheContinuousQuery<K, V> old = lsnrQrys.putIfAbsent(lsnrCfg, qry); -// -// if (old != null) -// throw new IllegalArgumentException("Listener is already registered for configuration: " + lsnrCfg); -// -// qry.autoUnsubscribe(true); -// -// qry.bufferSize(1); -// -//// qry.localCallback(cb); -// -// EntryListenerFilter<K, V> fltr = new EntryListenerFilter<>(cb.create(), -// cb.update(), -// cb.remove(), -// cb.expire(), -// lsnrCfg.getCacheEntryEventFilterFactory(), -// cctx.kernalContext().grid(), -// cctx.name()); -// -//// qry.remoteFilter(fltr); -// -// qry.execute(null, false, true, lsnrCfg.isSynchronous(), lsnrCfg.isOldValueRequired()); -// -// if (addToCfg) -// cctx.config().addCacheEntryListenerConfiguration(lsnrCfg); -// } -// catch (IgniteCheckedException e) { -// lsnrQrys.remove(lsnrCfg, qry); // Remove query if failed to execute it. -// -// throw e; -// } + JCacheQuery lsnr = new JCacheQuery(cfg, onStart); + + JCacheQuery old = jCacheLsnrs.putIfAbsent(cfg, lsnr); + + if (old != null) + throw new IgniteCheckedException("Listener is already registered for configuration: " + cfg); + + try { + lsnr.execute(); + } + catch (IgniteCheckedException e) { + cancelJCacheQuery(cfg); + + throw e; + } } /** - * @param lsnrCfg Listener configuration. - * @throws IgniteCheckedException If failed. + * @param cfg Listener configuration. + * @throws IgniteCheckedException In case of error. */ - @SuppressWarnings("unchecked") - public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) throws IgniteCheckedException { - A.notNull(lsnrCfg, "lsnrCfg"); - -// CacheContinuousQuery<K, V> qry = lsnrQrys.remove(lsnrCfg); -// -// if (qry != null) { -// cctx.config().removeCacheEntryListenerConfiguration(lsnrCfg); -// -// qry.close(); -// } + public void cancelJCacheQuery(CacheEntryListenerConfiguration<K, V> cfg) throws IgniteCheckedException { + JCacheQuery lsnr = jCacheLsnrs.remove(cfg); + + if (lsnr != null) + lsnr.cancel(); + } + + /** + * @param locLsnr Local listener. + * @param rmtFilter Remote filter. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param internal Internal flag. + * @param oldValRequired Old value required flag. + * @param sync Synchronous flag. + * @param ignoreExpired Ignore expired event flag. + * @param grp Cluster group. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. + */ + private UUID executeQuery0(CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, + int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean oldValRequired, + boolean sync, boolean ignoreExpired, ClusterGroup grp) throws IgniteCheckedException { + cctx.checkSecurity(GridSecurityPermission.CACHE_READ); + + if (grp == null) + grp = cctx.kernalContext().grid(); + + Collection<ClusterNode> nodes = grp.nodes(); + + if (nodes.isEmpty()) + throw new ClusterTopologyException("Failed to execute continuous query (empty cluster group is " + + "provided)."); + + boolean skipPrimaryCheck = false; + + switch (cctx.config().getCacheMode()) { + case LOCAL: + if (!nodes.contains(cctx.localNode())) + throw new ClusterTopologyException("Continuous query for LOCAL cache can be executed " + + "only locally (provided projection contains remote nodes only)."); + else if (nodes.size() > 1) + U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " + + "ignored)."); + + grp = grp.forNode(cctx.localNode()); + + break; + + case REPLICATED: + if (nodes.size() == 1 && F.first(nodes).equals(cctx.localNode())) { + CacheDistributionMode distributionMode = cctx.config().getDistributionMode(); + + if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED) + skipPrimaryCheck = true; + } + + break; + } + + int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? + cctx.kernalContext().job().currentTaskNameHash() : 0; + + GridContinuousHandler hnd = new CacheContinuousQueryHandler<>( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilter, + internal, + oldValRequired, + sync, + ignoreExpired, + taskNameHash, + skipPrimaryCheck); + + return cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, + autoUnsubscribe, grp.predicate()).get(); } /** * @param lsnrId Listener ID. * @param lsnr Listener. * @param internal Internal flag. - * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. * @return Whether listener was actually registered. */ boolean registerListener(UUID lsnrId, CacheContinuousQueryListener<K, V> lsnr, - boolean internal, - boolean entryLsnr) { + boolean internal) { boolean added; if (internal) { @@ -324,295 +401,215 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K } /** - * */ - static class EntryListenerFilter<K1, V1> implements - IgnitePredicate<CacheContinuousQueryEntry<K1, V1>>, Externalizable { + private class JCacheQuery { /** */ - private static final long serialVersionUID = 0L; + private final CacheEntryListenerConfiguration<K, V> cfg; /** */ - private boolean create; + private final boolean onStart; /** */ - private boolean update; + private volatile UUID routineId; - /** */ - private boolean rmv; + /** + * @param cfg Listener configuration. + */ + private JCacheQuery(CacheEntryListenerConfiguration<K, V> cfg, boolean onStart) { + this.cfg = cfg; + this.onStart = onStart; + } - /** */ - private boolean expire; + /** + * @throws IgniteCheckedException In case of error. + */ + @SuppressWarnings("unchecked") + void execute() throws IgniteCheckedException { + if (!onStart) + cctx.config().addCacheEntryListenerConfiguration(cfg); + + CacheEntryListener<? super K, ? super V> locLsnrImpl = cfg.getCacheEntryListenerFactory().create(); + + if (locLsnrImpl == null) + throw new IgniteCheckedException("Local CacheEntryListener is mandatory and can't be null."); + + byte types = 0; + + types |= locLsnrImpl instanceof CacheEntryCreatedListener ? CREATED_FLAG : 0; + types |= locLsnrImpl instanceof CacheEntryUpdatedListener ? UPDATED_FLAG : 0; + types |= locLsnrImpl instanceof CacheEntryRemovedListener ? REMOVED_FLAG : 0; + types |= locLsnrImpl instanceof CacheEntryExpiredListener ? EXPIRED_FLAG : 0; + + if (types == 0) + throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces."); + + CacheEntryUpdatedListener<K, V> locLsnr = (CacheEntryUpdatedListener<K, V>)new JCacheQueryLocalListener<>( + locLsnrImpl); + + CacheEntryEventFilter<K, V> rmtFilter = (CacheEntryEventFilter<K, V>)new JCacheQueryRemoteFilter<>( + cfg.getCacheEntryEventFilterFactory().create(), types); + + routineId = executeQuery0( + locLsnr, + rmtFilter, + ContinuousQuery.DFLT_BUF_SIZE, + ContinuousQuery.DFLT_TIME_INTERVAL, + ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, + false, + cfg.isOldValueRequired(), + cfg.isSynchronous(), + false, + null); + } - /** */ - private Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory; + /** + * @throws IgniteCheckedException In case of error. + */ + @SuppressWarnings("unchecked") + void cancel() throws IgniteCheckedException { + UUID routineId0 = routineId; - /** */ - private CacheEntryEventFilter fltr; + assert routineId0 != null; - /** */ - @IgniteInstanceResource - private Ignite ignite; + cctx.kernalContext().continuous().stopRoutine(routineId0).get(); - /** */ - private IgniteCache cache; + cctx.config().removeCacheEntryListenerConfiguration(cfg); + } + } + /** + */ + private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> { /** */ - private String cacheName; + private CacheEntryListener<K, V> impl; /** - * + * @param impl Listener. */ - public EntryListenerFilter() { - // No-op. - } + private JCacheQueryLocalListener(CacheEntryListener<K, V> impl) { + assert impl != null; - /** - * @param create {@code True} if listens for create events. - * @param update {@code True} if listens for create events. - * @param rmv {@code True} if listens for remove events. - * @param expire {@code True} if listens for expire events. - * @param fltrFactory Filter factory. - * @param ignite Ignite instance. - * @param cacheName Cache name. - */ - EntryListenerFilter( - boolean create, - boolean update, - boolean rmv, - boolean expire, - Factory<CacheEntryEventFilter<? super K1, ? super V1>> fltrFactory, - Ignite ignite, - @Nullable String cacheName) { - this.create = create; - this.update = update; - this.rmv = rmv; - this.expire = expire; - this.fltrFactory = fltrFactory; - this.ignite = ignite; - this.cacheName = cacheName; - - if (fltrFactory != null) - fltr = fltrFactory.create(); - - cache = ignite.jcache(cacheName); - - assert cache != null : cacheName; + this.impl = impl; } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public boolean apply(CacheContinuousQueryEntry<K1, V1> entry) { - return false; - -// try { -// EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType()); -// -// switch (evtType) { -// case EXPIRED: -// if (!expire) -// return false; -// -// break; -// -// case REMOVED: -// if (!rmv) -// return false; -// -// break; -// -// case CREATED: -// if (!create) -// return false; -// -// break; -// -// case UPDATED: -// if (!update) -// return false; -// -// break; -// -// default: -// assert false : evtType; -// } -// -// if (fltr == null) -// return true; -// -// if (cache == null) { -// cache = ignite.jcache(cacheName); -// -// assert cache != null : cacheName; -// } -// -// return fltr.evaluate(new CacheEntryEvent(cache, evtType, entry)); -// } -// catch (Exception e) { -// LT.warn(ignite.log(), e, "Cache entry event filter error: " + e); -// -// return true; -// } - } + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { + for (CacheEntryEvent<? extends K, ? extends V> evt : evts) { + switch (evt.getEventType()) { + case CREATED: + assert impl instanceof CacheEntryCreatedListener; - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeBoolean(create); + ((CacheEntryCreatedListener<K, V>)impl).onCreated(singleton(evt)); - out.writeBoolean(update); + break; - out.writeBoolean(rmv); + case UPDATED: + assert impl instanceof CacheEntryUpdatedListener; - out.writeBoolean(expire); + ((CacheEntryUpdatedListener<K, V>)impl).onUpdated(singleton(evt)); - U.writeString(out, cacheName); + break; - out.writeObject(fltrFactory); - } + case REMOVED: + assert impl instanceof CacheEntryRemovedListener; - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - create = in.readBoolean(); + ((CacheEntryRemovedListener<K, V>)impl).onRemoved(singleton(evt)); - update = in.readBoolean(); + break; - rmv = in.readBoolean(); + case EXPIRED: + assert impl instanceof CacheEntryExpiredListener; - expire = in.readBoolean(); + ((CacheEntryExpiredListener<K, V>)impl).onExpired(singleton(evt)); - cacheName = U.readString(in); + break; - fltrFactory = (Factory<CacheEntryEventFilter<? super K1, ? super V1>>)in.readObject(); + default: + throw new IllegalStateException("Unknown type: " + evt.getEventType()); + } + } + } - if (fltrFactory != null) - fltr = fltrFactory.create(); + /** + * @param evt Event. + * @return Singleton iterable. + */ + private Iterable<CacheEntryEvent<? extends K, ? extends V>> singleton( + CacheEntryEvent<? extends K, ? extends V> evt) { + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = new ArrayList<>(1); + + evts.add(evt); + + return evts; } } /** - * */ - private class EntryListenerCallback implements - IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> { - /** */ - private final IgniteCacheProxy<K, V> cache; - - /** */ - private final CacheEntryCreatedListener createLsnr; - + private static class JCacheQueryRemoteFilter<K, V> implements CacheEntryEventFilter<K, V>, Externalizable { /** */ - private final CacheEntryUpdatedListener updateLsnr; + private CacheEntryEventFilter<K, V> impl; /** */ - private final CacheEntryRemovedListener rmvLsnr; - - /** */ - private final CacheEntryExpiredListener expireLsnr; + private byte types; /** - * @param cache Cache to be used as event source. - * @param lsnr Listener. + * For {@link Externalizable}. */ - EntryListenerCallback(IgniteCacheProxy<K, V> cache, CacheEntryListener lsnr) { - this.cache = cache; - - createLsnr = lsnr instanceof CacheEntryCreatedListener ? (CacheEntryCreatedListener)lsnr : null; - updateLsnr = lsnr instanceof CacheEntryUpdatedListener ? (CacheEntryUpdatedListener)lsnr : null; - rmvLsnr = lsnr instanceof CacheEntryRemovedListener ? (CacheEntryRemovedListener)lsnr : null; - expireLsnr = lsnr instanceof CacheEntryExpiredListener ? (CacheEntryExpiredListener)lsnr : null; + public JCacheQueryRemoteFilter() { + // no-op. } /** - * @return {@code True} if listens for create event. + * @param impl Filter. + * @param types Types. */ - boolean create() { - return createLsnr != null; - } + JCacheQueryRemoteFilter(CacheEntryEventFilter<K, V> impl, byte types) { + assert types != 0; - /** - * @return {@code True} if listens for update event. - */ - boolean update() { - return updateLsnr != null; + this.impl = impl; + this.types = types; } - /** - * @return {@code True} if listens for remove event. - */ - boolean remove() { - return rmvLsnr != null; + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends K, ? extends V> evt) { + return (types & flag(evt.getEventType())) != 0 && (impl == null || impl.evaluate(evt)); } - /** - * @return {@code True} if listens for expire event. - */ - boolean expire() { - return expireLsnr != null; + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(impl); + out.writeByte(types); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public boolean apply(UUID uuid, - Collection<CacheContinuousQueryEntry<K, V>> entries) { -// for (CacheContinuousQueryEntry entry : entries) { -// try { -// EventType evtType = (((GridCacheContinuousQueryEntry)entry).eventType()); -// -// switch (evtType) { -// case EXPIRED: { -// assert expireLsnr != null; -// -// CacheEntryEvent evt0 = -// new CacheEntryEvent(cache, EXPIRED, entry); -// -// expireLsnr.onExpired(Collections.singleton(evt0)); -// -// break; -// } -// -// case REMOVED: { -// assert rmvLsnr != null; -// -// CacheEntryEvent evt0 = -// new CacheEntryEvent(cache, REMOVED, entry); -// -// rmvLsnr.onRemoved(Collections.singleton(evt0)); -// -// break; -// } -// -// case UPDATED: { -// assert updateLsnr != null; -// -// CacheEntryEvent evt0 = -// new CacheEntryEvent(cache, UPDATED, entry); -// -// updateLsnr.onUpdated(Collections.singleton(evt0)); -// -// break; -// } -// -// case CREATED: { -// assert createLsnr != null; -// -// CacheEntryEvent evt0 = -// new CacheEntryEvent(cache, CREATED, entry); -// -// createLsnr.onCreated(Collections.singleton(evt0)); -// -// break; -// } -// -// default: -// assert false : evtType; -// } -// } -// catch (CacheEntryListenerException e) { -// LT.warn(log, e, "Cache entry listener error: " + e); -// } -// } - - return true; + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + impl = (CacheEntryEventFilter<K, V>)in.readObject(); + types = in.readByte(); + } + + /** + * @param evtType Type. + * @return Flag value. + */ + private byte flag(EventType evtType) { + switch (evtType) { + case CREATED: + return CREATED_FLAG; + + case UPDATED: + return UPDATED_FLAG; + + case REMOVED: + return REMOVED_FLAG; + + case EXPIRED: + return EXPIRED_FLAG; + + default: + throw new IllegalStateException("Unknown type: " + evtType); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 10f5d36..faa8012 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.service; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; @@ -41,8 +40,6 @@ import org.apache.ignite.thread.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; -import javax.cache.*; -import javax.cache.event.CacheEntryEvent; import javax.cache.event.*; import java.util.*; import java.util.concurrent.*; @@ -89,11 +86,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** Topology listener. */ private GridLocalEventListener topLsnr = new TopologyListener(); - /** Deployment listener. */ - private QueryCursor<Cache.Entry<Object, Object>> cfgQryCur; + /** Deployment listener ID. */ + private UUID cfgQryId; - /** Assignment listener. */ - private QueryCursor<Cache.Entry<Object, Object>> assignQryCur; + /** Assignment listener ID. */ + private UUID assignQryId; /** * @param ctx Kernal context. @@ -122,7 +119,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (ctx.isDaemon()) return; - cache = (GridCacheProjectionEx<Object, Object>)ctx.cache().utilityCache(); + cache = ctx.cache().utilityCache(); ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY); @@ -130,19 +127,9 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (ctx.deploy().enabled()) ctx.cache().context().deploy().ignoreOwnership(true); - IgniteCache<Object, Object> jCache = ctx.cache().utilityJCache(); - - ContinuousQuery<Object, Object> cfgQry = Query.continuous(); - - cfgQry.setLocalListener(new DeploymentListener()); - - cfgQryCur = jCache.localQuery(cfgQry); - - ContinuousQuery<Object, Object> assignQry = Query.continuous(); - - assignQry.setLocalListener(new AssignmentListener()); - - assignQryCur = jCache.localQuery(assignQry); + cfgQryId = cache.context().continuousQueries().executeInternalQuery(new DeploymentListener(), null, true); + assignQryId = cache.context().continuousQueries().executeInternalQuery( + new AssignmentListener(), null, true); } finally { if (ctx.deploy().enabled()) @@ -175,11 +162,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { ctx.event().removeLocalEventListener(topLsnr); - if (cfgQryCur != null) - cfgQryCur.close(); + if (cfgQryId != null) + cache.context().continuousQueries().cancelInternalQuery(cfgQryId); - if (assignQryCur != null) - assignQryCur.close(); + if (assignQryId != null) + cache.context().continuousQueries().cancelInternalQuery(assignQryId); Collection<ServiceContextImpl> ctxs = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3520bfde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java index da7f47b..6b2d302 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.jobtracker; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; import org.apache.ignite.events.*; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.*; @@ -38,7 +37,6 @@ import org.jdk8.backport.*; import org.jetbrains.annotations.*; import javax.cache.event.*; -import javax.cache.event.CacheEntryEvent; import javax.cache.expiry.*; import javax.cache.processor.*; import java.io.*; @@ -84,6 +82,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** Component busy lock. */ private GridSpinReadWriteLock busyLock; + /** Job meta query ID. */ + private UUID jobMetaQryId; + /** Closure to check result of async transform of system cache. */ private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> gridFut) { @@ -138,8 +139,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent { ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy( new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl())); - finishedJobMetaPrj = ((GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>)prj). - withExpiryPolicy(finishedJobPlc); + finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc); } else finishedJobMetaPrj = jobMetaPrj; @@ -172,30 +172,29 @@ public class GridHadoopJobTracker extends GridHadoopComponent { @Override public void onKernalStart() throws IgniteCheckedException { super.onKernalStart(); - ContinuousQuery<GridHadoopJobId, GridHadoopJobMetadata> qry = Query.continuous(); + jobMetaQryId = jobMetaCache().context().continuousQueries().executeInternalQuery( + new CacheEntryUpdatedListener<GridHadoopJobId, GridHadoopJobMetadata>() { + @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId, + ? extends GridHadoopJobMetadata>> evts) { + if (!busyLock.tryReadLock()) + return; - qry.setLocalListener(new CacheEntryUpdatedListener<GridHadoopJobId, GridHadoopJobMetadata>() { - @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId, - ? extends GridHadoopJobMetadata>> evts) { - if (!busyLock.tryReadLock()) - return; - - try { - // Must process query callback in a separate thread to avoid deadlocks. - evtProcSvc.submit(new EventHandler() { - @Override protected void body() throws IgniteCheckedException { - processJobMetadataUpdates(evts); - } - }); - } - finally { - busyLock.readUnlock(); + try { + // Must process query callback in a separate thread to avoid deadlocks. + evtProcSvc.submit(new EventHandler() { + @Override protected void body() throws IgniteCheckedException { + processJobMetadataUpdates(evts); + } + }); + } + finally { + busyLock.readUnlock(); + } } - } - }); - - // TODO: Filter by type - ctx.kernalContext().cache().utilityJCache().localQuery(qry); + }, + null, + true + ); ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { @Override public void onEvent(final Event evt) { @@ -228,6 +227,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent { // Fail all pending futures. for (GridFutureAdapter<GridHadoopJobId> fut : activeFinishFuts.values()) fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping).")); + + jobMetaCache().context().continuousQueries().cancelInternalQuery(jobMetaQryId); } /**