IGNITE-4540: IndexingSPI can be used without have default H2 Indexing enabled. This closes #1423.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a922ac9d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a922ac9d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a922ac9d Branch: refs/heads/master Commit: a922ac9d17f91f25aaa2bac9f0a2622dbd04c9bb Parents: 8e622e4 Author: Andrey V. Mashenkov <[email protected]> Authored: Tue Jan 17 15:31:04 2017 +0300 Committer: Andrey V. Mashenkov <[email protected]> Committed: Tue Jan 17 15:31:04 2017 +0300 ---------------------------------------------------------------------- .../cache/query/GridCacheQueryManager.java | 83 ++++++++++++++++++-- .../processors/query/GridQueryProcessor.java | 46 ----------- .../cache/query/IndexingSpiQuerySelfTest.java | 66 ++++++++-------- .../IndexingSpiQueryWithH2IndexingSelfTest.java | 36 +++++++++ 4 files changed, 145 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 85c01d9..d64dff4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1,4 +1,4 @@ -/* + /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -45,6 +45,7 @@ import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.query.QueryMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -60,6 +61,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -167,6 +169,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } }; + /** Default is @{code true} */ + private final boolean isIndexingSpiAllowsBinary = !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI); + /** */ private GridQueryProcessor qryProc; @@ -205,15 +210,24 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private boolean enabled; /** */ + private boolean qryProcEnabled; + + + /** */ private AffinityTopologyVersion qryTopVer; /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { CacheConfiguration ccfg = cctx.config(); + qryProcEnabled = GridQueryProcessor.isEnabled(ccfg); + qryProc = cctx.kernalContext().query(); + space = cctx.name(); + enabled = qryProcEnabled || (isIndexingSpiEnabled() && !CU.isSystemCache(space)); + maxIterCnt = ccfg.getMaxQueryIteratorsCount(); detailMetricsSz = ccfg.getQueryDetailMetricsSize(); @@ -259,8 +273,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); - enabled = GridQueryProcessor.isEnabled(ccfg); - qryTopVer = cctx.startTopologyVersion(); if (qryTopVer == null) @@ -369,11 +381,21 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @throws IgniteCheckedException If failed. */ public void onSwap(CacheObject key) throws IgniteCheckedException { + if(!enabled) + return; + if (!enterBusy()) return; // Ignore index update when node is stopping. try { - qryProc.onSwap(space, key); + if (isIndexingSpiEnabled()) { + Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext()); + + cctx.kernalContext().indexing().onSwap(space, key0); + } + + if(qryProcEnabled) + qryProc.onSwap(space, key); } finally { leaveBusy(); @@ -381,6 +403,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** + * Checks if IndexinSPI is enabled. + * @return IndexingSPI enabled flag. + */ + private boolean isIndexingSpiEnabled() { + return cctx.kernalContext().indexing().enabled(); + } + + /** * Entry for given key unswapped. * * @param key Key. @@ -388,11 +418,25 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @throws IgniteCheckedException If failed. */ public void onUnswap(CacheObject key, CacheObject val) throws IgniteCheckedException { + if(!enabled) + return; + if (!enterBusy()) return; // Ignore index update when node is stopping. try { - qryProc.onUnswap(space, key, val); + if (isIndexingSpiEnabled()) { + CacheObjectContext coctx = cctx.cacheObjectContext(); + + Object key0 = unwrapIfNeeded(key, coctx); + + Object val0 = unwrapIfNeeded(val, coctx); + + cctx.kernalContext().indexing().onUnswap(space, key0, val0); + } + + if(qryProcEnabled) + qryProc.onUnswap(space, key, val); } finally { leaveBusy(); @@ -429,7 +473,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return; // Ignore index update when node is stopping. try { - qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime); + if (isIndexingSpiEnabled()) { + CacheObjectContext coctx = cctx.cacheObjectContext(); + + Object key0 = unwrapIfNeeded(key, coctx); + + Object val0 = unwrapIfNeeded(val, coctx); + + cctx.kernalContext().indexing().store(space, key0, val0, expirationTime); + } + + if(qryProcEnabled) + qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime); } finally { invalidateResultCache(); @@ -454,7 +509,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return; // Ignore index update when node is stopping. try { - qryProc.remove(space, key, val); + if (isIndexingSpiEnabled()) { + Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext()); + + cctx.kernalContext().indexing().remove(space, key0); + } + + if(qryProcEnabled) + qryProc.remove(space, key, val); } finally { invalidateResultCache(); @@ -561,6 +623,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte public abstract CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes); /** + * Unwrap CacheObject if needed. + */ + private Object unwrapIfNeeded(CacheObject obj, CacheObjectContext coctx) { + return isIndexingSpiAllowsBinary && cctx.cacheObjects().isBinaryObject(obj) ? obj : obj.value(coctx, false); + } + + /** * Performs query. * * @param qry Query. http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index f4ac4ae..48ca2b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -160,9 +160,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** */ private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>(); - /** Default is @{true} */ - private final boolean isIndexingSpiAllowsBinary = !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI); - /** * @param ctx Kernal context. */ @@ -682,16 +679,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { CacheObjectContext coctx = null; - if (ctx.indexing().enabled()) { - coctx = cacheObjectContext(space); - - Object key0 = unwrap(key, coctx); - - Object val0 = unwrap(val, coctx); - - ctx.indexing().store(space, key0, val0, expirationTime); - } - if (idx == null) return; @@ -745,13 +732,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * Unwrap CacheObject if needed. - */ - private Object unwrap(CacheObject obj, CacheObjectContext coctx) { - return isIndexingSpiAllowsBinary && ctx.cacheObjects().isBinaryObject(obj) ? obj : obj.value(coctx, false); - } - - /** * @throws IgniteCheckedException If failed. */ private void checkEnabled() throws IgniteCheckedException { @@ -1039,14 +1019,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Remove [space=" + space + ", key=" + key + ", val=" + val + "]"); - if (ctx.indexing().enabled()) { - CacheObjectContext coctx = cacheObjectContext(space); - - Object key0 = unwrap(key, coctx); - - ctx.indexing().remove(space, key0); - } - if (idx == null) return; @@ -1184,14 +1156,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Swap [space=" + spaceName + ", key=" + key + "]"); - if (ctx.indexing().enabled()) { - CacheObjectContext coctx = cacheObjectContext(spaceName); - - Object key0 = unwrap(key, coctx); - - ctx.indexing().onSwap(spaceName, key0); - } - if (idx == null) return; @@ -1221,16 +1185,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Unswap [space=" + spaceName + ", key=" + key + ", val=" + val + "]"); - if (ctx.indexing().enabled()) { - CacheObjectContext coctx = cacheObjectContext(spaceName); - - Object key0 = unwrap(key, coctx); - - Object val0 = unwrap(val, coctx); - - ctx.indexing().onUnswap(spaceName, key0, val0); - } - if (idx == null) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java index f66b99e..84a13df 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java @@ -55,15 +55,40 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** - * Indexing Spi query test + * Indexing Spi query only test */ public class IndexingSpiQuerySelfTest extends TestCase { + public static final String CACHE_NAME = "test-cache"; + /** {@inheritDoc} */ @Override public void tearDown() throws Exception { Ignition.stopAll(true); } /** + * @return Configuration. + */ + protected IgniteConfiguration configuration() { + IgniteConfiguration cfg = new IgniteConfiguration(); + + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** */ + protected <K,V> CacheConfiguration<K, V> cacheConfiguration(String cacheName) { + return new CacheConfiguration<>(cacheName); + } + + /** * @throws Exception If failed. */ public void testSimpleIndexingSpi() throws Exception { @@ -73,9 +98,7 @@ public class IndexingSpiQuerySelfTest extends TestCase { Ignite ignite = Ignition.start(cfg); - CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache"); - - ccfg.setIndexedTypes(Integer.class, Integer.class); + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME); IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg); @@ -98,7 +121,7 @@ public class IndexingSpiQuerySelfTest extends TestCase { Ignite ignite = Ignition.start(cfg); - CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache"); + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME); IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg); @@ -121,9 +144,7 @@ public class IndexingSpiQuerySelfTest extends TestCase { Ignite ignite = Ignition.start(cfg); - CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache"); - - ccfg.setIndexedTypes(PersonKey.class, Person.class); + CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME); IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg); @@ -155,9 +176,7 @@ public class IndexingSpiQuerySelfTest extends TestCase { Ignite ignite = Ignition.start(cfg); - CacheConfiguration<PersonKey, Person> ccfg = new CacheConfiguration<>("test-binary-cache"); - - ccfg.setIndexedTypes(PersonKey.class, Person.class); + CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME); IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg); @@ -187,10 +206,9 @@ public class IndexingSpiQuerySelfTest extends TestCase { Ignite ignite = Ignition.start(cfg); - CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("test-cache"); + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME); ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - ccfg.setIndexedTypes(Integer.class, Integer.class); final IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg); @@ -219,24 +237,6 @@ public class IndexingSpiQuerySelfTest extends TestCase { } /** - * @return Configuration. - */ - private IgniteConfiguration configuration() { - IgniteConfiguration cfg = new IgniteConfiguration(); - - TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); - - disco.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(disco); - - return cfg; - } - - /** * Indexing Spi implementation for test */ private static class MyIndexingSpi extends IgniteSpiAdapter implements IndexingSpi { @@ -350,7 +350,7 @@ public class IndexingSpiQuerySelfTest extends TestCase { /** * */ - private static class PersonKey implements Serializable, Comparable<PersonKey> { + static class PersonKey implements Serializable, Comparable<PersonKey> { /** */ private int id; @@ -385,7 +385,7 @@ public class IndexingSpiQuerySelfTest extends TestCase { /** * */ - private static class Person implements Serializable { + static class Person implements Serializable { /** */ private String name; http://git-wip-us.apache.org/repos/asf/ignite/blob/a922ac9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java new file mode 100644 index 0000000..800c5a2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryWithH2IndexingSelfTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.configuration.CacheConfiguration; + +/** + * Indexing Spi query with configured default indexer test + */ +public class IndexingSpiQueryWithH2IndexingSelfTest extends IndexingSpiQuerySelfTest { + /** */ + protected <K, V> CacheConfiguration<K, V> cacheConfiguration(String cacheName) { + CacheConfiguration<K, V> ccfg = super.cacheConfiguration(cacheName); + + ccfg.setIndexedTypes(PersonKey.class, Person.class); + + ccfg.setIndexedTypes(Integer.class, Integer.class); + + return ccfg; + } +} \ No newline at end of file
