IGNITE-2004 Added tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/94e301d8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/94e301d8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/94e301d8 Branch: refs/heads/ignite-2004 Commit: 94e301d8cd06795f255784e4c903230559c754bc Parents: 3f2c2e6 Author: nikolay_tikhonov <[email protected]> Authored: Fri Apr 1 20:15:31 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Fri Apr 1 20:15:31 2016 +0300 ---------------------------------------------------------------------- .../ignite/cache/query/CacheAsyncCallback.java | 36 -- .../ignite/cache/query/ContinuousQuery.java | 9 +- .../processors/cache/GridCacheMapEntry.java | 17 +- .../cache/GridCacheUpdateAtomicResult.java | 29 +- .../dht/atomic/GridDhtAtomicCache.java | 59 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 123 +--- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 35 +- .../continuous/CacheContinuousQueryClosure.java | 33 ++ .../continuous/CacheContinuousQueryEvent.java | 17 - .../continuous/CacheContinuousQueryHandler.java | 464 +++++++-------- .../CacheContinuousQueryListener.java | 16 +- .../continuous/CacheContinuousQueryManager.java | 166 +----- .../apache/ignite/lang/IgniteAsyncCallback.java | 39 ++ ...eContinuousQueryAsyncFilterListenerTest.java | 8 +- ...ryFactoryAsyncFilterRandomOperationTest.java | 5 +- ...usQueryFactoryFilterRandomOperationTest.java | 7 +- .../CacheContinuousQueryOrderingEventTest.java | 558 +++++++++++++++++++ ...ridCacheContinuousQueryAbstractSelfTest.java | 8 + .../IgniteCacheQuerySelfTestSuite4.java | 2 +- 19 files changed, 997 insertions(+), 634 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/cache/query/CacheAsyncCallback.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheAsyncCallback.java deleted file mode 100644 index a67b369..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheAsyncCallback.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.query; - -import javax.cache.configuration.Factory; -import javax.cache.event.CacheEntryEventFilter; -import javax.cache.event.CacheEntryListener; -import org.apache.ignite.configuration.IgniteConfiguration; - -/** - * Marker interface. If {@link CacheEntryEventFilter filter} or {@link CacheEntryListener} - * implementations extend this interface then they will be executing on a separate thread pool. It allows - * to use cache API in a callbacks. - * <p> - * Thread pool which will be used for it can be configured by - * {@link IgniteConfiguration#setContinuousQueryPoolSize(int)} - * - * @see ContinuousQuery#setRemoteFilterFactory(Factory) - */ -public interface CacheAsyncCallback { -} http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index 1b6c16e..cb5b05e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -23,6 +23,7 @@ import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.lang.IgniteAsyncCallback; /** * API for configuring continuous cache queries. @@ -174,8 +175,8 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * synchronization or transactional cache operations), should be executed asynchronously without * blocking the thread that called the callback. Otherwise, you can get deadlocks. * * <p> - * If listener implements {@link CacheAsyncCallback} marker interface then cache operations are allowed. - * see {@link CacheAsyncCallback}. + * If listener implements {@link IgniteAsyncCallback} marker interface then cache operations are allowed. + * see {@link IgniteAsyncCallback}. * * @param locLsnr Local callback. * @return {@code this} for chaining. @@ -231,8 +232,8 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * (e.g., synchronization or transactional cache operations), should be executed asynchronously * without blocking the thread that called the filter. Otherwise, you can get deadlocks. * <p> - * If filter implements {@link CacheAsyncCallback} marker interface then cache operations are allowed. - * see {@link CacheAsyncCallback}. + * If filter implements {@link IgniteAsyncCallback} marker interface then cache operations are allowed. + * see {@link IgniteAsyncCallback}. * * @param rmtFilterFactory Key-value filter factory. * @return {@code this} for chaining. http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 2d58b15..2b81484 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.cache; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; @@ -46,6 +48,8 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtr import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -1237,6 +1241,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme internal, partition(), tx.local(), + true, false, updateCntr0, topVer); @@ -1434,6 +1439,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme internal, partition(), tx.local(), + true, false, updateCntr0, topVer); @@ -1808,6 +1814,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme internal, partition(), true, + true, false, updateCntr, AffinityTopologyVersion.NONE); @@ -1887,7 +1894,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Long updateCntr0 = null; - Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterRes = null; + List<CacheContinuousQueryClosure> clsrs = null; synchronized (this) { boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); @@ -2082,6 +2089,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme isInternal() || !context().userCache(), partition(), primary, + true, false, updateCntr0, topVer); @@ -2512,8 +2520,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(updated0)); } - filterRes = cctx.continuousQueries().filterEntry(lsnrs, key, evtVal, evtOldVal, partition(), false, - updateCntr0, topVer); + clsrs = cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal, + partition(), primary, false, false, updateCntr0, topVer); } cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary); @@ -2542,7 +2550,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme conflictCtx, true, updateCntr0 == null ? 0 : updateCntr0, - filterRes); + clsrs); } /** @@ -3316,6 +3324,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme this.isInternal() || !this.context().userCache(), this.partition(), true, + true, preload, updateCntr, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index 11e7949..cbd9707 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -17,10 +17,9 @@ package org.apache.ignite.internal.processors.cache; -import java.util.Map; +import java.util.List; import javax.cache.processor.EntryProcessor; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -63,12 +62,12 @@ public class GridCacheUpdateAtomicResult { /** */ private final long updateCntr; - /** */ - Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> conQryFltrs; - /** Value computed by entry processor. */ private IgniteBiTuple<Object, Exception> res; + /** Continuous query closures. */ + private List<CacheContinuousQueryClosure> cntQryClsrs; + /** * Constructor. * @@ -82,7 +81,6 @@ public class GridCacheUpdateAtomicResult { * @param conflictRes DR resolution result. * @param sndToDht Whether update should be propagated to DHT node. * @param updateCntr Partition update counter. - * @param conQryFltrs Continuous query */ public GridCacheUpdateAtomicResult(boolean success, @Nullable CacheObject oldVal, @@ -94,7 +92,7 @@ public class GridCacheUpdateAtomicResult { @Nullable GridCacheVersionConflictContext<?, ?> conflictRes, boolean sndToDht, long updateCntr, - Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> conQryFltrs + List<CacheContinuousQueryClosure> cntQryClsrs ) { this.success = success; this.oldVal = oldVal; @@ -106,7 +104,7 @@ public class GridCacheUpdateAtomicResult { this.conflictRes = conflictRes; this.sndToDht = sndToDht; this.updateCntr = updateCntr; - this.conQryFltrs = conQryFltrs; + this.cntQryClsrs = cntQryClsrs; } /** @@ -181,10 +179,17 @@ public class GridCacheUpdateAtomicResult { } /** - * @return Continuous query filter results. + * @param clsrs Closures. + */ + private void continuousQueryClosures(List<CacheContinuousQueryClosure> clsrs) { + this.cntQryClsrs = clsrs; + } + + /** + * @return Continuous query closures. */ - public Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> getFilterResults() { - return conQryFltrs; + public List<CacheContinuousQueryClosure> continuousQueryClosures() { + return cntQryClsrs; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 9e6ba0a..16a0a47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -2077,8 +2078,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (dhtFut != null) { - dhtFut.listeners(lsnrs); - if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult(); @@ -2099,7 +2098,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { sndPrevVal, updRes.oldValue(), updRes.updateCounter(), - updRes.getFilterResults()); + updRes.continuousQueryClosures()); } if (!F.isEmpty(filteredReaders)) @@ -2116,19 +2115,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']'); } } - else if (lsnrs != null && updRes.updateCounter() != 0) { - ctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - updRes.newValue(), - updRes.oldValue(), - internal, - entry.partition(), - primary, - false, - updRes.updateCounter(), - topVer, - updRes.getFilterResults()); + else if (lsnrs != null && updRes.continuousQueryClosures() != null) { + for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures()) + clsr.onEntryUpdate(); } if (hasNear) { @@ -2405,12 +2394,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (dhtFut != null) { - dhtFut.listeners(lsnrs); - EntryProcessor<Object, Object, Object> entryProcessor = entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); - if (!batchRes.readersOnly()) + if (!batchRes.readersOnly()) { dhtFut.addWriteEntry(entry, writeVal, entryProcessor, @@ -2420,7 +2407,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { sndPrevVal, updRes.oldValue(), updRes.updateCounter(), - updRes.getFilterResults()); + updRes.continuousQueryClosures()); + } if (!F.isEmpty(filteredReaders)) dhtFut.addNearWriteEntries(filteredReaders, @@ -2430,18 +2418,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); } - else if (lsnrs != null && updRes.updateCounter() != 0) { - ctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - updRes.newValue(), - updRes.oldValue(), - entry.isInternal() || !context().userCache(), - entry.partition(), - primary, - false, - updRes.updateCounter(), - topVer); + else if (lsnrs != null && updRes.continuousQueryClosures() != null) { + for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures()) + clsr.onEntryUpdate(); } if (hasNear) { @@ -2884,19 +2863,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); - if (lsnrs != null && updRes.updateCounter() != 0) { - ctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - updRes.newValue(), - updRes.oldValue(), - internal, - entry.partition(), - false, - false, - updRes.updateCounter(), - req.topologyVersion(), - updRes.getFilterResults()); + if (lsnrs != null && updRes.continuousQueryClosures() != null) { + for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures()) + clsr.onEntryUpdate(); } entry.onUnlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index e3dfa4d..32d10c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -37,7 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -103,8 +104,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Response count. */ private volatile int resCnt; - /** */ - private Map<UUID, CacheContinuousQueryListener> lsnrs; + /** Continuous query closures. */ + private List<CacheContinuousQueryClosure> contQryClsrs; /** * @param cctx Cache context. @@ -138,13 +139,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest())); } - /** - * @param lsnrs Continuous query listeners. - */ - void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) { - this.lsnrs = lsnrs; - } - /** {@inheritDoc} */ @Override public IgniteUuid futureId() { return futVer.asGridUuid(); @@ -227,6 +221,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> * @param addPrevVal If {@code true} sends previous value to backups. * @param prevVal Previous value. * @param updateCntr Partition update counter. + * @param clsrs */ public void addWriteEntry(GridDhtCacheEntry entry, @Nullable CacheObject val, @@ -236,8 +231,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> @Nullable GridCacheVersion conflictVer, boolean addPrevVal, @Nullable CacheObject prevVal, - long updateCntr, - @Nullable Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterRes) { + long updateCntr, List<CacheContinuousQueryClosure> clsrs) { AffinityTopologyVersion topVer = updateReq.topologyVersion(); Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); @@ -249,6 +243,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> keys.add(entry.key()); + if (clsrs != null) { + if (contQryClsrs == null) + contQryClsrs = new ArrayList<>(keys.size()); + + contQryClsrs.addAll(clsrs); + } + for (ClusterNode node : dhtNodes) { UUID nodeId = node.id(); @@ -282,30 +283,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> addPrevVal, entry.partition(), prevVal, - updateCntr, - filterRes, - lsnrs != null); - } - else if (lsnrs != null && dhtNodes.size() == 1) { - try { - cctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - val, - prevVal, - entry.key().internal() || !cctx.userCache(), - entry.partition(), - true, - false, - updateCntr, - updateReq.topologyVersion(), - filterRes - ); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal=" - + val + ", err=" + e + "]"); - } + updateCntr); } } } @@ -376,72 +354,17 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> cctx.mvcc().removeAtomicFuture(version()); if (err != null) { - if (!mappings.isEmpty() && lsnrs != null) { - Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size()); - - exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { - for (int i = 0; i < req.size(); i++) { - KeyCacheObject key = req.key(i); - - if (!hndKeys.contains(key)) { - updateRes.addFailedKey(key, err); - - cctx.continuousQueries().skipUpdateEvent( - lsnrs, - key, - req.partitionId(i), - req.updateCounter(i), - updateReq.topologyVersion()); - - hndKeys.add(key); - - if (hndKeys.size() == keys.size()) - break exit; - } - } - } - } - else - for (KeyCacheObject key : keys) - updateRes.addFailedKey(key, err); + for (KeyCacheObject key : keys) + updateRes.addFailedKey(key, err); + + if (contQryClsrs != null) + for (CacheContinuousQueryClosure clsr : contQryClsrs) + clsr.skipEvent(); } else { - if (lsnrs != null) { - Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size()); - - exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { - for (int i = 0; i < req.size(); i++) { - KeyCacheObject key = req.key(i); - - if (!hndKeys.contains(key)) { - try { - cctx.continuousQueries().onEntryUpdated( - lsnrs, - key, - req.value(i), - req.localPreviousValue(i), - key.internal() || !cctx.userCache(), - req.partitionId(i), - true, - false, - req.updateCounter(i), - updateReq.topologyVersion(), - req.filterResult(i)); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send continuous query message. [key=" + key + - ", newVal=" + req.value(i) + - ", err=" + e + "]"); - } - - hndKeys.add(key); - - if (hndKeys.size() == keys.size()) - break exit; - } - } - } - } + if (contQryClsrs != null) + for (CacheContinuousQueryClosure clsr : contQryClsrs) + clsr.onEntryUpdate(); } if (updateReq.writeSynchronizationMode() == FULL_SYNC) http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index d0cfebd..9d65fa9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -162,10 +162,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @GridDirectTransient private List<CacheObject> locPrevVals; - /** */ - @GridDirectTransient - private List<Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>>> filterRes; - /** Keep binary flag. */ private boolean keepBinary; @@ -249,8 +245,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @param partId Partition. * @param prevVal Previous value. * @param updateCntr Update counter. - * @param filterRes Filter results. - * @param storeLocPrevVal If {@code true} stores previous value. */ public void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, @@ -261,20 +255,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid boolean addPrevVal, int partId, @Nullable CacheObject prevVal, - @Nullable Long updateCntr, - @Nullable Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterRes, - boolean storeLocPrevVal) { + @Nullable Long updateCntr) { keys.add(key); partIds.add(partId); - if (storeLocPrevVal) { - if (locPrevVals == null) - locPrevVals = new ArrayList<>(); - - locPrevVals.add(prevVal); - } - if (forceTransformBackups) { assert entryProcessor != null; @@ -297,13 +282,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid updateCntrs.add(updateCntr); } - if (filterRes != null) { - if (this.filterRes == null) - this.filterRes = new ArrayList<>(); - - this.filterRes.add(filterRes); - } - // In case there is no conflict, do not create the list. if (conflictVer != null) { if (conflictVers == null) { @@ -504,17 +482,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** - * @param idx Index. - * @return Filter result future. - */ - public Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterResult(int idx) { - if (filterRes != null && idx < filterRes.size()) - return filterRes.get(idx); - - return null; - } - - /** * @param idx Near key index. * @return Key. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java new file mode 100644 index 0000000..f000b93 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +/** + * + */ +public interface CacheContinuousQueryClosure extends Runnable { + /** + * + */ + public void onEntryUpdate(); + + /** + * + */ + public void skipEvent(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index 6e58bb5..2bfd53d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -34,9 +34,6 @@ class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> { /** */ private final GridCacheContext cctx; - /** */ - private IgniteInternalFuture<Boolean> filterFut; - /** Entry. */ @GridToStringExclude private final CacheContinuousQueryEntry e; @@ -54,20 +51,6 @@ class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> { } /** - * @return Filter future. - */ - public IgniteInternalFuture<Boolean> getFilterFuture() { - return filterFut; - } - - /** - * @param filterFut Filter future. - */ - public void setFilterFut(IgniteInternalFuture<Boolean> filterFut) { - this.filterFut = filterFut; - } - - /** * @return Entry. */ CacheContinuousQueryEntry entry() { http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 5cd7b2b..b3d5028 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -34,22 +34,27 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.event.EventType; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; -import org.apache.ignite.cache.query.CacheAsyncCallback; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDeploymentCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.deployment.GridDeployment; @@ -68,11 +73,10 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; import org.apache.ignite.internal.util.GridConcurrentSkipListSet; import org.apache.ignite.internal.util.GridLongList; -import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -165,10 +169,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private transient boolean ignoreClsNotFound; /** */ - private transient boolean asyncLsnr; - - /** */ - private transient boolean asyncFilter; + private transient boolean asyncCallback; /** * Required by {@link Externalizable}. @@ -299,13 +300,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler final CacheEntryEventFilter filter = getEventFilter(); - asyncLsnr = locLsnr instanceof CacheAsyncCallback; + asyncCallback = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class); if (filter != null) { ctx.resource().injectGeneric(filter); - asyncFilter = filter instanceof CacheAsyncCallback - || (filter instanceof JCacheQueryRemoteFilter && ((JCacheQueryRemoteFilter)filter).async()); + if (!asyncCallback) + asyncCallback = U.hasAnnotation(filter, IgniteAsyncCallback.class) + || (filter instanceof JCacheQueryRemoteFilter && ((JCacheQueryRemoteFilter)filter).async()); } entryBufs = new ConcurrentHashMap<>(); @@ -320,9 +322,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert !skipPrimaryCheck || loc; - final boolean asyncLsnr0 = asyncLsnr; - final boolean asyncFilter0 = asyncFilter; - CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { @Override public void onExecution() { if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -348,59 +347,25 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return keepBinary; } - @Override public IgniteInternalFuture<Boolean> filter(final CacheContinuousQueryEvent<K, V> evt) { - final GridCacheContext<K, V> cctx = cacheContext(ctx); - - try { - if (filter == null) - return new GridFinishedFuture<>(true); - - if (asyncFilter0) { - final GridFutureAdapter<Boolean> f = new GridFutureAdapter<>(); - - ctx.continuousQueryPool().execute(new FilterClosure(evt, filter, f, - cctx.logger(CacheContinuousQueryHandler.class)), evt.partitionId()); - - return f; - } - else - return new GridFinishedFuture<>(filter.evaluate(evt)); - } - catch (Exception e) { - U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e); - - return new GridFinishedFuture<>(true); - } - } - - @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, - boolean recordIgniteEvt) { + @Override public CacheContinuousQueryClosure onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, + boolean primary, + boolean recordIgniteEvt, + boolean fireEvent) { if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) - return; + return null; final GridCacheContext<K, V> cctx = cacheContext(ctx); // Check that cache stopped. if (cctx == null) - return; + return null; final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); // skipPrimaryCheck is set only when listen locally for replicated cache events. assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); - IgniteInternalFuture<Boolean> notify; - - if (evt.getFilterFuture() == null) { - if (!evt.entry().isFiltered() && filter != null) - notify = filter(evt); - else - notify = new GridFinishedFuture<>(!evt.entry().isFiltered()); - } - else - notify = evt.getFilterFuture(); - - final ContinuousQueryClosure clsr = new ContinuousQueryClosure(taskName(), + final ContinuousQueryClosureImpl clsr = new ContinuousQueryClosureImpl(taskName(), recordIgniteEvt, routineId, nodeId, @@ -409,14 +374,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler primary, cctx, filter, - notify, evt, + fireEvent, cache); - if (asyncFilter0 || asyncLsnr0) - ctx.continuousQueryPool().execute(clsr, evt.partitionId()); + if (!asyncCallback) { + clsr.filter(); + + if (fireEvent) + clsr.onEntryUpdate(); + } else - clsr.run(); + ctx.continuousQueryPool().execute(clsr, evt.partitionId()); + + return clsr; } @Override public void onUnregister() { @@ -462,15 +433,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx); } - @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, - boolean primary) { + @Override public CacheContinuousQueryClosure skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, + AffinityTopologyVersion topVer, boolean primary, boolean fireEvnt) { assert evt != null; CacheContinuousQueryEntry e = evt.entry(); e.markFiltered(); - onEntryUpdated(evt, primary, false); + return onEntryUpdated(evt, primary, false, fireEvnt); } @Override public void onPartitionEvicted(int part) { @@ -579,76 +550,90 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>(); - for (CacheContinuousQueryEntry e : entries) { - GridCacheDeploymentManager depMgr = cctx.deploy(); + List<PartitionRecovery> recoveries = new ArrayList<>(); + + try { + for (CacheContinuousQueryEntry e : entries) { + GridCacheDeploymentManager depMgr = cctx.deploy(); - ClassLoader ldr = depMgr.globalLoader(); + ClassLoader ldr = depMgr.globalLoader(); - if (ctx.config().isPeerClassLoadingEnabled()) { - GridDeploymentInfo depInfo = e.deployInfo(); + if (ctx.config().isPeerClassLoadingEnabled()) { + GridDeploymentInfo depInfo = e.deployInfo(); - if (depInfo != null) { - depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), - depInfo.participants(), depInfo.localDeploymentOwner()); + if (depInfo != null) { + depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), + depInfo.participants(), depInfo.localDeploymentOwner()); + } } - } - try { - e.unmarshal(cctx, ldr); + try { + e.unmarshal(cctx, ldr); - entries0.addAll(handleEvent(ctx, e)); - } - catch (IgniteCheckedException ex) { - if (ignoreClsNotFound) - assert internal; - else - U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); + T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> evts = handleEvent(ctx, e); + + if (evts.get2() != null) + recoveries.add(evts.get2()); + + entries0.addAll(evts.get1()); + } + catch (IgniteCheckedException ex) { + if (ignoreClsNotFound) + assert internal; + else + U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); + } } - } - final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); + final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); - if (!entries0.isEmpty()) { - if (asyncLsnr) { - Iterable<CacheContinuousQueryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, - new C1<CacheContinuousQueryEntry, CacheContinuousQueryEvent<? extends K, ? extends V>>() { - @Override public CacheContinuousQueryEvent<? extends K, ? extends V> apply( - CacheContinuousQueryEntry e) { - return new CacheContinuousQueryEvent<>(cache, cctx, e); - } - }, - new IgnitePredicate<CacheContinuousQueryEntry>() { - @Override public boolean apply(CacheContinuousQueryEntry entry) { - return !entry.isFiltered(); + if (!entries0.isEmpty()) { + if (asyncCallback) { + Iterable<CacheContinuousQueryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, + new C1<CacheContinuousQueryEntry, CacheContinuousQueryEvent<? extends K, ? extends V>>() { + @Override public CacheContinuousQueryEvent<? extends K, ? extends V> apply( + CacheContinuousQueryEntry e) { + return new CacheContinuousQueryEvent<>(cache, cctx, e); + } + }, + new IgnitePredicate<CacheContinuousQueryEntry>() { + @Override public boolean apply(CacheContinuousQueryEntry entry) { + return !entry.isFiltered(); + } } - } - ); + ); - for (final CacheContinuousQueryEvent<? extends K, ? extends V> e : evts) { - ctx.continuousQueryPool().execute(new Runnable() { - @Override public void run() { - locLsnr.onUpdated(Collections.<CacheEntryEvent<? extends K, ? extends V>>singleton(e)); - } - }, e.partitionId()); + for (final CacheContinuousQueryEvent<? extends K, ? extends V> e : evts) { + ctx.continuousQueryPool().execute(new Runnable() { + @Override public void run() { + locLsnr.onUpdated(Collections.<CacheEntryEvent<? extends K, ? extends V>>singleton(e)); + } + }, e.partitionId()); + } } - } - else { - Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, - new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { - @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { - return new CacheContinuousQueryEvent<>(cache, cctx, e); - } - }, - new IgnitePredicate<CacheContinuousQueryEntry>() { - @Override public boolean apply(CacheContinuousQueryEntry entry) { - return !entry.isFiltered(); + else { + Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, + new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() { + @Override + public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) { + return new CacheContinuousQueryEvent<>(cache, cctx, e); + } + }, + new IgnitePredicate<CacheContinuousQueryEntry>() { + @Override public boolean apply(CacheContinuousQueryEntry entry) { + return !entry.isFiltered(); + } } - } - ); + ); - locLsnr.onUpdated(evts); + locLsnr.onUpdated(evts); + } } } + finally { + for (PartitionRecovery rec : recoveries) + rec.unlock(); + } } /** @@ -656,24 +641,24 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param e entry. * @return Entry collection. */ - private Collection<CacheContinuousQueryEntry> handleEvent(GridKernalContext ctx, + private T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> handleEvent(GridKernalContext ctx, CacheContinuousQueryEntry e) { assert e != null; if (internal) { if (e.isFiltered()) - return Collections.emptyList(); + return new T2(Collections.emptyList(), null); else - return F.asList(e); + return new T2(F.asList(e), null); } // Initial query entry or evicted entry. These events should be fired immediately. if (e.updateCounter() == -1L) - return F.asList(e); + return new T2(F.asList(e), null); PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition()); - return rec.collectEntries(e); + return new T2<>(rec.collectEntries(e), rec); } /** @@ -777,6 +762,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** */ private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); + /** */ + private Lock lock = new ReentrantLock(); + /** * @param log Logger. * @param topVer Topology version. @@ -801,17 +789,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @return Collection entries which will be fired. */ public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) { - assert entry != null; + lock.lock(); - if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. - assert entry.updateCounter() == 0L : entry; + try { + assert entry != null; - return F.asList(entry); - } + if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. + assert entry.updateCounter() == 0L : entry; - List<CacheContinuousQueryEntry> entries; + return F.asList(entry); + } + + List<CacheContinuousQueryEntry> entries; - synchronized (pendingEvts) { // Received first event. if (curTop == AffinityTopologyVersion.NONE) { lastFiredEvt = entry.updateCounter(); @@ -899,9 +889,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler break; } } + + return entries; + } + catch (Exception e) { + lock.unlock(); + + throw new IgniteException("Failed to collect entries."); } + } - return entries; + /** + * Unlock. + */ + public void unlock() { + lock.unlock(); } } @@ -1255,56 +1257,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** * */ - private static class FilterClosure implements Runnable { + private class ContinuousQueryClosureImpl implements CacheContinuousQueryClosure { /** */ - private final CacheContinuousQueryEvent evt; - - /** */ - private final GridFutureAdapter<Boolean> f; + private final IgniteCache cache; /** */ private final IgniteLogger log; /** */ - private final CacheEntryEventFilter filter; - - /** - * @param evt Continuous query event. - * @param filter Filter. - * @param f Future. - * @param log Logger. - */ - public FilterClosure(CacheContinuousQueryEvent evt, - CacheEntryEventFilter filter, - GridFutureAdapter<Boolean> f, - IgniteLogger log) { - this.evt = evt; - this.f = f; - this.log = log; - this.filter = filter; - } - - /** {@inheritDoc} */ - @Override public void run() { - boolean res = true; - - try { - res = filter.evaluate(evt); - } - catch (Exception e) { - U.error(log, "CacheEntryEventFilter failed: " + e); - } - - f.onDone(res); - } - } - - /** - * - */ - private class ContinuousQueryClosure implements Runnable { - /** */ - private final IgniteCache cache; + private final boolean fireEvent; /** */ private CacheContinuousQueryEvent<K, V> evt; @@ -1313,9 +1274,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private CacheEntryEventFilter filter; /** */ - private IgniteInternalFuture<Boolean> notifyFut; - - /** */ private final GridCacheContext<K, V> cctx; /** */ @@ -1339,6 +1297,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** */ private final String taskName; + /** */ + private boolean notify; + + /** */ + private boolean backup; + + /** */ + private final CountDownLatch latch = new CountDownLatch(1); + /** * @param taskName Task name. * @param recordIgniteEvt Fired event. @@ -1349,11 +1316,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param primary Primary flag. * @param cctx Cache context. * @param filter Filter. - * @param notifyFut Notify future. * @param evt Event. + * @param fireEvent Immediately fire event. * @param cache Cache. */ - public ContinuousQueryClosure(String taskName, + ContinuousQueryClosureImpl(String taskName, boolean recordIgniteEvt, UUID routineId, UUID nodeId, @@ -1362,9 +1329,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler boolean primary, GridCacheContext<K, V> cctx, CacheEntryEventFilter filter, - IgniteInternalFuture<Boolean> notifyFut, CacheContinuousQueryEvent<K, V> evt, - IgniteCache cache) { + boolean fireEvent, IgniteCache cache) { this.taskName = taskName; this.recordIgniteEvt = recordIgniteEvt; this.routineId = routineId; @@ -1374,48 +1340,73 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler this.primary = primary; this.cctx = cctx; this.filter = filter; - this.notifyFut = notifyFut; this.evt = evt; this.cache = cache; + this.fireEvent = fireEvent; + + log = ctx.log(CacheContinuousQueryHandler.class); } /** {@inheritDoc} */ @Override public void run() { - boolean notify; + filter(); + + if (fireEvent || waitIfAsync()) + onEntryUpdate(); + } - if (notifyFut == null) { - notify = !evt.entry().isFiltered(); + /** + * @return {@code True} if event fired on this node. + */ + private boolean primary() { + return primary || skipPrimaryCheck; + } - if (notify && filter != null) { - try { - notify = filter.evaluate(evt); - } - catch (Exception e) { - U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed.", e); - } - } + /** + * @return {@code False} if filter sync. + */ + private boolean waitIfAsync() { + if (backup) + return false; + + try { + U.await(latch); + } + catch (IgniteInterruptedCheckedException e) { + log.error("Failed to wait latch."); } - else { - try { - notify = notifyFut.get(); - } - catch (IgniteCheckedException e) { - U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed.", e); - notify = true; - } + return true; + } + + /** {@inheritDoc} */ + @Override public void skipEvent() { + if (evt != null && evt.entry() != null) + evt.entry().markFiltered(); + + onEntryUpdate(); + } + + /** {@inheritDoc} */ + @Override public void onEntryUpdate() { + if (backup) + return; + + if (!fireEvent && asyncCallback) { + latch.countDown(); + + return; } try { final CacheContinuousQueryEntry entry = evt.entry(); - if (!notify) - entry.markFiltered(); + if (loc) { + if (!locCache) { + T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> events = handleEvent(ctx, entry); - if (primary || skipPrimaryCheck) { - if (loc) { - if (!locCache) { - Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx, entry); + try { + Collection<CacheContinuousQueryEntry> entries = events.get1(); if (!entries.isEmpty()) { Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries, @@ -1438,35 +1429,27 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); } } - else { - if (!entry.isFiltered()) - locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); + finally { + if (events.get2() != null) + events.get2().unlock(); } } else { if (!entry.isFiltered()) - prepareEntry(cctx, nodeId, entry); - - CacheContinuousQueryEntry e = handleEntry(entry); - - if (e != null) - ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); } } else { - if (!internal) { - // Skip init query and expire entries. - if (entry.updateCounter() != -1L) { - entry.markBackup(); + if (!entry.isFiltered()) + prepareEntry(cctx, nodeId, entry); - backupQueue.add(entry); - } - } + CacheContinuousQueryEntry e = handleEntry(entry); + + if (e != null) + ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); } } catch (ClusterTopologyCheckedException ex) { - IgniteLogger log = ctx.log(getClass()); - if (log.isDebugEnabled()) log.debug("Failed to send event notification to node, node left cluster " + "[node=" + nodeId + ", err=" + ex + ']'); @@ -1497,6 +1480,39 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler )); } } + + /** + * + */ + public void filter() { + CacheContinuousQueryEntry entry = evt.entry(); + + notify = !entry.isFiltered(); + + try { + if (notify && filter != null) + notify = filter.evaluate(evt); + } + catch (Exception e) { + U.error(log, "CacheEntryEventFilter failed: " + e); + } + + if (!notify) + entry.markFiltered(); + + if (!primary()) { + if (!internal) { + // Skip init query and expire entries. + if (entry.updateCounter() != -1L) { + entry.markBackup(); + + backupQueue.add(entry); + } + } + + backup = true; + } + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 405dbff..bf1d4a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -37,17 +37,10 @@ public interface CacheContinuousQueryListener<K, V> { * @param evt Event * @param primary Primary flag. * @param recordIgniteEvt Whether to record event. + * @param fireEvent Immediately fired events. */ - public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt); - - /** - * Filters event. - * - * @param evt Event. - * @return {@code True} if the evaluation passes, otherwise false. - * The effect of returning true is that listener will be invoked. - */ - public IgniteInternalFuture<Boolean> filter(CacheContinuousQueryEvent<K, V> evt); + public CacheContinuousQueryClosure onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, + boolean recordIgniteEvt, boolean fireEvent); /** * Listener unregistered callback. @@ -79,7 +72,8 @@ public interface CacheContinuousQueryListener<K, V> { * @param topVer Topology version. * @param primary Primary */ - public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary); + public CacheContinuousQueryClosure skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, + AffinityTopologyVersion topVer, boolean primary, boolean fireEvnt); /** * @param part Partition. http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 2b3052f..12819c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -24,8 +24,8 @@ import java.io.ObjectOutput; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; @@ -47,11 +47,10 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.query.CacheAsyncCallback; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -169,32 +168,21 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param key Entry key. * @param partId Partition id. * @param updCntr Updated counter. - * @param topVer Topology version. - */ - public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, - KeyCacheObject key, - int partId, - long updCntr, - AffinityTopologyVersion topVer) { - skipUpdateEvent(lsnrs, key, partId, updCntr, true, topVer); - } - - /** - * @param lsnrs Listeners to notify. - * @param key Entry key. - * @param partId Partition id. - * @param updCntr Updated counter. - * @param topVer Topology version. * @param primary Primary. + * @param fireEvnt + * @param topVer Topology version. */ - public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, + public List<CacheContinuousQueryClosure> skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, KeyCacheObject key, int partId, long updCntr, boolean primary, + boolean fireEvnt, AffinityTopologyVersion topVer) { assert lsnrs != null; + List<CacheContinuousQueryClosure> clsrs = new ArrayList<>(lsnrs.size()); + for (CacheContinuousQueryListener lsnr : lsnrs.values()) { CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( cctx.cacheId(), @@ -210,8 +198,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - lsnr.skipUpdateEvent(evt, topVer, primary); + clsrs.add(lsnr.skipUpdateEvent(evt, topVer, primary, fireEvnt)); } + + return clsrs; } /** @@ -254,6 +244,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean internal, int partId, boolean primary, + boolean fireEvnt, boolean preload, long updateCntr, AffinityTopologyVersion topVer) throws IgniteCheckedException { @@ -268,113 +259,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { internal, partId, primary, + fireEvnt, preload, updateCntr, - topVer, - null); - } - } - - /** - * @param lsnrCol Listeners to notify. - * @param key Key. - * @param newVal New value. - * @param oldVal Old value. - * @param partId Partition. - * @param preload Whether update happened during preloading. - * @param updateCntr Update counter. - * @param topVer Topology version. - * @throws IgniteCheckedException In case of error. - */ - public Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterEntry( - Map<UUID, CacheContinuousQueryListener> lsnrCol, - KeyCacheObject key, - CacheObject newVal, - CacheObject oldVal, - int partId, - boolean preload, - long updateCntr, - AffinityTopologyVersion topVer) - throws IgniteCheckedException - { - assert key != null; - assert lsnrCol != null; - - boolean hasNewVal = newVal != null; - boolean hasOldVal = oldVal != null; - - if (!hasNewVal && !hasOldVal) - return null; - - EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED; - - boolean initialized = false; - - Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> res = new HashMap<>(); - - for (CacheContinuousQueryListener lsnr : lsnrCol.values()) { - if (preload && !lsnr.notifyExisting()) - continue; - - if (!initialized) { - if (lsnr.oldValueRequired()) { - oldVal = (CacheObject)cctx.unwrapTemporary(oldVal); - - if (oldVal != null) - oldVal.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); - } - - if (newVal != null) - newVal.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); - - initialized = true; - } - - CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( - cctx.cacheId(), - evtType, - key, - newVal, - lsnr.oldValueRequired() ? oldVal : null, - lsnr.keepBinary(), - partId, - updateCntr, topVer); - - CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( - cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - - res.put(lsnr, lsnr.filter(evt)); } - - return res; - } - - /** - * @param lsnrCol Listeners to notify. - * @param key Key. - * @param newVal New value. - * @param oldVal Old value. - * @param internal Internal entry (internal key or not user cache), - * @param partId Partition. - * @param primary {@code True} if called on primary node. - * @param preload Whether update happened during preloading. - * @param updateCntr Update counter. - * @param topVer Topology version. - * @throws IgniteCheckedException In case of error. - */ - public void onEntryUpdated( - Map<UUID, CacheContinuousQueryListener> lsnrCol, - KeyCacheObject key, - CacheObject newVal, - CacheObject oldVal, - boolean internal, - int partId, - boolean primary, - boolean preload, - long updateCntr, - AffinityTopologyVersion topVer) throws IgniteCheckedException { - onEntryUpdated(lsnrCol, key, newVal, oldVal, internal, partId, primary, preload, updateCntr, topVer, null); } /** @@ -385,13 +274,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param internal Internal entry (internal key or not user cache), * @param partId Partition. * @param primary {@code True} if called on primary node. + * @param fireEvnt Fired event immediately. * @param preload Whether update happened during preloading. * @param updateCntr Update counter. * @param topVer Topology version. - * @param filterRes Filter results. * @throws IgniteCheckedException In case of error. */ - public void onEntryUpdated( + public List<CacheContinuousQueryClosure> onEntryUpdated( Map<UUID, CacheContinuousQueryListener> lsnrCol, KeyCacheObject key, CacheObject newVal, @@ -399,10 +288,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean internal, int partId, boolean primary, + boolean fireEvnt, boolean preload, long updateCntr, - AffinityTopologyVersion topVer, - Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterRes) + AffinityTopologyVersion topVer) throws IgniteCheckedException { assert key != null; @@ -411,11 +300,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean hasNewVal = newVal != null; boolean hasOldVal = oldVal != null; - if (!hasNewVal && !hasOldVal) { - skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, topVer); - - return; - } + if (!hasNewVal && !hasOldVal) + return skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, fireEvnt, topVer); EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED; @@ -423,6 +309,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + List<CacheContinuousQueryClosure> clsrs = new ArrayList<>(lsnrCol.size()); + for (CacheContinuousQueryListener lsnr : lsnrCol.values()) { if (preload && !lsnr.notifyExisting()) continue; @@ -455,11 +343,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - if (filterRes != null) - evt.setFilterFut(filterRes.get(lsnr)); + CacheContinuousQueryClosure clsr = lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fireEvnt); - lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); + if (clsr != null) + clsrs.add(clsr); } + + return clsrs; } /** @@ -512,7 +402,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, true); } } } @@ -1187,7 +1077,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @return {@code True} if filter should be executed in non-system thread. */ protected boolean async() { - return impl != null && impl instanceof CacheAsyncCallback; + return U.hasAnnotation(impl, IgniteAsyncCallback.class); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java new file mode 100644 index 0000000..d2069c7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.lang; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListener; +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * If {@link CacheEntryEventFilter filter} or {@link CacheEntryListener} + * annotated this annotation then they will be executing on a separate thread pool. It allows + * to use cache API in a callbacks. + * <p> + * Different implementations can use different thread pools. For example continuous query will use continuous query + * thread poll which can be configured by {@link IgniteConfiguration#setContinuousQueryPoolSize(int)} + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface IgniteAsyncCallback { +} http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java index 8eab9d3..7958ac3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java @@ -32,7 +32,7 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; -import org.apache.ignite.cache.query.CacheAsyncCallback; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.configuration.CacheConfiguration; @@ -590,7 +590,8 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr /** * */ - private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter implements CacheAsyncCallback { + @IgniteAsyncCallback + private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter { /** * @param clsr Closure. */ @@ -632,7 +633,8 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr /** * */ - private static class CacheInvokeListenerAsync extends CacheInvokeListener implements CacheAsyncCallback { + @IgniteAsyncCallback + private static class CacheInvokeListenerAsync extends CacheInvokeListener { /** * @param clsr Closure. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java index 6efa3c4..5d7afdc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java @@ -26,7 +26,7 @@ import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryListenerException; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; -import org.apache.ignite.cache.query.CacheAsyncCallback; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.jetbrains.annotations.NotNull; /** @@ -43,8 +43,9 @@ public class CacheContinuousQueryFactoryAsyncFilterRandomOperationTest /** * */ + @IgniteAsyncCallback protected static class NonSerializableAsyncFilter implements - CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, CacheAsyncCallback, Externalizable { + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable { /** */ public NonSerializableAsyncFilter() { // No-op.
