IGNITE-2004 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a9c81590 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a9c81590 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a9c81590 Branch: refs/heads/ignite-2004 Commit: a9c81590dcde6be6c7a7f25c0e3d5edef08d0252 Parents: 3fad0ab Author: nikolay_tikhonov <[email protected]> Authored: Fri Apr 8 20:37:59 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Fri Apr 8 20:37:59 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 5 +- .../processors/cache/GridCacheMapEntry.java | 10 +- .../cache/GridCacheUpdateAtomicResult.java | 15 +- .../dht/atomic/GridDhtAtomicCache.java | 29 +-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 23 +- .../distributed/near/GridNearAtomicCache.java | 1 + .../continuous/CacheContinuousQueryClosure.java | 33 --- .../continuous/CacheContinuousQueryHandler.java | 218 +++++++------------ .../CacheContinuousQueryListener.java | 9 +- .../continuous/CacheContinuousQueryManager.java | 29 +-- 10 files changed, 120 insertions(+), 252 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 3a7b5ec..8270c21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -24,6 +24,7 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.eviction.EvictableEntry; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; @@ -454,6 +455,7 @@ public interface GridCacheEntryEx { * @param subjId Subject ID initiated this update. * @param taskName Task name. * @param updateCntr Update counter. + * @param fut Dht atomic future. * @return Tuple where first value is flag showing whether operation succeeded, * second value is old entry value if return value is requested, third is updated entry value, * fourth is the version to enqueue for deferred delete the fifth is DR conflict context @@ -489,7 +491,8 @@ public interface GridCacheEntryEx { @Nullable UUID subjId, String taskName, @Nullable CacheObject prevVal, - @Nullable Long updateCntr + @Nullable Long updateCntr, + @Nullable IgniteInternalFuture fut ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 061da27..921be85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -35,6 +35,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.eviction.EvictableEntry; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; @@ -47,7 +48,6 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtr import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -1246,6 +1246,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme true, false, updateCntr0, + null, topVer); } @@ -1444,6 +1445,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme true, false, updateCntr0, + null, topVer); } @@ -1821,6 +1823,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme true, false, updateCntr, + null, AffinityTopologyVersion.NONE); } @@ -1870,7 +1873,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable UUID subjId, String taskName, @Nullable CacheObject prevVal, - @Nullable Long updateCntr + @Nullable Long updateCntr, + @Nullable IgniteInternalFuture fut ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic(); @@ -1898,8 +1902,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Long updateCntr0 = null; - List<CacheContinuousQueryClosure> clsrs = null; - synchronized (this) { boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index a96675b..10d37d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache; import java.util.List; import javax.cache.processor.EntryProcessor; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -65,9 +64,6 @@ public class GridCacheUpdateAtomicResult { /** Value computed by entry processor. */ private IgniteBiTuple<Object, Exception> res; - /** Continuous query closures. */ - private List<CacheContinuousQueryClosure> cntQryClsrs; - /** * Constructor. * @@ -91,8 +87,7 @@ public class GridCacheUpdateAtomicResult { @Nullable GridCacheVersion rmvVer, @Nullable GridCacheVersionConflictContext<?, ?> conflictRes, boolean sndToDht, - long updateCntr, - List<CacheContinuousQueryClosure> cntQryClsrs + long updateCntr ) { this.success = success; this.oldVal = oldVal; @@ -104,7 +99,6 @@ public class GridCacheUpdateAtomicResult { this.conflictRes = conflictRes; this.sndToDht = sndToDht; this.updateCntr = updateCntr; - this.cntQryClsrs = cntQryClsrs; } /** @@ -178,13 +172,6 @@ public class GridCacheUpdateAtomicResult { return sndToDht; } - /** - * @return Continuous query closures. - */ - public List<CacheContinuousQueryClosure> continuousQueryClosures() { - return cntQryClsrs; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheUpdateAtomicResult.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 4713729..9768243 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -76,7 +76,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -2171,7 +2170,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), taskName, null, - null); + null, + dhtFut); if (dhtFut == null && !F.isEmpty(filteredReaders)) { dhtFut = createDhtFuture(ver, req, res, completionCb, true); @@ -2199,8 +2199,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { newConflictVer, sndPrevVal, updRes.oldValue(), - updRes.updateCounter(), - updRes.continuousQueryClosures()); + updRes.updateCounter()); } if (!F.isEmpty(filteredReaders)) @@ -2217,10 +2216,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']'); } } - else if (lsnrs != null && updRes.continuousQueryClosures() != null) { - for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures()) - clsr.onEntryUpdate(); - } if (hasNear) { if (primary && updRes.sendToDht()) { @@ -2465,7 +2460,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), taskName, null, - null); + null, + dhtFut); assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null : "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry; @@ -2508,8 +2504,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, sndPrevVal, updRes.oldValue(), - updRes.updateCounter(), - updRes.continuousQueryClosures()); + updRes.updateCounter()); } if (!F.isEmpty(filteredReaders)) @@ -2520,10 +2515,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); } - else if (lsnrs != null && updRes.continuousQueryClosures() != null) { - for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures()) - clsr.onEntryUpdate(); - } if (hasNear) { if (primary) { @@ -2960,16 +2951,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), taskName, prevVal, - updateIdx); + updateIdx, + null); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); - if (lsnrs != null && updRes.continuousQueryClosures() != null) { - for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures()) - clsr.onEntryUpdate(); - } - entry.onUnlock(); break; // While. http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 32d10c1..73a2ede 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -104,9 +103,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Response count. */ private volatile int resCnt; - /** Continuous query closures. */ - private List<CacheContinuousQueryClosure> contQryClsrs; - /** * @param cctx Cache context. * @param completionCb Callback to invoke when future is completed. @@ -221,7 +217,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> * @param addPrevVal If {@code true} sends previous value to backups. * @param prevVal Previous value. * @param updateCntr Partition update counter. - * @param clsrs */ public void addWriteEntry(GridDhtCacheEntry entry, @Nullable CacheObject val, @@ -231,7 +226,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> @Nullable GridCacheVersion conflictVer, boolean addPrevVal, @Nullable CacheObject prevVal, - long updateCntr, List<CacheContinuousQueryClosure> clsrs) { + long updateCntr) { AffinityTopologyVersion topVer = updateReq.topologyVersion(); Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); @@ -243,13 +238,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> keys.add(entry.key()); - if (clsrs != null) { - if (contQryClsrs == null) - contQryClsrs = new ArrayList<>(keys.size()); - - contQryClsrs.addAll(clsrs); - } - for (ClusterNode node : dhtNodes) { UUID nodeId = node.id(); @@ -356,15 +344,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> if (err != null) { for (KeyCacheObject key : keys) updateRes.addFailedKey(key, err); - - if (contQryClsrs != null) - for (CacheContinuousQueryClosure clsr : contQryClsrs) - clsr.skipEvent(); - } - else { - if (contQryClsrs != null) - for (CacheContinuousQueryClosure clsr : contQryClsrs) - clsr.onEntryUpdate(); } if (updateReq.writeSynchronizationMode() == FULL_SYNC) http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 5bb9aaa..e0c7187 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -271,6 +271,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { subjId, taskName, null, + null, null); if (updRes.removeVersion() != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java deleted file mode 100644 index 3fd9e57..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -/** - * Continuous query closure. - */ -public interface CacheContinuousQueryClosure extends Runnable { - /** - * Callback for case when future completed successfully. - */ - public void onEntryUpdate(); - - /** - * Callback for case when future completed with error.. - */ - public void skipEvent(); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 56c02d6..3ecac40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -52,6 +52,7 @@ import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; @@ -170,6 +171,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** */ private transient boolean asyncCallback; + /** */ + private transient UUID nodeId; + + /** */ + private transient UUID routineId; + + /** */ + private transient GridKernalContext ctx; + + /** */ + private transient IgniteLogger log; + /** * Required by {@link Externalizable}. */ @@ -317,10 +330,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler rcvs = new ConcurrentHashMap<>(); + this.nodeId = nodeId; + + this.routineId = routineId; + + this.ctx = ctx; + final boolean loc = nodeId.equals(ctx.localNodeId()); assert !skipPrimaryCheck || loc; + log = ctx.log(CacheContinuousQueryHandler.class); + CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { @Override public void onExecution() { if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -346,47 +367,32 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return keepBinary; } - @Override public CacheContinuousQueryClosure onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, + @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt, - boolean fireEvent) { + boolean fireEvent, + IgniteInternalFuture<?> fut) { if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) - return null; + return ; final GridCacheContext<K, V> cctx = cacheContext(ctx); // Check that cache stopped. if (cctx == null) - return null; - - final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); + return; // skipPrimaryCheck is set only when listen locally for replicated cache events. assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); - final ContinuousQueryClosureImpl clsr = new ContinuousQueryClosureImpl(taskName(), - recordIgniteEvt, - routineId, - nodeId, - ctx, - loc, - primary, - cctx, - filter, - evt, - fireEvent, - cache); - - if (!asyncCallback) { - clsr.filter(); - - if (fireEvent) - clsr.onEntryUpdate(); - } - else - ctx.continuousQueryPool().execute(clsr, evt.partitionId()); + if (asyncCallback) { + ContinuousQueryClosureImpl clsr = new ContinuousQueryClosureImpl( + primary, + evt, + recordIgniteEvt, + fut); - return clsr; + ctx.continuousQueryPool().execute(clsr, evt.partitionId()); + } } @Override public void onUnregister() { @@ -432,15 +438,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx); } - @Override public CacheContinuousQueryClosure skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, - AffinityTopologyVersion topVer, boolean primary, boolean fireEvnt) { + @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, + AffinityTopologyVersion topVer, boolean primary, boolean fireEvt) { assert evt != null; CacheContinuousQueryEntry e = evt.entry(); e.markFiltered(); - return onEntryUpdated(evt, primary, false, fireEvnt); + onEntryUpdated(evt, primary, false, fireEvt, null); } @Override public void onPartitionEvicted(int part) { @@ -1249,101 +1255,53 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** * */ - private class ContinuousQueryClosureImpl implements CacheContinuousQueryClosure { - /** */ - private final IgniteCache cache; - - /** */ - private final IgniteLogger log; - - /** */ - private final boolean fireEvent; - + private class ContinuousQueryClosureImpl implements Runnable { /** */ private CacheContinuousQueryEvent<K, V> evt; /** */ - private CacheEntryEventFilter filter; - - /** */ - private final GridCacheContext<K, V> cctx; - - /** */ private boolean primary; /** */ - private boolean loc; - - /** */ - private GridKernalContext ctx; - - /** */ - private UUID nodeId; - - /** */ - private UUID routineId; - - /** */ - private boolean recordIgniteEvt; - - /** */ - private final String taskName; - - /** */ private boolean notify; /** */ - private boolean backup; + private boolean recordIgniteEvt; /** */ - private final CountDownLatch latch = new CountDownLatch(1); + private IgniteInternalFuture<?> fut; /** - * @param taskName Task name. - * @param recordIgniteEvt Fired event. - * @param routineId Routine id. - * @param nodeId Node id. - * @param ctx Kernal context. - * @param loc Local. * @param primary Primary flag. - * @param cctx Cache context. - * @param filter Filter. * @param evt Event. - * @param fireEvent Immediately fire event. - * @param cache Cache. + * @param recordIgniteEvt Fired event. */ - ContinuousQueryClosureImpl(String taskName, - boolean recordIgniteEvt, - UUID routineId, - UUID nodeId, - GridKernalContext ctx, - boolean loc, + ContinuousQueryClosureImpl( boolean primary, - GridCacheContext<K, V> cctx, - CacheEntryEventFilter filter, CacheContinuousQueryEvent<K, V> evt, - boolean fireEvent, IgniteCache cache) { - this.taskName = taskName; - this.recordIgniteEvt = recordIgniteEvt; - this.routineId = routineId; - this.nodeId = nodeId; - this.ctx = ctx; - this.loc = loc; + boolean recordIgniteEvt, + IgniteInternalFuture<?> fut) { this.primary = primary; - this.cctx = cctx; - this.filter = filter; this.evt = evt; - this.cache = cache; - this.fireEvent = fireEvent; - - log = ctx.log(CacheContinuousQueryHandler.class); + this.recordIgniteEvt = recordIgniteEvt; + this.fut = fut; } /** {@inheritDoc} */ @Override public void run() { - filter(); + if (!filter()) + return; - if (fireEvent || waitIfAsync()) + if (fut != null) { + if (waitIfAsync()) + onEntryUpdate0(); + else { + evt.entry().markFiltered(); + + onEntryUpdate0(); + } + } + else onEntryUpdate0(); } @@ -1358,49 +1316,29 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @return {@code False} if filter sync. */ private boolean waitIfAsync() { - if (backup) - return false; - try { - U.await(latch); + fut.get(); } - catch (IgniteInterruptedCheckedException e) { - log.error("Failed to wait latch."); + catch (IgniteCheckedException e) { + return false; } return true; } - /** {@inheritDoc} */ - @Override public void skipEvent() { - if (evt != null && evt.entry() != null) - evt.entry().markFiltered(); - - onEntryUpdate(); - } - - /** {@inheritDoc} */ - @Override public void onEntryUpdate() { - if (backup) - return; - - if (!fireEvent && asyncCallback) { - latch.countDown(); - - return; - } - - onEntryUpdate0(); - } - /** * */ private void onEntryUpdate0() { try { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (cctx == null) + return; + final CacheContinuousQueryEntry entry = evt.entry(); - if (loc) { + if (routineId.equals(nodeId)) { if (!locCache) { T2<Collection<CacheEntryEvent<? extends K, ? extends V>>, PartitionRecovery> events = handleEvent(ctx, entry, asyncCallback); @@ -1454,11 +1392,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler null, null, null, - filter instanceof CacheEntryEventSerializableFilter ? - (CacheEntryEventSerializableFilter)filter : null, + getEventFilter() instanceof CacheEntryEventSerializableFilter ? + (CacheEntryEventSerializableFilter)getEventFilter() : null, null, nodeId, - taskName, + taskName(), evt.getKey(), evt.getValue(), evt.getOldValue(), @@ -1468,16 +1406,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** - * + * @return {@code True} if event happen on primary node otherwise {@code false}. */ - public void filter() { + public boolean filter() { CacheContinuousQueryEntry entry = evt.entry(); notify = !entry.isFiltered(); try { - if (notify && filter != null) - notify = filter.evaluate(evt); + if (notify && getEventFilter() != null) + notify = getEventFilter().evaluate(evt); } catch (Exception e) { U.error(log, "CacheEntryEventFilter failed: " + e); @@ -1496,8 +1434,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - backup = true; + return false; } + + return true; + } + + private String taskName() { + return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 3aefafe..e86ec47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.util.Map; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.jetbrains.annotations.Nullable; /** * Continuous query listener. @@ -37,9 +39,10 @@ public interface CacheContinuousQueryListener<K, V> { * @param primary Primary flag. * @param recordIgniteEvt Whether to record event. * @param fireEvent Immediately fired events. + * @param fut Dht atomic future. */ - public CacheContinuousQueryClosure onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, - boolean recordIgniteEvt, boolean fireEvent); + public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, + boolean recordIgniteEvt, boolean fireEvent, @Nullable IgniteInternalFuture<?> fut); /** * Listener unregistered callback. @@ -71,7 +74,7 @@ public interface CacheContinuousQueryListener<K, V> { * @param topVer Topology version. * @param primary Primary */ - public CacheContinuousQueryClosure skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, + public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary, boolean fireEvnt); /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 12819c9..f6ab8b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -47,6 +47,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cluster.ClusterNode; @@ -172,7 +173,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param fireEvnt * @param topVer Topology version. */ - public List<CacheContinuousQueryClosure> skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, + public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, KeyCacheObject key, int partId, long updCntr, @@ -181,8 +182,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { AffinityTopologyVersion topVer) { assert lsnrs != null; - List<CacheContinuousQueryClosure> clsrs = new ArrayList<>(lsnrs.size()); - for (CacheContinuousQueryListener lsnr : lsnrs.values()) { CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( cctx.cacheId(), @@ -198,10 +197,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - clsrs.add(lsnr.skipUpdateEvent(evt, topVer, primary, fireEvnt)); + lsnr.skipUpdateEvent(evt, topVer, primary, fireEvnt); } - - return clsrs; } /** @@ -234,6 +231,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param primary {@code True} if called on primary node. * @param preload Whether update happened during preloading. * @param updateCntr Update counter. + * @param fut Dht atomic future. * @param topVer Topology version. * @throws IgniteCheckedException In case of error. */ @@ -247,6 +245,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean fireEvnt, boolean preload, long updateCntr, + @Nullable IgniteInternalFuture<?> fut, AffinityTopologyVersion topVer) throws IgniteCheckedException { Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload); @@ -262,6 +261,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { fireEvnt, preload, updateCntr, + fut, topVer); } } @@ -278,9 +278,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param preload Whether update happened during preloading. * @param updateCntr Update counter. * @param topVer Topology version. + * @param fut Dht atomic future. * @throws IgniteCheckedException In case of error. */ - public List<CacheContinuousQueryClosure> onEntryUpdated( + public void onEntryUpdated( Map<UUID, CacheContinuousQueryListener> lsnrCol, KeyCacheObject key, CacheObject newVal, @@ -291,6 +292,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean fireEvnt, boolean preload, long updateCntr, + @Nullable IgniteInternalFuture<?> fut, AffinityTopologyVersion topVer) throws IgniteCheckedException { @@ -301,7 +303,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean hasOldVal = oldVal != null; if (!hasNewVal && !hasOldVal) - return skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, fireEvnt, topVer); + skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, fireEvnt, topVer); EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED; @@ -309,8 +311,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - List<CacheContinuousQueryClosure> clsrs = new ArrayList<>(lsnrCol.size()); - for (CacheContinuousQueryListener lsnr : lsnrCol.values()) { if (preload && !lsnr.notifyExisting()) continue; @@ -343,13 +343,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - CacheContinuousQueryClosure clsr = lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fireEvnt); - - if (clsr != null) - clsrs.add(clsr); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fireEvnt, fut); } - - return clsrs; } /** @@ -402,7 +397,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, true); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, true, null); } } }
