Repository: ignite
Updated Branches:
  refs/heads/master eab8334bb -> 3f8cdea95
IGNITE-4650: Now queries are executed in QUERY_POOL.

This closes #1534.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3f8cdea9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3f8cdea9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3f8cdea9

Branch: refs/heads/master
Commit: 3f8cdea95596cf11d51db2ad3fa773ecc174f13a
Parents: eab8334
Author: tledkov-gridgain <[email protected]>
Authored: Mon Mar 27 17:43:32 2017 +0300
Committer: devozerov <[email protected]>
Committed: Mon Mar 27 17:43:32 2017 +0300

----------------------------------------------------------------------
 .../query/GridCacheDistributedQueryManager.java | 7 +-
 .../cache/query/GridCacheLocalQueryFuture.java | 3 +-
 .../query/IgniteQueryDedicatedPoolTest.java | 223 +++++++++++++++++++
 .../query/IgniteSqlQueryDedicatedPoolTest.java | 110 ---------
 .../IgniteCacheQuerySelfTestSuite.java | 4 +-
 5 files changed, 231 insertions(+), 116 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f8cdea9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index eb5e214..06a3416 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import 
org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet; @@ -324,7 +325,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage node, topic, res, - cctx.ioPolicy(), + GridIoPolicy.QUERY_POOL, timeout > 0 ? timeout : Long.MAX_VALUE); return true; @@ -799,7 +800,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage // For example, a remote reducer has a state, we should not serialize and then send // the reducer changed by the local node. if (!F.isEmpty(rmtNodes)) { - cctx.io().safeSend(rmtNodes, req, cctx.ioPolicy(), new P1<ClusterNode>() { + cctx.io().safeSend(rmtNodes, req, GridIoPolicy.QUERY_POOL, new P1<ClusterNode>() { @Override public boolean apply(ClusterNode node) { fut.onNodeLeft(node.id()); @@ -817,7 +818,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage return null; } - }); + }, GridIoPolicy.QUERY_POOL); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3f8cdea9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java index 6eaca29..3762ef7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.internal.U; @@ -54,7 +55,7 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap * Executes query runnable. */ void execute() { - fut = cctx.kernalContext().closure().runLocalSafe(run, true); + fut = cctx.kernalContext().closure().runLocalSafe(run, GridIoPolicy.QUERY_POOL); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3f8cdea9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java new file mode 100644 index 0000000..711db2f --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SpiQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoManager; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.IgniteSpiAdapter; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.apache.ignite.spi.indexing.IndexingSpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +/** + * Ensures that SQL queries are executed in a dedicated thread pool. 
+ */ +public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Name of the cache for test */ + private static final String CACHE_NAME = "query_pool_test"; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGrid("server"); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setIndexedTypes(Integer.class, Integer.class); + ccfg.setIndexedTypes(Byte.class, Byte.class); + ccfg.setSqlFunctionClasses(IgniteQueryDedicatedPoolTest.class); + ccfg.setName(CACHE_NAME); + + cfg.setCacheConfiguration(ccfg); + + if ("client".equals(gridName)) + cfg.setClientMode(true); + + + cfg.setIndexingSpi(new TestIndexingSpi()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Tests that SQL queries are executed in dedicated pool + * @throws Exception If failed. + */ + public void testSqlQueryUsesDedicatedThreadPool() throws Exception { + try (Ignite client = startGrid("client")) { + IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME); + + QueryCursor<List<?>> cursor = cache.query(new SqlFieldsQuery("select currentPolicy()")); + + List<List<?>> result = cursor.getAll(); + + cursor.close(); + + assertEquals(1, result.size()); + + Byte plc = (Byte)result.get(0).get(0); + + assert plc != null; + assert plc == GridIoPolicy.QUERY_POOL; + } + } + + /** + * Tests that Scan queries are executed in dedicated pool + * @throws Exception If failed. 
+ */ + public void testScanQueryUsesDedicatedThreadPool() throws Exception { + try (Ignite client = startGrid("client")) { + IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME); + + cache.put(0, 0); + + QueryCursor<Cache.Entry<Object, Object>> cursor = cache.query( + new ScanQuery<>(new IgniteBiPredicate<Object, Object>() { + @Override public boolean apply(Object o, Object o2) { + return F.eq(GridIoManager.currentPolicy(), GridIoPolicy.QUERY_POOL); + } + })); + + assertEquals(1, cursor.getAll().size()); + + cursor.close(); + } + } + + /** + * Tests that SPI queries are executed in dedicated pool + * @throws Exception If failed. + */ + public void testSpiQueryUsesDedicatedThreadPool() throws Exception { + try (Ignite client = startGrid("client")) { + IgniteCache<Byte, Byte> cache = client.cache(CACHE_NAME); + + for (byte b = 0; b < Byte.MAX_VALUE; ++b) + cache.put(b, b); + + QueryCursor<Cache.Entry<Byte, Byte>> cursor = cache.query(new SpiQuery<Byte, Byte>()); + + List<Cache.Entry<Byte, Byte>> all = cursor.getAll(); + + assertEquals(1, all.size()); + assertEquals(GridIoPolicy.QUERY_POOL, (byte)all.get(0).getValue()); + + cursor.close(); + } + } + + /** + * Custom SQL function to return current thread name from inside query executor + * @return Current IO policy + */ + @SuppressWarnings("unused") + @QuerySqlFunction(alias = "currentPolicy") + public static Byte currentPolicy() { + return GridIoManager.currentPolicy(); + } + + /** + * Indexing Spi implementation for test + */ + private static class TestIndexingSpi extends IgniteSpiAdapter implements IndexingSpi { + /** Index. */ + private final SortedMap<Object, Object> idx = new TreeMap<>(); + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void spiStop() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public Iterator<Cache.Entry<?, ?>> query(@Nullable String spaceName, Collection<Object> params, + @Nullable IndexingQueryFilter filters) { + return idx.containsKey(GridIoPolicy.QUERY_POOL) ? + Collections.<Cache.Entry<?, ?>>singletonList( + new CacheEntryImpl<>(GridIoPolicy.QUERY_POOL, GridIoPolicy.QUERY_POOL)).iterator() + : Collections.<Cache.Entry<?, ?>>emptyList().iterator(); + } + + /** {@inheritDoc} */ + @Override public void store(@Nullable String spaceName, Object key, Object val, long expirationTime) { + idx.put(key, val); + } + + /** {@inheritDoc} */ + @Override public void remove(@Nullable String spaceName, Object key) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onSwap(@Nullable String spaceName, Object key) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onUnswap(@Nullable String spaceName, Object key, Object val) { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f8cdea9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java deleted file mode 100644 index bba3642..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query; - -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.query.QueryCursor; -import org.apache.ignite.cache.query.SqlFieldsQuery; -import org.apache.ignite.cache.query.annotations.QuerySqlFunction; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.managers.communication.GridIoManager; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import java.util.List; - -/** - * Ensures that SQL queries are executed in a dedicated thread pool. - */ -public class IgniteSqlQueryDedicatedPoolTest extends GridCommonAbstractTest { - /** IP finder. 
*/ - private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Name of the cache for test */ - private static final String CACHE_NAME = "query_pool_test"; - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - startGrid("server"); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); - - spi.setIpFinder(IP_FINDER); - - CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); - - ccfg.setIndexedTypes(Integer.class, Integer.class); - ccfg.setSqlFunctionClasses(IgniteSqlQueryDedicatedPoolTest.class); - ccfg.setName(CACHE_NAME); - - cfg.setCacheConfiguration(ccfg); - - if ("client".equals(gridName)) - cfg.setClientMode(true); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - stopAllGrids(); - } - - /** - * Test that SQL queries are executed in dedicated pool - */ - public void testSqlQueryUsesDedicatedThreadPool() throws Exception { - try (Ignite client = startGrid("client")) { - IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME); - - QueryCursor<List<?>> cursor = cache.query(new SqlFieldsQuery("select currentPolicy()")); - - List<List<?>> result = cursor.getAll(); - - cursor.close(); - - assertEquals(1, result.size()); - - Byte plc = (Byte)result.get(0).get(0); - - assert plc != null; - assert plc == GridIoPolicy.QUERY_POOL; - } - } - - /** - * Custom SQL function to return current thread name from inside query executor - */ - @SuppressWarnings("unused") - @QuerySqlFunction(alias = "currentPolicy") - public static Byte currentPolicy() { - return GridIoManager.currentPolicy(); - } -} 
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f8cdea9/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 7d49fb4..5b74de7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -98,7 +98,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQueryS import org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDestroySelfTest; import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest; import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest; -import org.apache.ignite.internal.processors.query.IgniteSqlQueryDedicatedPoolTest; +import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest; import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest; import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest; import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest; @@ -249,7 +249,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(CacheOffheapBatchIndexingSingleTypeTest.class); suite.addTestSuite(CacheSqlQueryValueCopySelfTest.class); suite.addTestSuite(IgniteCacheQueryCacheDestroySelfTest.class); - suite.addTestSuite(IgniteSqlQueryDedicatedPoolTest.class); + suite.addTestSuite(IgniteQueryDedicatedPoolTest.class); suite.addTestSuite(IgniteSqlEntryCacheModeAgnosticTest.class); suite.addTestSuite(QueryEntityCaseMismatchTest.class);
