This is an automated email from the ASF dual-hosted git repository. ipavlukhin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 7272bb1 IGNITE-12466 Monitor query pool starvation - Fixes #7161. 7272bb1 is described below commit 7272bb18656b53d03b1ca7433eb9d1dbb08d18ca Author: ktkalenko <ktkale...@gridgain.com> AuthorDate: Mon Dec 23 18:17:00 2019 +0300 IGNITE-12466 Monitor query pool starvation - Fixes #7161. Signed-off-by: ipavlukhin <vololo...@gmail.com> --- .../org/apache/ignite/internal/IgniteKernal.java | 9 ++ .../query/IgniteQueryDedicatedPoolTest.java | 124 ++++++++++++++++++++- 2 files changed, 128 insertions(+), 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index c44d2ca..0096190 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1421,6 +1421,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** Last completed task count. */ private long lastCompletedCntSys; + /** Last completed task count. */ + private long lastCompletedCntQry; + @Override public void run() { if (execSvc instanceof ThreadPoolExecutor) { ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc; @@ -1434,6 +1437,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { lastCompletedCntSys = checkPoolStarvation(exec, lastCompletedCntSys, "system"); } + if (qryExecSvc instanceof ThreadPoolExecutor) { + ThreadPoolExecutor exec = (ThreadPoolExecutor)qryExecSvc; + + lastCompletedCntQry = checkPoolStarvation(exec, lastCompletedCntQry, "query"); + } + if (stripedExecSvc != null) stripedExecSvc.detectStarvation(); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java index df0fc7c..15945b4 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java @@ -17,13 +17,14 @@ package org.apache.ignite.internal.processors.query; +import javax.cache.Cache; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; -import javax.cache.Cache; +import java.util.concurrent.CyclicBarrier; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.query.QueryCursor; @@ -33,6 +34,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; @@ -41,27 +43,48 @@ import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.spi.indexing.IndexingSpi; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.junits.SystemPropertiesRule; +import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TestRule; + +import static java.util.Objects.nonNull; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * Ensures that SQL queries are executed in a dedicated thread pool. */ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest { + /** Class rule. */ + @ClassRule public static final TestRule classRule = new SystemPropertiesRule(); + /** Name of the cache for test */ private static final String CACHE_NAME = "query_pool_test"; + /** Listener log messages. */ + private static ListeningTestLogger testLog; + + /** Query thread pool size. */ + private Integer qryPoolSize; + /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); - startGrid("server"); + testLog = new ListeningTestLogger(false, log); } /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); + IgniteConfiguration cfg = super.getConfiguration(gridName) + .setGridLogger(testLog); CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); @@ -77,6 +100,9 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest { cfg.setIndexingSpi(new TestIndexingSpi()); + if (nonNull(qryPoolSize)) + cfg.setQueryThreadPoolSize(qryPoolSize); + return cfg; } @@ -85,6 +111,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest { super.afterTest(); stopAllGrids(); + + testLog.clearListeners(); } /** @@ -93,6 +121,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest { */ @Test public void testSqlQueryUsesDedicatedThreadPool() throws Exception { + startGrid("server"); + try (Ignite client = startGrid("client")) { IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME); @@ -122,6 +152,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest { */ @Test public void testScanQueryUsesDedicatedThreadPool() throws Exception { + startGrid("server"); + try (Ignite client = startGrid("client")) { IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME); @@ -146,6 +178,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest { */ @Test public void testSpiQueryUsesDedicatedThreadPool() throws Exception { + startGrid("server"); + try (Ignite client = startGrid("client")) { IgniteCache<Byte, Byte> cache = client.cache(CACHE_NAME); @@ -164,6 +198,86 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest { } /** + * Test for messages about query pool starvation in the logs. + * + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "10") + public void testContainsStarvationQryPoolInLog() throws Exception { + checkStarvationQryPoolInLog( + 10_000, + "Possible thread pool starvation detected (no task completed in last 10ms, is query thread pool size " + + "large enough?)", + true + ); + } + + /** + * Test to verify that there are no query pool starvation messages in log. + * + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "0") + public void testNotContainsStarvationQryPoolInLog() throws Exception { + checkStarvationQryPoolInLog( + 1_000, + "Possible thread pool starvation detected (no task completed in", + false + ); + } + + /** + * Check messages about starvation query pool in log. + * + * @param checkTimeout Check timeout. + * @param findLogMsg Log message of interest. + * @param contains Expect whether or not messages are in log. + * @throws Exception If failed. + */ + private void checkStarvationQryPoolInLog(long checkTimeout, String findLogMsg ,boolean contains) throws Exception { + assertNotNull(findLogMsg); + + qryPoolSize = 1; + + startGrid("server"); + + IgniteEx clientNode = startGrid("client"); + + IgniteCache<Integer, Integer> cache = clientNode.cache(CACHE_NAME); + cache.put(0, 0); + + int qrySize = 2; + + CyclicBarrier barrier = new CyclicBarrier(qrySize); + + LogListener logLsnr = LogListener.matches(findLogMsg).build(); + + testLog.registerListener(logLsnr); + + for (int i = 0; i < qrySize; i++) { + runAsync(() -> { + barrier.await(); + + cache.query(new ScanQuery<>((o, o2) -> { + doSleep(500); + + return true; + }) + ).getAll(); + + return null; + }); + } + + if (contains) + assertTrue(waitForCondition(logLsnr::check, checkTimeout)); + else + assertFalse(waitForCondition(logLsnr::check, checkTimeout)); + } + + /** * Custom SQL function to return current thread name from inside query executor * @return Current IO policy */