IGNITE-4239: add GridInternal annotation for tasks instead of jobs. This closes #1250.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/861fab9d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/861fab9d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/861fab9d Branch: refs/heads/ignite-4242 Commit: 861fab9d0598ca2f06c4a6f293bf2866af31967c Parents: fc9ee6a Author: tledkov-gridgain <[email protected]> Authored: Tue Nov 22 14:52:03 2016 +0500 Committer: tledkov-gridgain <[email protected]> Committed: Tue Nov 22 14:52:03 2016 +0500 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 11 +- .../compute/PublicThreadpoolStarvationTest.java | 129 +++++++++++++++++++ .../testsuites/IgniteComputeGridTestSuite.java | 2 + 3 files changed, 135 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/861fab9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 55400ab..2e24e67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -5443,7 +5443,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Global clear all. */ - @GridInternal private static class GlobalClearAllJob extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; @@ -5482,7 +5481,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Global clear keys. 
*/ - @GridInternal private static class GlobalClearKeySetJob<K> extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; @@ -5527,7 +5525,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Global clear all for near cache. */ - @GridInternal private static class GlobalClearAllNearJob extends GlobalClearAllJob { /** */ private static final long serialVersionUID = 0L; @@ -5558,7 +5555,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Global clear keys for near cache. */ - @GridInternal private static class GlobalClearKeySetNearJob<K> extends GlobalClearKeySetJob<K> { /** */ private static final long serialVersionUID = 0L; @@ -5590,7 +5586,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Internal callable for partition size calculation. */ - @GridInternal private static class PartitionSizeLongJob extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; @@ -5636,7 +5631,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Internal callable for global size calculation. */ - @GridInternal private static class SizeJob extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; @@ -5677,7 +5671,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Internal callable for global size calculation. */ - @GridInternal private static class SizeLongJob extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; @@ -6523,6 +6516,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Size task. 
*/ + @GridInternal private static class SizeTask extends ComputeTaskAdapter<Object, Integer> { /** */ private static final long serialVersionUID = 0L; @@ -6588,6 +6582,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Size task. */ + @GridInternal private static class SizeLongTask extends ComputeTaskAdapter<Object, Long> { /** */ private static final long serialVersionUID = 0L; @@ -6653,6 +6648,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Partition Size Long task. */ + @GridInternal private static class PartitionSizeLongTask extends ComputeTaskAdapter<Object, Long> { /** */ private static final long serialVersionUID = 0L; @@ -6737,6 +6733,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Clear task. */ + @GridInternal private static class ClearTask<K> extends ComputeTaskAdapter<Object, Object> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/861fab9d/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/PublicThreadpoolStarvationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/PublicThreadpoolStarvationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/PublicThreadpoolStarvationTest.java new file mode 100644 index 0000000..e587310 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/PublicThreadpoolStarvationTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compute; + +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * Test to validate https://issues.apache.org/jira/browse/IGNITE-4239 + * Jobs hang when a lot of jobs calculate cache. + */ +public class PublicThreadpoolStarvationTest extends GridCacheAbstractSelfTest { + /** Cache size. */ + private static final int CACHE_SIZE = 10; + + /** Cache size. 
*/ + private static final String CACHE_NAME = "test"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPublicThreadPoolSize(1); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + cfg.setMarshaller(new BinaryMarshaller()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected Class<?>[] indexedTypes() { + return new Class<?>[] { + Integer.class, String.class, + }; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + info("Fill caches begin..."); + + fillCaches(); + + info("Caches are filled."); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + grid(0).destroyCache(CACHE_NAME); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + private void fillCaches() throws Exception { + grid(0).createCache(CACHE_NAME); + + try ( + IgniteDataStreamer<Integer, String> streamer = + grid(0).dataStreamer(CACHE_NAME)) { + + for (int i = 0; i < CACHE_SIZE; ++i) + streamer.addData(i, "Data " + i); + } + + awaitPartitionMapExchange(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testCacheSizeOnPublicThreadpoolStarvation() throws Exception { + grid(0).compute().run(new IgniteRunnable() { + @Override public void run() { + try { + Thread.sleep(500); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + grid(0).cache(CACHE_NAME).size(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/861fab9d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java index a1a75f8..8a501fd 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java @@ -71,6 +71,7 @@ import org.apache.ignite.internal.TaskNodeRestartTest; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfTest; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest; import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest; +import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest; import org.apache.ignite.p2p.GridMultinodeRedeployContinuousModeSelfTest; import org.apache.ignite.p2p.GridMultinodeRedeployIsolatedModeSelfTest; import org.apache.ignite.p2p.GridMultinodeRedeployPrivateModeSelfTest; @@ -150,6 +151,7 @@ public class IgniteComputeGridTestSuite { suite.addTestSuite(GridTaskFailoverAffinityRunTest.class); suite.addTestSuite(TaskNodeRestartTest.class); suite.addTestSuite(IgniteRoundRobinErrorAfterClientReconnectTest.class); + suite.addTestSuite(PublicThreadpoolStarvationTest.class); return suite; }
