IGNITE-2004 Fixed review notes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b118b685 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b118b685 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b118b685 Branch: refs/heads/ignite-2004 Commit: b118b6850458bd0946a52e4fb4cd0ecde8c56458 Parents: a9c8159 Author: nikolay_tikhonov <[email protected]> Authored: Mon Apr 11 17:36:17 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon Apr 11 17:36:17 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 36 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 24 +- .../distributed/near/GridNearAtomicCache.java | 1 + .../continuous/CacheContinuousQueryHandler.java | 462 +++++++++---------- .../CacheContinuousQueryListener.java | 8 +- .../continuous/CacheContinuousQueryManager.java | 23 +- .../processors/cache/GridCacheTestEntryEx.java | 4 +- ...ryFactoryAsyncFilterRandomOperationTest.java | 40 ++ .../CacheContinuousQueryOrderingEventTest.java | 26 +- ...acheContinuousQueryRandomOperationsTest.java | 13 + 11 files changed, 332 insertions(+), 309 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 8270c21..c7f3a38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -24,10 +24,10 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.eviction.EvictableEntry; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -492,7 +492,7 @@ public interface GridCacheEntryEx { String taskName, @Nullable CacheObject prevVal, @Nullable Long updateCntr, - @Nullable IgniteInternalFuture fut + @Nullable GridDhtAtomicUpdateFuture fut ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 921be85..f64803a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -42,6 +41,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras; @@ -1243,7 +1243,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme internal, partition(), tx.local(), - true, false, updateCntr0, null, @@ -1442,7 +1441,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme internal, partition(), tx.local(), - true, false, updateCntr0, null, @@ -1820,7 +1818,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme internal, partition(), true, - true, false, updateCntr, null, @@ -1874,7 +1871,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme String taskName, @Nullable CacheObject prevVal, @Nullable Long updateCntr, - @Nullable IgniteInternalFuture fut + @Nullable GridDhtAtomicUpdateFuture fut ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic(); @@ -2011,8 +2008,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateCntr0 == null ? 0 : updateCntr0, - null); + updateCntr0 == null ? 0 : updateCntr0); } // Will update something. else { @@ -2096,9 +2092,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme isInternal() || !context().userCache(), partition(), primary, - true, false, updateCntr0, + null, topVer); } @@ -2111,8 +2107,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - 0, - null); + 0); } } else @@ -2189,8 +2184,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateCntr0 == null ? 0 : updateCntr0, - null); + updateCntr0 == null ? 0 : updateCntr0); } } @@ -2238,8 +2232,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateCntr0 == null ? 0 : updateCntr0, - null); + updateCntr0 == null ? 0 : updateCntr0); } } else @@ -2340,8 +2333,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateCntr0 == null ? 0 : updateCntr0, - null); + updateCntr0 == null ? 0 : updateCntr0); else if (interceptorVal != updated0) { updated0 = cctx.unwrapTemporary(interceptorVal); @@ -2424,8 +2416,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateCntr0 == null ? 0 : updateCntr0, - null); + updateCntr0 == null ? 0 : updateCntr0); } if (writeThrough) @@ -2527,8 +2518,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(evtOldVal)); } - clsrs = cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal, - partition(), primary, false, false, updateCntr0, topVer); + cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal, + partition(), primary, false, updateCntr0, fut, topVer); } cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary); @@ -2556,8 +2547,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme enqueueVer, conflictCtx, true, - updateCntr0 == null ? 0 : updateCntr0, - clsrs); + updateCntr0 == null ? 0 : updateCntr0); } /** @@ -3331,9 +3321,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme this.isInternal() || !this.context().userCache(), this.partition(), true, - true, preload, updateCntr, + null, topVer); cctx.dataStructures().onEntryUpdated(key, false, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 73a2ede..b0fc948 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -42,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; @@ -97,6 +97,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Future keys. */ private final Collection<KeyCacheObject> keys; + /** Continuous query closures. */ + private Collection<CI1<Boolean>> cntQryClsrs; + /** */ private final boolean waitForExchange; @@ -336,16 +339,33 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> } } + /** + * @param clsr Continuous query closure. + */ + public void addContinuousQueryClosure(CI1<Boolean> clsr){ + if (cntQryClsrs == null) + cntQryClsrs = new ArrayList<>(10); + + cntQryClsrs.add(clsr); + } + /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { if (super.onDone(res, err)) { cctx.mvcc().removeAtomicFuture(version()); - if (err != null) { + boolean suc = err == null; + + if (!suc) { for (KeyCacheObject key : keys) updateRes.addFailedKey(key, err); } + if (cntQryClsrs != null) { + for (CI1<Boolean> clsr : cntQryClsrs) + clsr.apply(suc); + } + if (updateReq.writeSynchronizationMode() == FULL_SYNC) completionCb.apply(updateReq, updateRes); http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index e0c7187..d8cbe6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -373,6 +373,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { req.subjectId(), taskName, null, + null, null); if (updRes.removeVersion() != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 3ecac40..79d19c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -34,17 +34,13 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.event.EventType; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cluster.ClusterNode; @@ -53,7 +49,6 @@ import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.deployment.GridDeployment; @@ -64,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter; import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; @@ -73,15 +69,13 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformContin import org.apache.ignite.internal.util.GridConcurrentSkipListSet; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedDeque8; @@ -367,11 +361,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return keepBinary; } - @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, + @Override public void onEntryUpdated(final CacheContinuousQueryEvent<K, V> evt, boolean primary, - boolean recordIgniteEvt, - boolean fireEvent, - IgniteInternalFuture<?> fut) { + final boolean recordIgniteEvt, + GridDhtAtomicUpdateFuture fut) { if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) return ; @@ -385,7 +378,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); if (asyncCallback) { - ContinuousQueryClosureImpl clsr = new ContinuousQueryClosureImpl( + ContinuousQueryAsyncClosure clsr = new ContinuousQueryAsyncClosure( primary, evt, recordIgniteEvt, @@ -393,6 +386,23 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler ctx.continuousQueryPool().execute(clsr, evt.partitionId()); } + else { + final boolean notify = filter(evt, primary); + + if (primary || skipPrimaryCheck) { + if (fut == null) + onEntryUpdate(evt, notify, loc, recordIgniteEvt); + else + fut.addContinuousQueryClosure(new CI1<Boolean>() { + @Override public void apply(Boolean suc) { + if (!suc) + evt.entry().markFiltered(); + + onEntryUpdate(evt, notify, loc, recordIgniteEvt); + } + }); + } + } } @Override public void onUnregister() { @@ -439,14 +449,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, - AffinityTopologyVersion topVer, boolean primary, boolean fireEvt) { + AffinityTopologyVersion topVer, boolean primary) { assert evt != null; CacheContinuousQueryEntry e = evt.entry(); e.markFiltered(); - onEntryUpdated(evt, primary, false, fireEvt, null); + onEntryUpdated(evt, primary, false, null); } @Override public void onPartitionEvicted(int part) { @@ -543,85 +553,85 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, final GridKernalContext ctx) { + @Override public void notifyCallback(final UUID nodeId, + final UUID routineId, + Collection<?> objs, + final GridKernalContext ctx) { assert nodeId != null; assert routineId != null; assert objs != null; assert ctx != null; - Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs; + final Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs; + + if (entries.iterator().hasNext()) { + if (asyncCallback) { + int partId = entries.iterator().next().partition(); + ctx.continuousQueryPool().execute(new Runnable() { + @Override public void run() { + notifyCallback0(nodeId, ctx, entries); + } + }, partId); + } + else + notifyCallback0(nodeId, ctx, entries); + } + } + + /** + * @param nodeId Node id. + * @param ctx Kernal context. + * @param entries Entries. + */ + private void notifyCallback0(UUID nodeId, + final GridKernalContext ctx, + Collection<CacheContinuousQueryEntry> entries) { final GridCacheContext cctx = cacheContext(ctx); final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>(); - final List<PartitionRecovery> rcvs = new ArrayList<>(); + for (CacheContinuousQueryEntry e : entries) { + GridCacheDeploymentManager depMgr = cctx.deploy(); - try { - for (CacheContinuousQueryEntry e : entries) { - GridCacheDeploymentManager depMgr = cctx.deploy(); + ClassLoader ldr = depMgr.globalLoader(); - ClassLoader ldr = depMgr.globalLoader(); + if (ctx.config().isPeerClassLoadingEnabled()) { + GridDeploymentInfo depInfo = e.deployInfo(); - if (ctx.config().isPeerClassLoadingEnabled()) { - GridDeploymentInfo depInfo = e.deployInfo(); - - if (depInfo != null) { - depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), - depInfo.participants(), depInfo.localDeploymentOwner()); - } + if (depInfo != null) { + depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), + depInfo.participants(), depInfo.localDeploymentOwner()); } + } - try { - e.unmarshal(cctx, ldr); - - if (!asyncCallback) { - T2<Collection<CacheEntryEvent<? extends K, ? extends V>>, PartitionRecovery> evts = - handleEvent(ctx, e, false); + try { + e.unmarshal(cctx, ldr); - if (evts.get2() != null) - rcvs.add(evts.get2()); + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, e); - entries0.addAll(evts.get1()); - } - } - catch (IgniteCheckedException ex) { - if (ignoreClsNotFound) - assert internal; - else - U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); - } + if (evts != null && !evts.isEmpty()) + entries0.addAll(evts); } - - if (asyncCallback) { - for (final CacheContinuousQueryEntry e : entries) { - ctx.continuousQueryPool().execute(new Runnable() { - @Override public void run() { - T2<Collection<CacheEntryEvent<? extends K, ? extends V>>, PartitionRecovery> evts = - handleEvent(ctx, e, false); - - locLsnr.onUpdated(evts.get1()); - } - }, e.partition()); - } + catch (IgniteCheckedException ex) { + if (ignoreClsNotFound) + assert internal; + else + U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); } - else if (!entries0.isEmpty()) - locLsnr.onUpdated(entries0); - } - finally { - for (PartitionRecovery rec : rcvs) - rec.unlock(); } + + if (!entries0.isEmpty()) + locLsnr.onUpdated(entries0); } /** * @param ctx Context. * @param e entry. - * @param async Async. * @return Entry collection. */ - private T2<Collection<CacheEntryEvent<? extends K, ? extends V>>, PartitionRecovery> - handleEvent(GridKernalContext ctx, CacheContinuousQueryEntry e, boolean async) { + private Collection<CacheEntryEvent<? extends K, ? extends V>> handleEvent(GridKernalContext ctx, + CacheContinuousQueryEntry e) { assert e != null; GridCacheContext<K, V> cctx = cacheContext(ctx); @@ -630,22 +640,137 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (internal) { if (e.isFiltered()) - return new T2(Collections.emptyList(), null); + return Collections.emptyList(); else - return new T2(F.<CacheEntryEvent<? extends K, ? extends V>>asList( - new CacheContinuousQueryEvent<K, V>(cache, cctx, e)), null); + return F.<CacheEntryEvent<? extends K, ? extends V>>asList( + new CacheContinuousQueryEvent<K, V>(cache, cctx, e)); } // Initial query entry or evicted entry. These events should be fired immediately. if (e.updateCounter() == -1L) { - return !e.isFiltered() ? new T2(F.<CacheEntryEvent<? extends K, ? extends V>>asList( - new CacheContinuousQueryEvent<K, V>(cache, cctx, e)), null) : - new T2(Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(), null); + return !e.isFiltered() ? F.<CacheEntryEvent<? extends K, ? extends V>>asList( + new CacheContinuousQueryEvent<K, V>(cache, cctx, e)) : + Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); } PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition()); - return new T2<>(rec.<K, V>collectEntries(e, cctx, async, cache), rec); + return rec.collectEntries(e, cctx, cache); + } + + /** + * @param primary Primary. + * @param evt Query event. + * @return {@code True} if event passed filter otherwise {@code true}. + */ + public boolean filter(CacheContinuousQueryEvent evt, boolean primary) { + CacheContinuousQueryEntry entry = evt.entry(); + + boolean notify = !entry.isFiltered(); + + try { + if (notify && getEventFilter() != null) + notify = getEventFilter().evaluate(evt); + } + catch (Exception e) { + U.error(log, "CacheEntryEventFilter failed: " + e); + } + + if (!notify) + entry.markFiltered(); + + if (!primary) { + if (!internal) { + // Skip init query and expire entries. + if (entry.updateCounter() != -1L) { + entry.markBackup(); + + backupQueue.add(entry); + } + } + } + + return notify; + } + + /** + * @param evt Continuous query event. + * @param notify Notify flag. + * @param loc Listener deployed on this node. + * @param recordIgniteEvt Record ignite event. + */ + private void onEntryUpdate(CacheContinuousQueryEvent evt, boolean notify, boolean loc, boolean recordIgniteEvt) { + try { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (cctx == null) + return; + + final CacheContinuousQueryEntry entry = evt.entry(); + + if (loc) { + if (!locCache) { + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry); + + if (!evts.isEmpty()) { + locLsnr.onUpdated(evts); + + if (!internal && !skipPrimaryCheck) + sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); + } + } + else { + if (!entry.isFiltered()) + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); + } + } + else { + if (!entry.isFiltered()) + prepareEntry(cctx, nodeId, entry); + + CacheContinuousQueryEntry e = handleEntry(entry); + + if (e != null) + ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); + } + } + catch (ClusterTopologyCheckedException ex) { + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + } + + if (recordIgniteEvt && notify) { + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.CONTINUOUS.name(), + cacheName, + null, + null, + null, + getEventFilter() instanceof CacheEntryEventSerializableFilter ? + (CacheEntryEventSerializableFilter)getEventFilter() : null, + null, + nodeId, + taskName(), + evt.getKey(), + evt.getValue(), + evt.getOldValue(), + null + )); + } + } + + /** + * @return Task name. + */ + private String taskName() { + return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; } /** @@ -748,9 +873,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** */ private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); - /** */ - private Lock lock = new ReentrantLock(); - /** * @param log Logger. * @param topVer Topology version. @@ -778,23 +900,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler */ <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(CacheContinuousQueryEntry entry, GridCacheContext cctx, - boolean async, IgniteCache cache) { - if (!async) - lock.lock(); + assert entry != null; - try { - assert entry != null; + if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. + assert entry.updateCounter() == 0L : entry; - if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. - assert entry.updateCounter() == 0L : entry; - - return F.<CacheEntryEvent<? extends K, ? extends V>> - asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); - } + return F.<CacheEntryEvent<? extends K, ? extends V>> + asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); + } - List<CacheEntryEvent<? extends K, ? extends V>> entries; + List<CacheEntryEvent<? extends K, ? extends V>> entries; + synchronized (pendingEvts) { // Received first event. if (curTop == AffinityTopologyVersion.NONE) { lastFiredEvt = entry.updateCounter(); @@ -886,22 +1004,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler break; } } - - return entries; } - catch (Exception e) { - if (!async) - lock.unlock(); - - throw new IgniteException("Failed to collect entries."); - } - } - /** - * Unlock. - */ - public void unlock() { - lock.unlock(); + return entries; } } @@ -1255,7 +1360,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** * */ - private class ContinuousQueryClosureImpl implements Runnable { + private class ContinuousQueryAsyncClosure implements Runnable { /** */ private CacheContinuousQueryEvent<K, V> evt; @@ -1263,9 +1368,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private boolean primary; /** */ - private boolean notify; - - /** */ private boolean recordIgniteEvt; /** */ @@ -1275,8 +1377,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param primary Primary flag. * @param evt Event. * @param recordIgniteEvt Fired event. + * @param fut Dht future. */ - ContinuousQueryClosureImpl( + ContinuousQueryAsyncClosure( boolean primary, CacheContinuousQueryEvent<K, V> evt, boolean recordIgniteEvt, @@ -1289,20 +1392,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** {@inheritDoc} */ @Override public void run() { - if (!filter()) - return; + boolean notify = filter(evt, primary); - if (fut != null) { - if (waitIfAsync()) - onEntryUpdate0(); - else { - evt.entry().markFiltered(); + if (primary()) { + if (fut != null) { + if (waitFuture()) + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + else { + evt.entry().markFiltered(); - onEntryUpdate0(); + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + } } + else + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); } - else - onEntryUpdate0(); } /** @@ -1313,9 +1417,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** - * @return {@code False} if filter sync. + * @return {@code False} if future completed with error otherwise {@code true}. */ - private boolean waitIfAsync() { + private boolean waitFuture() { try { fut.get(); } @@ -1325,124 +1429,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return true; } - - /** - * - */ - private void onEntryUpdate0() { - try { - GridCacheContext<K, V> cctx = cacheContext(ctx); - - if (cctx == null) - return; - - final CacheContinuousQueryEntry entry = evt.entry(); - - if (routineId.equals(nodeId)) { - if (!locCache) { - T2<Collection<CacheEntryEvent<? extends K, ? extends V>>, PartitionRecovery> events = - handleEvent(ctx, entry, asyncCallback); - - try { - Collection<CacheEntryEvent<? extends K, ? extends V>> evts = events.get1(); - - if (!evts.isEmpty()) { - locLsnr.onUpdated(evts); - - if (!internal && !skipPrimaryCheck) - sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); - } - } - finally { - if (events.get2() != null) - events.get2().unlock(); - } - } - else { - if (!entry.isFiltered()) - locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); - } - } - else { - if (!entry.isFiltered()) - prepareEntry(cctx, nodeId, entry); - - CacheContinuousQueryEntry e = handleEntry(entry); - - if (e != null) - ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); - } - } - catch (ClusterTopologyCheckedException ex) { - if (log.isDebugEnabled()) - log.debug("Failed to send event notification to node, node left cluster " + - "[node=" + nodeId + ", err=" + ex + ']'); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); - } - - if (recordIgniteEvt && notify) { - ctx.event().record(new CacheQueryReadEvent<>( - ctx.discovery().localNode(), - "Continuous query executed.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.CONTINUOUS.name(), - cacheName, - null, - null, - null, - getEventFilter() instanceof CacheEntryEventSerializableFilter ? - (CacheEntryEventSerializableFilter)getEventFilter() : null, - null, - nodeId, - taskName(), - evt.getKey(), - evt.getValue(), - evt.getOldValue(), - null - )); - } - } - - /** - * @return {@code True} if event happen on primary node otherwise {@code false}. - */ - public boolean filter() { - CacheContinuousQueryEntry entry = evt.entry(); - - notify = !entry.isFiltered(); - - try { - if (notify && getEventFilter() != null) - notify = getEventFilter().evaluate(evt); - } - catch (Exception e) { - U.error(log, "CacheEntryEventFilter failed: " + e); - } - - if (!notify) - entry.markFiltered(); - - if (!primary()) { - if (!internal) { - // Skip init query and expire entries. - if (entry.updateCounter() != -1L) { - entry.markBackup(); - - backupQueue.add(entry); - } - } - - return false; - } - - return true; - } - - private String taskName() { - return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index e86ec47..8eca81c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -19,8 +19,8 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.util.Map; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.jetbrains.annotations.Nullable; /** @@ -38,11 +38,10 @@ public interface CacheContinuousQueryListener<K, V> { * @param evt Event * @param primary Primary flag. * @param recordIgniteEvt Whether to record event. - * @param fireEvent Immediately fired events. * @param fut Dht atomic future. */ public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, - boolean recordIgniteEvt, boolean fireEvent, @Nullable IgniteInternalFuture<?> fut); + boolean recordIgniteEvt, @Nullable GridDhtAtomicUpdateFuture fut); /** * Listener unregistered callback. @@ -74,8 +73,7 @@ public interface CacheContinuousQueryListener<K, V> { * @param topVer Topology version. * @param primary Primary */ - public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, - AffinityTopologyVersion topVer, boolean primary, boolean fireEvnt); + public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary); /** * @param part Partition. http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index f6ab8b5..3a5e891 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -25,7 +25,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; @@ -47,8 +46,6 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; @@ -57,11 +54,13 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.security.SecurityPermission; @@ -170,7 +169,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param partId Partition id. * @param updCntr Updated counter. * @param primary Primary. - * @param fireEvnt * @param topVer Topology version. */ public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, @@ -178,7 +176,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { int partId, long updCntr, boolean primary, - boolean fireEvnt, AffinityTopologyVersion topVer) { assert lsnrs != null; @@ -197,7 +194,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - lsnr.skipUpdateEvent(evt, topVer, primary, fireEvnt); + lsnr.skipUpdateEvent(evt, topVer, primary); } } @@ -242,10 +239,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean internal, int partId, boolean primary, - boolean fireEvnt, boolean preload, long updateCntr, - @Nullable IgniteInternalFuture<?> fut, + @Nullable GridDhtAtomicUpdateFuture fut, AffinityTopologyVersion topVer) throws IgniteCheckedException { Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload); @@ -258,7 +254,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { internal, partId, primary, - fireEvnt, preload, updateCntr, fut, @@ -274,7 +269,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param internal Internal entry (internal key or not user cache), * @param partId Partition. * @param primary {@code True} if called on primary node. - * @param fireEvnt Fired event immediately. * @param preload Whether update happened during preloading. * @param updateCntr Update counter. * @param topVer Topology version. @@ -289,10 +283,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean internal, int partId, boolean primary, - boolean fireEvnt, boolean preload, long updateCntr, - @Nullable IgniteInternalFuture<?> fut, + @Nullable GridDhtAtomicUpdateFuture fut, AffinityTopologyVersion topVer) throws IgniteCheckedException { @@ -303,7 +296,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean hasOldVal = oldVal != null; if (!hasNewVal && !hasOldVal) - skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, fireEvnt, topVer); + skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, topVer); EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED; @@ -343,7 +336,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fireEvnt, fut); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut); } } @@ -397,7 +390,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, true, null); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, null); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 82b3f4b..3127dcb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -26,6 +26,7 @@ import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.eviction.EvictableEntry; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -537,7 +538,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr UUID subjId, String taskName, @Nullable CacheObject prevVal, - @Nullable Long updateCntr) throws IgniteCheckedException, + @Nullable Long updateCntr, + @Nullable GridDhtAtomicUpdateFuture fut) throws IgniteCheckedException, GridCacheEntryRemovedException { assert false; http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java index 5d7afdc..a556abd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryListenerException; @@ -90,4 +91,43 @@ public class CacheContinuousQueryFactoryAsyncFilterRandomOperationTest return new NonSerializableAsyncFilter(); } } + + /** {@inheritDoc} */ + @Override protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>> noOpFilterFactory() { + return FactoryBuilder.factoryOf(NoopAsyncFilter.class); + } + + /** + * + */ + @IgniteAsyncCallback + protected static class NoopAsyncFilter implements + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable { + /** */ + public NoopAsyncFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) + throws CacheEntryListenerException { + assertTrue("Failed. Current thread name: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("contQry-")); + + assertFalse("Failed. Current thread name: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + + return true; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java index e728b91..8a7eb86 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java @@ -62,7 +62,7 @@ import org.apache.ignite.transactions.Transaction; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; /** * @@ -163,16 +163,6 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes /** * @throws Exception If failed. */ - public void testAtomicReplicated() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC, - CacheMemoryMode.ONHEAP_TIERED); - - doOrderingTest(ccfg, false); - } - - /** - * @throws Exception If failed. - */ public void testAtomicReplicatedOffheap() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC, CacheMemoryMode.OFFHEAP_TIERED); @@ -183,16 +173,6 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes /** * @throws Exception If failed. */ - public void testAtomicOnheapWithoutBackup() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.ATOMIC, - CacheMemoryMode.ONHEAP_TIERED); - - doOrderingTest(ccfg, false); - } - - /** - * @throws Exception If failed. - */ public void testTxOnheapTwoBackup() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.ONHEAP_TIERED); @@ -203,7 +183,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes /** * @throws Exception If failed. */ - public void testTxOnheap() throws Exception { + public void testTxOnheapWithoutBackup() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.ONHEAP_TIERED); @@ -585,7 +565,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes ccfg.setAtomicityMode(atomicityMode); ccfg.setCacheMode(cacheMode); ccfg.setMemoryMode(memoryMode); - ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setWriteSynchronizationMode(PRIMARY_SYNC); ccfg.setAtomicWriteOrderMode(PRIMARY); if (cacheMode == PARTITIONED) http://git-wip-us.apache.org/repos/asf/ignite/blob/b118b685/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java index e9fbf70..e5d007d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java @@ -598,6 +598,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts = new CopyOnWriteArrayList<>(); + if (noOpFilterFactory() != null) + qry.setRemoteFilterFactory(noOpFilterFactory()); + qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> events) throws CacheEntryListenerException { @@ -699,6 +702,13 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract } /** + * @return No-op filter factory for batch operations. + */ + protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>> noOpFilterFactory() { + return null; + } + + /** * @param ccfg Cache configuration. * @throws Exception If failed. */ @@ -711,6 +721,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts = new CopyOnWriteArrayList<>(); + if (noOpFilterFactory() != null) + qry.setRemoteFilterFactory(noOpFilterFactory()); + qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> events) throws CacheEntryListenerException {
