Repository: ignite Updated Branches: refs/heads/master 42161cdd5 -> b2531569d
IGNITE-7346 Enable Ignite cache events per cache Signed-off-by: Anton Vinogradov <a...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b2531569 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b2531569 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b2531569 Branch: refs/heads/master Commit: b2531569d42aae871b09728a6dd7a850f3cbe2b3 Parents: 42161cd Author: Aleksey Plekhanov <plehanov.a...@gmail.com> Authored: Thu Feb 8 19:51:35 2018 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Thu Feb 8 19:51:35 2018 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 28 +++++++ .../processors/cache/CacheGroupContext.java | 48 ++++++------ .../processors/cache/GridCacheEventManager.java | 3 +- .../distributed/dht/GridDhtLocalPartition.java | 2 +- .../cache/query/GridCacheQueryManager.java | 18 ++--- .../continuous/CacheContinuousQueryHandler.java | 4 +- .../continuous/CacheContinuousQueryManager.java | 4 +- .../processors/query/GridQueryProcessor.java | 10 +-- .../cache/IgniteDynamicCacheStartSelfTest.java | 8 ++ .../distributed/GridCacheEventAbstractTest.java | 82 +++++++++++++++----- ...ridCachePartitionedUnloadEventsSelfTest.java | 25 ++++-- ...ridCacheReplicatedEventDisabledSelfTest.java | 75 ++++++++++++++++++ .../GridCacheReplicatedPreloadSelfTest.java | 7 ++ .../testsuites/IgniteCacheTestSuite3.java | 2 + .../query/h2/twostep/GridMapQueryExecutor.java | 9 +-- .../query/h2/twostep/MapQueryResult.java | 24 +++--- .../query/h2/twostep/MapQueryResults.java | 18 ++--- .../cache/IgniteCacheAbstractQuerySelfTest.java | 54 ++++++++++--- ...chePartitionedQueryEvtsDisabledSelfTest.java | 30 +++++++ ...acheReplicatedQueryEvtsDisabledSelfTest.java | 30 +++++++ .../IgniteCacheReplicatedQuerySelfTest.java | 25 ++---- .../IgniteCacheQuerySelfTestSuite.java | 18 +++-- .../ApiParity/CacheConfigurationParityTest.cs | 3 +- 23 files changed, 397 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 3a40824..d41e687 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -181,6 +181,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Default query parallelism. */ public static final int DFLT_QUERY_PARALLELISM = 1; + /** Default value for events disabled flag. */ + public static final boolean DFLT_EVENTS_DISABLED = false; + /** Cache name. */ private String name; @@ -361,6 +364,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Cache key configuration. */ private CacheKeyConfiguration[] keyCfg; + /** Events disabled. */ + private boolean evtsDisabled = DFLT_EVENTS_DISABLED; + /** Empty constructor (all values are initialized to their defaults). */ public CacheConfiguration() { /* No-op. */ @@ -453,6 +459,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { storeConcurrentLoadAllThreshold = cc.getStoreConcurrentLoadAllThreshold(); maxQryIterCnt = cc.getMaxQueryIteratorsCount(); sqlOnheapCache = cc.isSqlOnheapCacheEnabled(); + evtsDisabled = cc.isEventsDisabled(); } /** @@ -2184,6 +2191,27 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** + * Checks whether events are disabled for this cache. + * + * @return Events disabled flag. + */ + public Boolean isEventsDisabled() { + return evtsDisabled; + } + + /** + * Sets events disabled flag. + * + * @param evtsDisabled Events disabled flag. + * @return {@code this} for chaining. + */ + public CacheConfiguration<K, V> setEventsDisabled(boolean evtsDisabled) { + this.evtsDisabled = evtsDisabled; + + return this; + } + + /** * Gets cache key configuration. * * @return Cache key configuration. http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index b980d5c..fd9ea1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -407,7 +407,7 @@ public class CacheGroupContext { for (int i = 0; i < caches.size(); i++) { GridCacheContext cctx = caches.get(i); - if (cctx.recordEvent(type)) { + if (!cctx.config().isEventsDisabled() && cctx.recordEvent(type)) { cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), cctx.localNode(), "Cache rebalancing event.", @@ -434,14 +434,15 @@ public class CacheGroupContext { for (int i = 0; i < caches.size(); i++) { GridCacheContext cctx = caches.get(i); - cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), - cctx.localNode(), - "Cache unloading event.", - EVT_CACHE_REBALANCE_PART_UNLOADED, - part, - null, - 0, - 0)); + if (!cctx.config().isEventsDisabled()) + cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), + cctx.localNode(), + "Cache unloading event.", + EVT_CACHE_REBALANCE_PART_UNLOADED, + part, + null, + 0, + 0)); } } @@ -472,20 +473,21 @@ public class CacheGroupContext { for (int i = 0; i < caches.size(); i++) { GridCacheContext cctx = caches.get(i); - cctx.events().addEvent(part, - key, - evtNodeId, - (IgniteUuid)null, - null, - type, - newVal, - hasNewVal, - oldVal, - hasOldVal, - null, - null, - null, - keepBinary); + if (!cctx.config().isEventsDisabled()) + cctx.events().addEvent(part, + key, + evtNodeId, + (IgniteUuid)null, + null, + type, + newVal, + hasNewVal, + oldVal, + hasOldVal, + null, + null, + null, + keepBinary); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index a9692f8..3c5cf1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -372,7 +372,8 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { public boolean isRecordable(int type) { GridCacheContext cctx0 = cctx; - return cctx0 != null && cctx0.userCache() && cctx0.gridEvents().isRecordable(type); + return cctx0 != null && cctx0.userCache() && cctx0.gridEvents().isRecordable(type) + && !cctx0.config().isEventsDisabled(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index e63aab6..7a982ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -903,7 +903,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { removeEntry(cached); - if (rec) { + if (rec && !hld.cctx.config().isEventsDisabled()) { hld.cctx.events().addEvent(cached.partition(), cached.key(), ctx.localNodeId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index fe8e054..ea19c2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -68,7 +68,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; @@ -77,7 +76,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate; import org.apache.ignite.internal.processors.datastructures.SetItemKey; @@ -577,7 +575,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throw new IllegalStateException("Should never be called."); case SCAN: - if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { cctx.gridEvents().record(new CacheQueryExecutedEvent<>( cctx.localNode(), "Scan query executed.", @@ -598,7 +596,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte break; case TEXT: - if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { cctx.gridEvents().record(new CacheQueryExecutedEvent<>( cctx.localNode(), "Full text query executed.", @@ -672,7 +670,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } if (qry.type() == SQL_FIELDS) { - if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { cctx.gridEvents().record(new CacheQueryExecutedEvent<>( cctx.localNode(), "SQL fields query executed.", @@ -704,7 +702,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte else { assert qry.type() == SPI : "Unexpected query type: " + qry.type(); - if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { cctx.gridEvents().record(new CacheQueryExecutedEvent<>( cctx.localNode(), "SPI query executed.", @@ -912,7 +910,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final boolean statsEnabled = cctx.statisticsEnabled(); - final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + final boolean readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); try { // Preparing query closures. @@ -1154,7 +1152,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final boolean statsEnabled = cctx.statisticsEnabled(); - final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + final boolean readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); CacheObjectContext objCtx = cctx.cacheObjectContext(); @@ -1424,7 +1422,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final ClusterNode locNode = cctx.localNode(); final UUID subjId = qry.subjectId(); - if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { cctx.gridEvents().record(new CacheQueryExecutedEvent<>( locNode, "Scan query executed.", @@ -2957,7 +2955,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte statsEnabled = locNode && cctx.statisticsEnabled(); - readEvt = locNode && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + readEvt = locNode && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); if(readEvt){ taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index f0cd7ca..9ff4623 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -365,7 +365,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { @Override public void onExecution() { - if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (cctx != null && cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { ctx.event().record(new CacheQueryExecutedEvent<>( ctx.discovery().localNode(), "Continuous query executed.", http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 1e131ef..f40c077 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -331,7 +331,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean initialized = false; - boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + boolean recordIgniteEvt = primary && !internal && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); for (CacheContinuousQueryListener lsnr : lsnrCol.values()) { if (preload && !lsnr.notifyExisting()) @@ -398,7 +398,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean primary = cctx.affinity().primaryByPartition(cctx.localNode(), e.partition(), AffinityTopologyVersion.NONE); if (cctx.isReplicated() || primary) { - boolean recordIgniteEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + boolean recordIgniteEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); boolean initialized = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index cbb6b9b..f666cdd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -2037,7 +2037,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { idx.querySqlFields(schemaName, qry, keepBinary, failOnMultipleStmts, cancel); if (cctx != null) - sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx.name()); + sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx); return res; } @@ -2179,7 +2179,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { sendQueryExecutedEvent( qry.getSql(), qry.getArgs(), - cctx.name()); + cctx); if (cctx.config().getQueryParallelism() > 1) { qry.setDistributedJoins(true); @@ -2343,14 +2343,14 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param sqlQry Sql query. * @param params Params. */ - private void sendQueryExecutedEvent(String sqlQry, Object[] params, String cacheName) { - if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + private void sendQueryExecutedEvent(String sqlQry, Object[] params, GridCacheContext<?, ?> cctx) { + if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { ctx.event().record(new CacheQueryExecutedEvent<>( ctx.discovery().localNode(), "SQL query executed.", EVT_CACHE_QUERY_EXECUTED, CacheQueryType.SQL.name(), - cacheName, + cctx.name(), null, sqlQry, null, http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 8f601f8..50c0579 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -744,6 +744,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { cfg.setName(DYNAMIC_CACHE_NAME); cfg.setCacheMode(CacheMode.REPLICATED); + // This cache will not fire any cache events. + CacheConfiguration cfgEvtsDisabled = new CacheConfiguration(cfg); + + cfgEvtsDisabled.setName("DynamicCacheEvtsDisabled"); + cfgEvtsDisabled.setEventsDisabled(true); + final CountDownLatch[] starts = new CountDownLatch[nodeCount()]; final CountDownLatch[] stops = new CountDownLatch[nodeCount()]; @@ -781,6 +787,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ignite(i).events().localListen(lsnrs[i], EventType.EVTS_CACHE_LIFECYCLE); } + IgniteCache<Object, Object> cacheEvtsDisabled = ignite(0).createCache(cfgEvtsDisabled); IgniteCache<Object, Object> cache = ignite(0).createCache(cfg); try { @@ -788,6 +795,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { start.await(); } finally { + cacheEvtsDisabled.destroy(); cache.destroy(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java index 2202339..55190ad 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java @@ -65,6 +65,9 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe /** */ private static volatile int gridCnt; + /** Event listener. */ + protected static volatile EventListener evtLsnr; + /** * @return {@code True} if partitioned. */ @@ -78,8 +81,10 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe gridCnt = gridCount(); + evtLsnr = createEventListener(); + for (int i = 0; i < gridCnt; i++) - grid(i).events().localListen(new TestEventListener(partitioned()), EVTS_CACHE); + grid(i).events().localListen(evtLsnr, EVTS_CACHE); } /** {@inheritDoc} */ @@ -89,7 +94,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe if (TEST_INFO) info("Called beforeTest() callback."); - TestEventListener.reset(); + evtLsnr.reset(); } /** {@inheritDoc} */ @@ -97,17 +102,24 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe if (TEST_INFO) info("Called afterTest() callback."); - TestEventListener.stopListen(); + evtLsnr.stopListen(); try { super.afterTest(); } finally { - TestEventListener.listen(); + evtLsnr.listen(); } } /** + * Create event listener. + */ + protected EventListener createEventListener() { + return new TestEventListener(partitioned()); + } + + /** * Waits for event count on all nodes. * * @param gridIdx Grid index. @@ -117,7 +129,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe private void waitForEvents(int gridIdx, IgniteBiTuple<Integer, Integer>... evtCnts) throws Exception { if (!F.isEmpty(evtCnts)) try { - TestEventListener.waitForEventCount(evtCnts); + evtLsnr.waitForEventCount(evtCnts); } catch (IgniteCheckedException e) { printEventCounters(gridIdx, evtCnts); @@ -136,7 +148,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe for (IgniteBiTuple<Integer, Integer> t : expCnts) { Integer evtType = t.get1(); - int actCnt = TestEventListener.eventCount(evtType); + int actCnt = evtLsnr.eventCount(evtType); info("Event [evtType=" + evtType + ", expCnt=" + t.get2() + ", actCnt=" + actCnt + ']'); } @@ -178,13 +190,13 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe } finally { // This call is mainly required to correctly clear event futures. - TestEventListener.reset(); + evtLsnr.reset(); clearCaches(); // This call is required for the second time to reset counters for // the previous call. - TestEventListener.reset(); + evtLsnr.reset(); } } } @@ -649,20 +661,52 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe } /** + * Event listener interface. + */ + protected static interface EventListener extends IgnitePredicate<Event> { + /** + * Start listen. + */ + void listen(); + + /** + * Stop listen. + */ + void stopListen(); + + /** + * Gets events count by type. + * + * @param type Type. + */ + int eventCount(int type); + + /** + * Reset event counters. + */ + void reset(); + + /** + * @param evtCnts Event counters. + */ + void waitForEventCount(IgniteBiTuple<Integer, Integer>... evtCnts) throws IgniteCheckedException; + } + + /** * Local event listener. */ - private static class TestEventListener implements IgnitePredicate<Event> { + private static class TestEventListener implements EventListener { /** Events count map. */ - private static ConcurrentMap<Integer, AtomicInteger> cntrs = new ConcurrentHashMap<>(); + private ConcurrentMap<Integer, AtomicInteger> cntrs = new ConcurrentHashMap<>(); /** Event futures. */ - private static Collection<EventTypeFuture> futs = new GridConcurrentHashSet<>(); + private Collection<EventTypeFuture> futs = new GridConcurrentHashSet<>(); /** */ - private static volatile boolean listen = true; + private volatile boolean listen = true; /** */ - private static boolean partitioned; + private boolean partitioned; /** * @param p Partitioned flag. @@ -674,14 +718,14 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe /** * */ - private static void listen() { + public void listen() { listen = true; } /** * */ - private static void stopListen() { + public void stopListen() { listen = false; } @@ -689,7 +733,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe * @param type Event type. * @return Count. */ - static int eventCount(int type) { + public int eventCount(int type) { assert type > 0; AtomicInteger cntr = cntrs.get(type); @@ -700,7 +744,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe /** * Reset listener. */ - static void reset() { + public void reset() { cntrs.clear(); futs.clear(); @@ -734,7 +778,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe * @param evtCnts Array of tuples with values: V1 - event type, V2 - expected event count. * @throws IgniteCheckedException If failed to wait. */ - private static void waitForEventCount(IgniteBiTuple<Integer, Integer>... evtCnts) + public void waitForEventCount(IgniteBiTuple<Integer, Integer>... evtCnts) throws IgniteCheckedException { if (F.isEmpty(evtCnts)) return; @@ -837,4 +881,4 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe return S.toString(EventTypeFuture.class, this, "evtName", U.gridEventName(evtType)); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedUnloadEventsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedUnloadEventsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedUnloadEventsSelfTest.java index 8dc2835..d6cea58 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedUnloadEventsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedUnloadEventsSelfTest.java @@ -50,6 +50,9 @@ public class GridCachePartitionedUnloadEventsSelfTest extends GridCommonAbstract /** */ private static final int EVENTS_COUNT = 40; + /** Default cache name with cache events disabled. */ + private static final String DEFAULT_CACHE_NAME_EVTS_DISABLED = DEFAULT_CACHE_NAME + "EvtsDisabled"; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -58,7 +61,14 @@ public class GridCachePartitionedUnloadEventsSelfTest extends GridCommonAbstract disco.setIpFinder(ipFinder); cfg.setDiscoverySpi(disco); - cfg.setCacheConfiguration(cacheConfiguration()); + CacheConfiguration<?, ?> ccfg = cacheConfiguration(); + + CacheConfiguration<?, ?> ccfgEvtsDisabled = new CacheConfiguration<>(ccfg); + + ccfgEvtsDisabled.setName(DEFAULT_CACHE_NAME_EVTS_DISABLED); + ccfgEvtsDisabled.setEventsDisabled(true); + + cfg.setCacheConfiguration(ccfg, ccfgEvtsDisabled); return cfg; } @@ -83,16 +93,21 @@ public class GridCachePartitionedUnloadEventsSelfTest extends GridCommonAbstract Collection<Integer> allKeys = new ArrayList<>(EVENTS_COUNT); - IgniteCache<Integer, String> cache = g1.cache(DEFAULT_CACHE_NAME); + IgniteCache<Integer, String> cache1 = g1.cache(DEFAULT_CACHE_NAME); + IgniteCache<Integer, String> cache2 = g1.cache(DEFAULT_CACHE_NAME_EVTS_DISABLED); for (int i = 0; i < EVENTS_COUNT; i++) { - cache.put(i, "val"); + cache1.put(i, "val"); + + // Events should not be fired by this put. + cache2.put(i, "val"); + allKeys.add(i); } Ignite g2 = startGrid("g2"); - awaitPartitionMapExchange(); + awaitPartitionMapExchange(true, true, null); Map<ClusterNode, Collection<Object>> keysMap = g1.affinity(DEFAULT_CACHE_NAME).mapKeysToNodes(allKeys); Collection<Object> g2Keys = keysMap.get(g2.cluster().localNode()); @@ -156,4 +171,4 @@ public class GridCachePartitionedUnloadEventsSelfTest extends GridCommonAbstract assertEquals(g.cluster().localNode().id(), unloadEvt.node().id()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedEventDisabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedEventDisabledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedEventDisabledSelfTest.java new file mode 100644 index 0000000..84cbbd1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedEventDisabledSelfTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.replicated; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.lang.IgniteBiTuple; + +/** + * Tests events. + */ +public class GridCacheReplicatedEventDisabledSelfTest extends GridCacheReplicatedEventSelfTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception { + return super.cacheConfiguration(igniteInstanceName).setEventsDisabled(true); + } + + /** {@inheritDoc} */ + @Override protected EventListener createEventListener() { + return new TestEventListener(); + } + + /** + * Test event listener. + */ + private static class TestEventListener implements EventListener { + /** {@inheritDoc} */ + @Override public void listen() { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public void stopListen() { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public int eventCount(int type) { + return 0; + } + + /** {@inheritDoc} */ + @Override public void reset() { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public void waitForEventCount(IgniteBiTuple<Integer, Integer>... evtCnts) throws IgniteCheckedException { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public boolean apply(Event evt) { + fail("Cache events are disabled"); + + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java index e90b7e1..f114091 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java @@ -242,6 +242,13 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { GridCacheAdapter<Integer, String> cache1 = ((IgniteKernal)g1).internalCache(DEFAULT_CACHE_NAME); + // Cache rebalancing events should not be fired for this cache. + CacheConfiguration ccfg = cacheConfiguration(((IgniteKernal)g1).getInstanceName()) + .setName(DEFAULT_CACHE_NAME + "_evts_disabled") + .setEventsDisabled(true); + + g1.getOrCreateCache(ccfg); + cache1.getAndPut(1, "val1"); cache1.getAndPut(2, "val2"); http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java index 674b6a2..503f39a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCa import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicApiTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicOpSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicStoreSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEventDisabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEventSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEvictionEventSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedGetAndTransformStoreSelfTest; @@ -109,6 +110,7 @@ public class IgniteCacheTestSuite3 extends TestSuite { suite.addTestSuite(GridCacheReplicatedGetAndTransformStoreSelfTest.class); suite.addTestSuite(GridCacheReplicatedAtomicGetAndTransformStoreSelfTest.class); suite.addTestSuite(GridCacheReplicatedEventSelfTest.class); + suite.addTestSuite(GridCacheReplicatedEventDisabledSelfTest.class); suite.addTestSuite(GridCacheReplicatedSynchronousCommitTest.class); suite.addTestSuite(GridCacheReplicatedLockSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 52b641e..5fa90c1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -620,8 +620,7 @@ public class GridMapQueryExecutor { } } - qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null, - MapQueryLazyWorker.currentWorker()); + qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, MapQueryLazyWorker.currentWorker()); if (nodeRess.put(reqId, segmentId, qr) != null) throw new IllegalStateException(); @@ -660,7 +659,7 @@ public class GridMapQueryExecutor { // Run queries. int qryIdx = 0; - boolean evt = mainCctx != null && ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED); + boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED); for (GridCacheSqlQuery qry : qrys) { ResultSet rs = null; @@ -819,7 +818,7 @@ public class GridMapQueryExecutor { GridCacheContext<?, ?> mainCctx = !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null; - boolean evt = local && mainCctx != null && ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED); + boolean evt = local && mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED); if (evt) { ctx.event().record(new CacheQueryExecutedEvent<>( @@ -1073,4 +1072,4 @@ public class GridMapQueryExecutor { public int registeredLazyWorkers() { return lazyWorkers.size(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java index e54c784d..beeb054 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java @@ -17,8 +17,14 @@ package org.apache.ignite.internal.processors.query.h2.twostep; +import java.lang.reflect.Field; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; @@ -31,12 +37,6 @@ import org.h2.result.ResultInterface; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; -import java.lang.reflect.Field; -import java.sql.ResultSet; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; /** @@ -70,7 +70,7 @@ class MapQueryResult { private final ResultSet rs; /** */ - private final String cacheName; + private final GridCacheContext<?, ?> cctx; /** */ private final GridCacheSqlQuery qry; @@ -101,16 +101,16 @@ class MapQueryResult { /** * @param rs Result set. - * @param cacheName Cache name. + * @param cctx Cache context. * @param qrySrcNodeId Query source node. * @param qry Query. * @param params Query params. * @param lazyWorker Lazy worker. */ - MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable String cacheName, + MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable GridCacheContext cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) { this.h2 = h2; - this.cacheName = cacheName; + this.cctx = cctx; this.qry = qry; this.params = params; this.qrySrcNodeId = qrySrcNodeId; @@ -179,7 +179,7 @@ class MapQueryResult { if (closed) return true; - boolean readEvt = cacheName != null && h2.kernalContext().event().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + boolean readEvt = cctx != null && cctx.name() != null && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); page++; @@ -222,7 +222,7 @@ class MapQueryResult { "SQL fields query result set row read.", EVT_CACHE_QUERY_OBJECT_READ, CacheQueryType.SQL.name(), - cacheName, + cctx.name(), null, qry.query(), null, http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java index 99f1966..45f9c1f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java @@ -17,15 +17,15 @@ package org.apache.ignite.internal.processors.query.h2.twostep; +import java.sql.ResultSet; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReferenceArray; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.jetbrains.annotations.Nullable; -import java.sql.ResultSet; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicReferenceArray; - /** * Mapper query results. */ @@ -43,7 +43,7 @@ class MapQueryResults { private final GridQueryCancel[] cancels; /** */ - private final String cacheName; + private final GridCacheContext<?, ?> cctx; /** Lazy worker. */ private final MapQueryLazyWorker lazyWorker; @@ -56,15 +56,15 @@ class MapQueryResults { * * @param qryReqId Query request ID. * @param qrys Number of queries. - * @param cacheName Cache name. + * @param cctx Cache context. * @param lazyWorker Lazy worker (if any). */ @SuppressWarnings("unchecked") - MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, @Nullable String cacheName, + MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, @Nullable GridCacheContext<?, ?> cctx, @Nullable MapQueryLazyWorker lazyWorker) { this.h2 = h2; this.qryReqId = qryReqId; - this.cacheName = cacheName; + this.cctx = cctx; this.lazyWorker = lazyWorker; results = new AtomicReferenceArray<>(qrys); @@ -108,7 +108,7 @@ class MapQueryResults { * @param rs Result set. */ void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) { - MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params, lazyWorker); + MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params, lazyWorker); if (lazyWorker != null) lazyWorker.result(res); http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index 97ef8e5..f2fef29 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -1466,8 +1466,10 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac * @throws Exception If failed. */ private void checkSqlQueryEvents() throws Exception { - final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount()); final IgniteCache<Integer, Integer> cache = jcache(Integer.class, Integer.class); + final boolean evtsDisabled = cache.getConfiguration(CacheConfiguration.class).isEventsDisabled(); + final CountDownLatch execLatch = new CountDownLatch(evtsDisabled ? 0 : + cacheMode() == REPLICATED ? 1 : gridCount()); IgnitePredicate[] lsnrs = new IgnitePredicate[gridCount()]; @@ -1476,6 +1478,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac @Override public boolean apply(Event evt) { assert evt instanceof CacheQueryExecutedEvent; + if (evtsDisabled) + fail("Cache events are disabled"); + CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt; assertEquals(cache.getName(), qe.cacheName()); @@ -1517,9 +1522,11 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac */ public void testScanQueryEvents() throws Exception { final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(10); - final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount()); final IgniteCache<Integer, Integer> cache = jcache(Integer.class, Integer.class); + final boolean evtsDisabled = cache.getConfiguration(CacheConfiguration.class).isEventsDisabled(); + final CountDownLatch latch = new CountDownLatch(evtsDisabled ? 0 : 10); + final CountDownLatch execLatch = new CountDownLatch(evtsDisabled ? 0 : + cacheMode() == REPLICATED ? 1 : gridCount()); IgnitePredicate[] objReadLsnrs = new IgnitePredicate[gridCount()]; IgnitePredicate[] qryExecLsnrs = new IgnitePredicate[gridCount()]; @@ -1529,6 +1536,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac @Override public boolean apply(Event evt) { assert evt instanceof CacheQueryReadEvent; + if (evtsDisabled) + fail("Cache events are disabled"); + CacheQueryReadEvent<Integer, Integer> qe = (CacheQueryReadEvent<Integer, Integer>)evt; assertEquals(SCAN.name(), qe.queryType()); @@ -1555,6 +1565,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac @Override public boolean apply(Event evt) { assert evt instanceof CacheQueryExecutedEvent; + if (evtsDisabled) + fail("Cache events are disabled"); + CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt; assertEquals(SCAN.name(), qe.queryType()); @@ -1593,10 +1606,12 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac assert latch.await(1000, MILLISECONDS); assert execLatch.await(1000, MILLISECONDS); - assertEquals(10, map.size()); + if (!evtsDisabled) { + assertEquals(10, map.size()); - for (int i = 10; i < 20; i++) - assertEquals(i, map.get(i).intValue()); + for (int i = 10; i < 20; i++) + assertEquals(i, map.get(i).intValue()); + } } finally { for (int i = 0; i < gridCount(); i++) { @@ -1611,9 +1626,11 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac */ public void testTextQueryEvents() throws Exception { final Map<UUID, Person> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(2); - final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount()); final IgniteCache<UUID, Person> cache = jcache(UUID.class, Person.class); + final boolean evtsDisabled = cache.getConfiguration(CacheConfiguration.class).isEventsDisabled(); + final CountDownLatch latch = new CountDownLatch(evtsDisabled ? 0 : 2); + final CountDownLatch execLatch = new CountDownLatch(evtsDisabled ? 0 : + cacheMode() == REPLICATED ? 1 : gridCount()); IgnitePredicate[] objReadLsnrs = new IgnitePredicate[gridCount()]; IgnitePredicate[] qryExecLsnrs = new IgnitePredicate[gridCount()]; @@ -1623,6 +1640,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac @Override public boolean apply(Event evt) { assert evt instanceof CacheQueryReadEvent; + if (evtsDisabled) + fail("Cache events are disabled"); + CacheQueryReadEvent<UUID, Person> qe = (CacheQueryReadEvent<UUID, Person>)evt; assertEquals(FULL_TEXT.name(), qe.queryType()); @@ -1649,6 +1669,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac @Override public boolean apply(Event evt) { assert evt instanceof CacheQueryExecutedEvent; + if (evtsDisabled) + fail("Cache events are disabled"); + CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt; assertEquals(FULL_TEXT.name(), qe.queryType()); @@ -1686,10 +1709,12 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac assert latch.await(1000, MILLISECONDS); assert execLatch.await(1000, MILLISECONDS); - assertEquals(2, map.size()); + if (!evtsDisabled) { + assertEquals(2, map.size()); - assertEquals("Bob White", map.get(k1).name()); - assertEquals("Tom White", map.get(k2).name()); + assertEquals("Bob White", map.get(k1).name()); + assertEquals("Tom White", map.get(k2).name()); + } } finally { for (int i = 0; i < gridCount(); i++) { @@ -1703,8 +1728,10 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac * @throws Exception If failed. */ public void testFieldsQueryEvents() throws Exception { - final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount()); final IgniteCache<UUID, Person> cache = jcache(UUID.class, Person.class); + final boolean evtsDisabled = cache.getConfiguration(CacheConfiguration.class).isEventsDisabled(); + final CountDownLatch execLatch = new CountDownLatch(evtsDisabled ? 0 : + cacheMode() == REPLICATED ? 1 : gridCount()); IgnitePredicate[] qryExecLsnrs = new IgnitePredicate[gridCount()]; @@ -1713,6 +1740,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac @Override public boolean apply(Event evt) { assert evt instanceof CacheQueryExecutedEvent; + if (evtsDisabled) + fail("Cache events are disabled"); + CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt; assertEquals(cache.getName(), qe.cacheName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryEvtsDisabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryEvtsDisabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryEvtsDisabledSelfTest.java new file mode 100644 index 0000000..3ca2492 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryEvtsDisabledSelfTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.configuration.CacheConfiguration; + +/** + * Tests for partitioned cache queries with events disabled. + */ +public class IgniteCachePartitionedQueryEvtsDisabledSelfTest extends IgniteCachePartitionedQuerySelfTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration() { + return super.cacheConfiguration().setEventsDisabled(true); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQueryEvtsDisabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQueryEvtsDisabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQueryEvtsDisabledSelfTest.java new file mode 100644 index 0000000..09b7f0d --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQueryEvtsDisabledSelfTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.replicated; + +import org.apache.ignite.configuration.CacheConfiguration; + +/** + * Tests replicated query with disabled events. + */ +public class IgniteCacheReplicatedQueryEvtsDisabledSelfTest extends IgniteCacheReplicatedQuerySelfTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration() { + return super.cacheConfiguration().setEventsDisabled(true); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java index ec1a16d..13942c2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java @@ -38,9 +38,9 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventType; import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractQuerySelfTest; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -241,20 +241,7 @@ public class IgniteCacheReplicatedQuerySelfTest extends IgniteCacheAbstractQuery * @throws Exception If test failed. */ public void testDistributedQuery() throws Exception { - int keyCnt = 4; - - final CountDownLatch latch = new CountDownLatch(keyCnt * 2); - - IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - latch.countDown(); - - return true; - } - }; - - ignite2.events().localListen(lsnr, EventType.EVT_CACHE_OBJECT_PUT); - ignite3.events().localListen(lsnr, EventType.EVT_CACHE_OBJECT_PUT); + final int keyCnt = 4; Transaction tx = ignite1.transactions().txStart(); @@ -272,7 +259,11 @@ public class IgniteCacheReplicatedQuerySelfTest extends IgniteCacheAbstractQuery throw e; } - latch.await(); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return cache2.size() == keyCnt && cache3.size() == keyCnt; + } + }, 5000); QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry = cache1.query(new SqlQuery<CacheKey, CacheValue>(CacheValue.class, "val > 1 and val < 4")); @@ -586,4 +577,4 @@ public class IgniteCacheReplicatedQuerySelfTest extends IgniteCacheAbstractQuery return S.toString(CacheValue.class, this); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 6295d8d..f004453 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -87,6 +87,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheD import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedQueryCancelSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQueryP2PEnabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQuerySelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQueryEvtsDisabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQueryP2PDisabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQuerySelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest; @@ -96,6 +97,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryROSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQuerySelfTest; +import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQueryEvtsDisabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQueryP2PDisabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest; import org.apache.ignite.internal.processors.cache.index.DuplicateKeyValueClassesSelfTest; @@ -104,6 +106,7 @@ import org.apache.ignite.internal.processors.cache.index.DynamicIndexServerBasic import org.apache.ignite.internal.processors.cache.index.DynamicIndexServerCoordinatorBasicSelfTest; import org.apache.ignite.internal.processors.cache.index.DynamicIndexServerNodeFIlterBasicSelfTest; import org.apache.ignite.internal.processors.cache.index.DynamicIndexServerNodeFilterCoordinatorBasicSelfTest; +import org.apache.ignite.internal.processors.cache.index.H2ConnectionLeaksSelfTest; import org.apache.ignite.internal.processors.cache.index.H2DynamicColumnsClientBasicSelfTest; import org.apache.ignite.internal.processors.cache.index.H2DynamicColumnsServerBasicSelfTest; import org.apache.ignite.internal.processors.cache.index.H2DynamicColumnsServerCoordinatorBasicSelfTest; @@ -122,7 +125,6 @@ import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComple import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComplexServerTransactionalPartitionedTest; import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComplexServerTransactionalReplicatedTest; import org.apache.ignite.internal.processors.cache.index.H2DynamicTableSelfTest; -import org.apache.ignite.internal.processors.cache.index.H2ConnectionLeaksSelfTest; import org.apache.ignite.internal.processors.cache.index.H2RowCachePageEvictionTest; import org.apache.ignite.internal.processors.cache.index.H2RowCacheSelfTest; import org.apache.ignite.internal.processors.cache.index.LongIndexNameTest; @@ -137,28 +139,28 @@ import org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDe import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest; import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest; import org.apache.ignite.internal.processors.client.ClientConnectorConfigurationValidationSelfTest; -import org.apache.ignite.internal.processors.query.IgniteCachelessQueriesSelfTest; import org.apache.ignite.internal.processors.database.baseline.IgniteStableBaselineBinObjFieldsQuerySelfTest; +import org.apache.ignite.internal.processors.query.IgniteCachelessQueriesSelfTest; +import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest; import org.apache.ignite.internal.processors.query.IgniteSqlDefaultValueTest; import org.apache.ignite.internal.processors.query.IgniteSqlDistributedJoinSelfTest; -import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest; -import org.apache.ignite.internal.processors.query.IgniteSqlParameterizedQueryTest; -import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest; -import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest; -import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlSelfTest; import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest; import org.apache.ignite.internal.processors.query.IgniteSqlKeyValueFieldsTest; import org.apache.ignite.internal.processors.query.IgniteSqlNotNullConstraintTest; +import org.apache.ignite.internal.processors.query.IgniteSqlParameterizedQueryTest; import org.apache.ignite.internal.processors.query.IgniteSqlRoutingTest; import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest; import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexMultiNodeSelfTest; import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest; +import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest; +import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlSelfTest; import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest; import org.apache.ignite.internal.processors.query.LazyQuerySelfTest; import org.apache.ignite.internal.processors.query.MultipleStatementsSqlQuerySelfTest; import org.apache.ignite.internal.processors.query.SqlSchemaSelfTest; import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest; import org.apache.ignite.internal.processors.query.h2.GridH2IndexingOffheapSelfTest; +import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest; import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest; import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest; import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest; @@ -236,11 +238,13 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheLocalAtomicQuerySelfTest.class); suite.addTestSuite(IgniteCacheReplicatedQuerySelfTest.class); suite.addTestSuite(IgniteCacheReplicatedQueryP2PDisabledSelfTest.class); + suite.addTestSuite(IgniteCacheReplicatedQueryEvtsDisabledSelfTest.class); suite.addTestSuite(IgniteCachePartitionedQuerySelfTest.class); suite.addTestSuite(IgniteCachePartitionedSnapshotEnabledQuerySelfTest.class); suite.addTestSuite(IgniteCacheAtomicQuerySelfTest.class); suite.addTestSuite(IgniteCacheAtomicNearEnabledQuerySelfTest.class); suite.addTestSuite(IgniteCachePartitionedQueryP2PDisabledSelfTest.class); + suite.addTestSuite(IgniteCachePartitionedQueryEvtsDisabledSelfTest.class); suite.addTestSuite(IgniteCacheQueryIndexSelfTest.class); suite.addTestSuite(IgniteCacheCollocatedQuerySelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/b2531569/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/CacheConfigurationParityTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/CacheConfigurationParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/CacheConfigurationParityTest.cs index 0022e96..94f52ce 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/CacheConfigurationParityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/CacheConfigurationParityTest.cs @@ -58,7 +58,8 @@ namespace Apache.Ignite.Core.Tests.ApiParity { "NodeFilter", // IGNITE-2890 "EvictionPolicyFactory", // IGNITE-6649, - "isSqlOnheapCacheEnabled" // IGNITE-7379 + "isSqlOnheapCacheEnabled", // IGNITE-7379 + "isEventsDisabled" // IGNITE-7346 }; /// <summary>