IGNITE-2876: Fixed possible starvation in system pool caused by IgfsBlockMessage. This closes #575.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01a6e86e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01a6e86e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01a6e86e Branch: refs/heads/gridgain-7.5.11-vk Commit: 01a6e86ec4e19d372b8efbc5c497c84f4238a46c Parents: 00a0e4b Author: vozerov-gridgain <[email protected]> Authored: Thu Mar 24 13:28:30 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Mar 29 15:14:27 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 11 + .../managers/communication/GridIoPolicy.java | 3 + .../processors/igfs/IgfsDataManager.java | 29 +- .../processors/igfs/IgfsDeleteWorker.java | 2 +- .../igfs/IgfsFragmentizerManager.java | 4 +- ...lockMessageSystemPoolStarvationSelfTest.java | 299 +++++++++++++++++++ .../ignite/testsuites/IgniteIgfsTestSuite.java | 3 + 7 files changed, 329 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/01a6e86e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 0438b64..8ba6a1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -87,6 +87,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER; 
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL; @@ -146,6 +147,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** Marshaller cache pool. */ private ExecutorService marshCachePool; + /** IGFS pool. */ + private ExecutorService igfsPool; + /** Discovery listener. */ private GridLocalEventListener discoLsnr; @@ -244,6 +248,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa mgmtPool = ctx.getManagementExecutorService(); utilityCachePool = ctx.utilityCachePool(); marshCachePool = ctx.marshallerCachePool(); + igfsPool = ctx.getIgfsExecutorService(); affPool = new IgniteThreadPoolExecutor( "aff", ctx.gridName(), @@ -635,6 +640,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa case AFFINITY_POOL: case UTILITY_CACHE_POOL: case MARSH_CACHE_POOL: + case IGFS_POOL: { if (msg.isOrdered()) processOrderedMessage(nodeId, msg, plc, msgC); @@ -695,6 +701,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return marshCachePool; + case IGFS_POOL: + assert igfsPool != null : "IGFS pool is not configured."; + + return igfsPool; + default: { assert plc >= 0 : "Negative policy: " + plc; http://git-wip-us.apache.org/repos/asf/ignite/blob/01a6e86e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java index 57622c9..00590ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java @@ -43,6 +43,9 @@ public class GridIoPolicy { /** Marshaller cache execution pool. */ public static final byte MARSH_CACHE_POOL = 6; + /** IGFS execution pool. */ + public static final byte IGFS_POOL = 7; + /** * Defines the range of reserved pools that are not available for plugins. * @param key The key. http://git-wip-us.apache.org/repos/asf/ignite/blob/01a6e86e/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index 3825086..16fbeb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -91,7 +91,7 @@ import java.util.concurrent.locks.ReentrantLock; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -1014,26 +1014,18 @@ public class IgfsDataManager extends IgfsManager { if (!node.isLocal()) { final
IgfsBlocksMessage msg = new IgfsBlocksMessage(fileId, batchId, blocks); - callIgfsLocalSafe(new GridPlainCallable<Object>() { - @Override @Nullable public Object call() throws Exception { - try { - igfsCtx.send(nodeId, topic, msg, SYSTEM_POOL); - } catch (IgniteCheckedException e) { - completionFut.onError(nodeId, e); - } - - return null; - } - }); + try { + igfsCtx.send(nodeId, topic, msg, IGFS_POOL); + } + catch (IgniteCheckedException e) { + completionFut.onError(nodeId, e); + } } else { callIgfsLocalSafe(new GridPlainCallable<Object>() { - @Override - @Nullable - public Object call() throws Exception { + @Override @Nullable public Object call() throws Exception { storeBlocksAsync(blocks).listen(new CI1<IgniteInternalFuture<?>>() { - @Override - public void apply(IgniteInternalFuture<?> fut) { + @Override public void apply(IgniteInternalFuture<?> fut) { try { fut.get(); @@ -1276,8 +1268,7 @@ public class IgfsDataManager extends IgfsManager { try { // Send reply back to node. - igfsCtx.send(nodeId, topic, new IgfsAckMessage(blocksMsg.fileId(), blocksMsg.id(), err), - SYSTEM_POOL); + igfsCtx.send(nodeId, topic, new IgfsAckMessage(blocksMsg.fileId(), blocksMsg.id(), err), IGFS_POOL); } catch (IgniteCheckedException e) { U.warn(log, "Failed to send batch acknowledgement (did node leave the grid?) 
[nodeId=" + nodeId + http://git-wip-us.apache.org/repos/asf/ignite/blob/01a6e86e/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java index 7e4dac8..e5914e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java @@ -357,7 +357,7 @@ public class IgfsDeleteWorker extends IgfsThread { for (ClusterNode node : nodes) { try { - igfsCtx.send(node, topic, msg, GridIoPolicy.SYSTEM_POOL); + igfsCtx.send(node, topic, msg, GridIoPolicy.IGFS_POOL); } catch (IgniteCheckedException e) { U.warn(log, "Failed to send IGFS delete message to node [nodeId=" + node.id() + http://git-wip-us.apache.org/repos/asf/ignite/blob/01a6e86e/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java index 99e7cd6..d64c64a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java @@ -59,7 +59,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; -import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; import static org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange.RANGE_STATUS_INITIAL; import static org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange.RANGE_STATUS_MOVED; import static org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange.RANGE_STATUS_MOVING; @@ -186,7 +186,7 @@ public class IgfsFragmentizerManager extends IgfsManager { private void sendWithRetries(UUID nodeId, IgfsCommunicationMessage msg) throws IgniteCheckedException { for (int i = 0; i < MESSAGE_SEND_RETRY_COUNT; i++) { try { - igfsCtx.send(nodeId, topic, msg, SYSTEM_POOL); + igfsCtx.send(nodeId, topic, msg, IGFS_POOL); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/01a6e86e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java new file mode 100644 index 0000000..ec3b808 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; +import org.apache.ignite.igfs.IgfsMode; +import org.apache.ignite.igfs.IgfsOutputStream; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.lang.GridAbsPredicateX; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Test to check for system pool starvation due to {@link IgfsBlocksMessage}. + */ +public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbstractTest { + /** First node name. */ + private static final String NODE_1_NAME = "node1"; + + /** Second node name. */ + private static final String NODE_2_NAME = "node2"; + + /** Data cache name. */ + private static final String DATA_CACHE_NAME = "data"; + + /** Meta cache name. */ + private static final String META_CACHE_NAME = "meta"; + + /** Key in data cache we will use to reproduce the issue. */ + private static final Integer DATA_KEY = 1; + + /** First node. */ + private Ignite victim; + + /** Second node. */ + private Ignite attacker; + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "ConstantConditions"}) + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + // Start nodes. + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + victim = Ignition.start(config(NODE_1_NAME, ipFinder)); + attacker = Ignition.start(config(NODE_2_NAME, ipFinder)); + + // Check if we selected victim correctly. + if (F.eq(dataCache(victim).affinity().mapKeyToNode(DATA_KEY).id(), attacker.cluster().localNode().id())) { + Ignite tmp = victim; + + victim = attacker; + + attacker = tmp; + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + G.stopAll(true); + + victim = null; + attacker = null; + + super.afterTest(); + } + + /** + * Test starvation. + * + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testStarvation() throws Exception { + // 1. Create two IGFS files to make all system threads busy.
+ CountDownLatch fileWriteLatch = new CountDownLatch(1); + + final IgniteInternalFuture fileFut1 = createFileAsync(new IgfsPath("/file1"), fileWriteLatch); + final IgniteInternalFuture fileFut2 = createFileAsync(new IgfsPath("/file2"), fileWriteLatch); + + // 2. Start transaction and keep it open. + final CountDownLatch txStartLatch = new CountDownLatch(1); + final CountDownLatch txCommitLatch = new CountDownLatch(1); + + IgniteInternalFuture<Void> txFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + GridCacheAdapter dataCache = dataCache(attacker); + + try (IgniteInternalTx tx = + dataCache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + dataCache.put(DATA_KEY, 0); + + txStartLatch.countDown(); + + txCommitLatch.await(); + + tx.commit(); + } + + return null; + } + }); + + txStartLatch.await(); + + // 3. Start async operation to drain semaphore permits. + final IgniteInternalFuture putFut = dataCache(victim).putAsync(DATA_KEY, 1); + + assert !awaitFuture(putFut); + + // 4. Write data to files and ensure we are stuck. + fileWriteLatch.countDown(); + + assert !awaitFuture(fileFut1); + assert !awaitFuture(fileFut2); + + // 5. Finish transaction. + txCommitLatch.countDown(); + + assert awaitFuture(txFut); + + // 6. Async put must succeed. + assert awaitFuture(putFut); + + // 7. Writes must succeed. + assert awaitFuture(fileFut1); + assert awaitFuture(fileFut2); + } + + /** + * Await future completion. + * + * @param fut Future. + * @return {@code True} if future completed. + * @throws Exception If failed. + */ + private static boolean awaitFuture(final IgniteInternalFuture fut) throws Exception { + return GridTestUtils.waitForCondition(new GridAbsPredicateX() { + @Override public boolean applyx() throws IgniteCheckedException { + return fut.isDone(); + } + }, 1000); + } + + /** + * Create IGFS file asynchronously. + * + * @param path Path. + * @return Future.
+ */ + private IgniteInternalFuture<Void> createFileAsync(final IgfsPath path, final CountDownLatch writeStartLatch) { + return GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteFileSystem igfs = attacker.fileSystem(null); + + try (IgfsOutputStream out = igfs.create(path, true)) { + writeStartLatch.await(); + + out.write(new byte[1024]); + + out.flush(); + } + + return null; + } + }); + } + + /** + * Get data cache for node. + * + * @param node Node. + * @return Data cache. + * @throws Exception If failed. + */ + private GridCacheAdapter dataCache(Ignite node) throws Exception { + return ((IgniteKernal)node).internalCache(DATA_CACHE_NAME); + } + + /** + * Create node configuration. + * + * @param name Node name. + * @return Configuration. + * @throws Exception If failed. + */ + private IgniteConfiguration config(String name, TcpDiscoveryVmIpFinder ipFinder) throws Exception { + // Data cache configuration. + CacheConfiguration dataCcfg = new CacheConfiguration(); + + dataCcfg.setName(DATA_CACHE_NAME); + dataCcfg.setCacheMode(CacheMode.REPLICATED); + dataCcfg.setAtomicityMode(TRANSACTIONAL); + dataCcfg.setWriteSynchronizationMode(FULL_SYNC); + dataCcfg.setAffinityMapper(new DummyAffinityMapper(1)); + dataCcfg.setMaxConcurrentAsyncOperations(1); + + // Meta cache configuration. + CacheConfiguration metaCcfg = new CacheConfiguration(); + + metaCcfg.setName(META_CACHE_NAME); + metaCcfg.setCacheMode(CacheMode.REPLICATED); + metaCcfg.setAtomicityMode(TRANSACTIONAL); + metaCcfg.setWriteSynchronizationMode(FULL_SYNC); + + // File system configuration. + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDefaultMode(IgfsMode.PRIMARY); + igfsCfg.setDataCacheName(DATA_CACHE_NAME); + igfsCfg.setMetaCacheName(META_CACHE_NAME); + igfsCfg.setFragmentizerEnabled(false); + igfsCfg.setBlockSize(1024); + + // Ignite configuration. 
+ IgniteConfiguration cfg = getConfiguration(name); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCcfg, metaCcfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + cfg.setSystemThreadPoolSize(2); + cfg.setRebalanceThreadPoolSize(1); + cfg.setPublicThreadPoolSize(1); + + return cfg; + } + + /** + * Dummy affinity mapper. + */ + private static class DummyAffinityMapper extends IgfsGroupDataBlocksKeyMapper { + /** */ + private static final long serialVersionUID = 0L; + + /** Dummy affinity key. */ + private static final Integer KEY = 1; + + /** + * Constructor. + * + * @param grpSize Group size. + */ + public DummyAffinityMapper(int grpSize) { + super(grpSize); + } + + /** {@inheritDoc} */ + @Override public Object affinityKey(Object key) { + return KEY; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/01a6e86e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java index aff3ad7..25c54e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsAttributesSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsBackupsDualAsyncSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsBackupsDualSyncSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsBackupsPrimarySelfTest; +import org.apache.ignite.internal.processors.igfs.IgfsBlockMessageSystemPoolStarvationSelfTest; import
org.apache.ignite.internal.processors.igfs.IgfsCachePerBlockLruEvictionPolicySelfTest; import org.apache.ignite.internal.processors.igfs.IgfsCacheSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsClientCacheSelfTest; @@ -118,6 +119,8 @@ public class IgniteIgfsTestSuite extends TestSuite { suite.addTestSuite(IgfsBackupsDualSyncSelfTest.class); suite.addTestSuite(IgfsBackupsDualAsyncSelfTest.class); + suite.addTestSuite(IgfsBlockMessageSystemPoolStarvationSelfTest.class); + // TODO: Enable when IGFS failover is fixed. //suite.addTestSuite(IgfsBackupFailoverSelfTest.class);
