ignite-1.5 Properly handle duplicated job responses in GridTaskWorker.onResponse. Use correct 'initialRebalanceFuture' for client nodes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1f90655 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1f90655 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1f90655 Branch: refs/heads/ignite-gg-10837 Commit: b1f906555cea8990dd39e8050ca4348f09da7f7f Parents: 301e7a1 Author: sboikov <[email protected]> Authored: Fri Dec 18 12:08:20 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 18 12:08:20 2015 +0300 ---------------------------------------------------------------------- .../ClientAbstractConnectivitySelfTest.java | 2 +- .../internal/cluster/ClusterGroupAdapter.java | 10 +- .../dht/preloader/GridDhtPreloader.java | 2 +- .../processors/task/GridTaskWorker.java | 8 +- .../internal/util/lang/GridNodePredicate.java | 13 +- .../ignite/internal/util/nio/GridNioServer.java | 11 +- .../util/nio/GridSelectorNioSessionImpl.java | 7 + .../TcpDiscoveryMulticastIpFinder.java | 12 +- .../ignite/internal/ClusterGroupSelfTest.java | 32 ++- .../IgniteClientReconnectCacheTest.java | 7 +- .../ignite/internal/TaskNodeRestartTest.java | 230 +++++++++++++++++++ .../IgniteCacheSizeFailoverTest.java | 4 +- .../random/RandomEvictionPolicySelfTest.java | 4 +- .../GridServiceProcessorStopSelfTest.java | 18 +- .../IgniteMessagingWithClientTest.java | 2 - .../GridSessionCheckpointAbstractSelfTest.java | 3 +- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 2 +- .../testsuites/IgniteComputeGridTestSuite.java | 2 + 18 files changed, 340 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java index ef18a29..8207ccf 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java @@ -134,7 +134,7 @@ public abstract class ClientAbstractConnectivitySelfTest extends GridCommonAbstr /** * Simple test of address list filtering. - * @throws Exception + * @throws Exception If failed. */ public void testResolveReachableOneAddress() throws Exception { InetAddress addr = InetAddress.getByAddress(new byte[] {127, 0, 0, 1} ); http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java index 9039ed8..75168a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java @@ -626,7 +626,15 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { /** {@inheritDoc} */ @Override public final ClusterGroup forRandom() { - return ids != null ? forNodeId(F.rand(ids)) : forNode(F.rand(nodes())); + if (!F.isEmpty(ids)) + return forNodeId(F.rand(ids)); + + Collection<ClusterNode> nodes = nodes(); + + if (nodes.isEmpty()) + return new ClusterGroupAdapter(ctx, null, Collections.<UUID>emptySet()); + + return forNode(F.rand(nodes)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 9a6246f..c46a66c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -482,7 +482,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> initialRebalanceFuture() { - return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : initRebalanceFut; + return cctx.kernalContext().clientNode() ? startFut : initRebalanceFut; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 9315d7c..59d3f90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -696,8 +696,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { if (log.isDebugEnabled()) U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res); - selfOccupied = true; + res = delayedRess.poll(); + // We can not return here because there can be more delayed messages in the queue. continue; } @@ -708,7 +709,10 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { if (log.isDebugEnabled()) log.debug("Received redundant response for a job (will ignore): " + res); - return; + res = delayedRess.poll(); + + // We can not return here because there can be more delayed messages in the queue. + continue; } if (!jobRes.getNode().id().equals(res.getNodeId())) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridNodePredicate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridNodePredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridNodePredicate.java index 4ce0b35..edec862 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridNodePredicate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridNodePredicate.java @@ -100,13 +100,18 @@ public class GridNodePredicate implements IgnitePredicate<ClusterNode>, Iterable public GridNodePredicate(@Nullable ClusterNode... nodes) { if (F.isEmpty(nodes)) ids = Collections.emptySet(); - else if (nodes.length == 1) - ids = Collections.singleton(nodes[0].id()); + else if (nodes.length == 1) { + ClusterNode node = nodes[0]; + + ids = node != null ? Collections.singleton(node.id()) : Collections.<UUID>emptySet(); + } else { ids = U.newHashSet(nodes.length); - for (ClusterNode n : nodes) - ids.add(n.id()); + for (ClusterNode n : nodes) { + if (n != null) + ids.add(n.id()); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 53cec84..be28c30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -425,10 +425,10 @@ public class GridNioServer<T> { int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut); - IgniteInClosure<IgniteException> ackClosure; + IgniteInClosure<IgniteException> ackC; - if (!sys && (ackClosure = ses.removeMeta(ACK_CLOSURE.ordinal())) != null) - fut.ackClosure(ackClosure); + if (!sys && (ackC = ses.removeMeta(ACK_CLOSURE.ordinal())) != null) + fut.ackClosure(ackC); if (ses.closed()) { if (ses.removeFuture(fut)) @@ -1609,15 +1609,14 @@ public class GridNioServer<T> { sessions.remove(ses); - if (closed) - ses.onServerStopped(); - SelectionKey key = ses.key(); // Shutdown input and output so that remote client will see correct socket close. Socket sock = ((SocketChannel)key.channel()).socket(); if (ses.setClosed()) { + ses.onClosed(); + if (directBuf) { if (ses.writeBuffer() != null) ((DirectBuffer)ses.writeBuffer()).cleaner().clean(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index 6b1f6a7..deb7d2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -294,6 +294,13 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { * */ void onServerStopped() { + onClosed(); + } + + /** + * + */ + void onClosed() { if (sem != null) sem.release(1_000_000); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java index 77bb99d..8402cbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -136,6 +136,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { /** */ private boolean mcastErr; + @GridToStringExclude + private Set<InetSocketAddress> locNodeAddrs; + /** * Constructs new IP finder. */ @@ -369,6 +372,8 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { } if (!clientMode) { + locNodeAddrs = new HashSet<>(addrs); + if (addrSnds.isEmpty()) { try { // Create non-bound socket if local host is loopback or failed to create sockets explicitly @@ -403,8 +408,11 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { else mcastErr = true; } - else + else { assert addrSnds.isEmpty() : addrSnds; + + locNodeAddrs = Collections.emptySet(); + } } /** {@inheritDoc} */ @@ -607,7 +615,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { U.close(sock); } - if (!rmtAddrs.isEmpty()) + if (rmtAddrs.size() > locNodeAddrs.size()) break; if (i < addrReqAttempts - 1) // Wait some time before re-sending address request. http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java index d916d78..18eb3b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java @@ -109,6 +109,10 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { } assertEquals(oldest.node(), ignite.cluster().forNode(node).node()); + + ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val"); + + assertEquals(0, emptyGrp.forOldest().nodes().size()); } /** @@ -130,6 +134,10 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { } assertEquals(youngest.node(), ignite.cluster().forNode(node).node()); + + ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val"); + + assertEquals(0, emptyGrp.forYoungest().nodes().size()); } /** @@ -187,8 +195,7 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { assertEquals(grid(gridMaxOrder(clusterSize, false)).localNode().id(), oddYoungest.node().id()); assertEquals(grid(2).localNode().id(), oddOldest.node().id()); - try (Ignite g4 = startGrid(NODES_CNT); - Ignite g5 = startGrid(NODES_CNT + 1)) + try (Ignite g4 = startGrid(NODES_CNT); Ignite g5 = startGrid(NODES_CNT + 1)) { clusterSize = g4.cluster().nodes().size(); @@ -241,6 +248,27 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { } /** + * @throws Exception If failed. + */ + public void testEmptyGroup() throws Exception { + ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val"); + + assertEquals(0, emptyGrp.forOldest().nodes().size()); + assertEquals(0, emptyGrp.forYoungest().nodes().size()); + assertEquals(0, emptyGrp.forAttribute("nonExistent2", "val").nodes().size()); + assertEquals(0, emptyGrp.forCacheNodes("cacheName").nodes().size()); + assertEquals(0, emptyGrp.forClientNodes("cacheName").nodes().size()); + assertEquals(0, emptyGrp.forClients().nodes().size()); + assertEquals(0, emptyGrp.forDaemons().nodes().size()); + assertEquals(0, emptyGrp.forDataNodes("cacheName").nodes().size()); + assertEquals(0, emptyGrp.forRandom().nodes().size()); + assertEquals(0, emptyGrp.forRemotes().nodes().size()); + assertEquals(0, emptyGrp.forServers().nodes().size()); + assertEquals(0, emptyGrp.forHost(ignite.cluster().localNode()).nodes().size()); + assertEquals(0, emptyGrp.forHost("127.0.0.1").nodes().size()); + } + + /** * @param cnt Count. * @param even Even. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 5dbf75a..5234d6e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -73,6 +73,9 @@ import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.values; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -169,6 +172,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setName("nearCache"); + ccfg.setAtomicWriteOrderMode(PRIMARY); final IgniteCache<Object, Object> nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>()); @@ -786,8 +790,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) { CacheAtomicWriteOrderMode[] writeOrders = - atomicityMode == ATOMIC ? CacheAtomicWriteOrderMode.values() : - new CacheAtomicWriteOrderMode[]{CacheAtomicWriteOrderMode.CLOCK}; + atomicityMode == ATOMIC ? values() : new CacheAtomicWriteOrderMode[]{CLOCK}; for (CacheAtomicWriteOrderMode writeOrder : writeOrders) { for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java new file mode 100644 index 0000000..1e3b213 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.compute.ComputeJobResultPolicy; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.ComputeTaskAdapter; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class TaskNodeRestartTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 3; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testTaskNodeRestart() throws Exception { + final AtomicBoolean finished = new AtomicBoolean(); + + final AtomicInteger stopIdx = new AtomicInteger(); + + IgniteInternalFuture<?> restartFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int idx = stopIdx.getAndIncrement(); + + int node = NODES + idx; + + while (!finished.get()) { + log.info("Start node: " + node); + + startGrid(node); + + U.sleep(300); + + log.info("Stop node: " + node); + + stopGrid(node); + } + + return null; + } + }, 2, "stop-thread"); + + IgniteInternalFuture<?> fut = null; + + try { + final long stopTime = System.currentTimeMillis() + 60_000; + + final AtomicInteger idx = new AtomicInteger(); + + fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int node = idx.getAndIncrement() % NODES; + + Ignite ignite = ignite(node); + + log.info("Start thread: " + ignite.name()); + + IgniteCompute compute = ignite.compute(); + + while (U.currentTimeMillis() < stopTime) { + try { + compute.broadcast(new TestCallable()); + + compute.call(new TestCallable()); + + compute.execute(new TestTask1(), null); + + compute.execute(new TestTask2(), null); + } + catch (IgniteException e) { + log.info("Error: " + e); + } + } + + return null; + } + }, 20, "test-thread"); + + fut.get(90_000); + + finished.set(true); + + restartFut.get(); + } + finally { + finished.set(true); + + if (fut != null) + fut.cancel(); + + restartFut.get(5000); + } + } + + /** + * + */ + private static class TestTask1 extends ComputeTaskAdapter<Void, Void> { + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Void arg) + throws IgniteException { + Map<TestJob, ClusterNode> jobs = new HashMap<>(); + + for (ClusterNode node : subgrid) + jobs.put(new TestJob(), node); + + return jobs; + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException { + return null; + } + } + + /** + * + */ + private static class TestTask2 implements ComputeTask<Void, Void> { + /** {@inheritDoc} */ + @Nullable public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Void arg) + throws IgniteException { + Map<TestJob, ClusterNode> jobs = new HashMap<>(); + + for (ClusterNode node : subgrid) + jobs.put(new TestJob(), node); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) { + return null; + } + } + + /** + * + */ + private static class TestJob extends ComputeJobAdapter { + /** {@inheritDoc} */ + @Override public Object execute() throws IgniteException { + return null; + } + } + + /** + * + */ + private static class TestCallable implements IgniteCallable<Void> { + /** {@inheritDoc} */ + @Nullable @Override public Void call() throws Exception { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java index 1738a0d..5d074e1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java @@ -82,7 +82,7 @@ public class IgniteCacheSizeFailoverTest extends GridCommonAbstractTest { IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { @Override public Object call() throws Exception { - int idx = cntr.getAndIncrement(); + int idx = cntr.getAndIncrement() % 2; IgniteCache<Object, Object> cache = ignite(idx).cache(null); @@ -97,7 +97,7 @@ public class IgniteCacheSizeFailoverTest extends GridCommonAbstractTest { return null; } - }, 2, "size-thread"); + }, 10, "size-thread"); try { for (int i = 0; i < 10; i++) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicySelfTest.java index af04cdc..a253a25 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicySelfTest.java @@ -197,7 +197,9 @@ public class RandomEvictionPolicySelfTest extends } }, 10); - assert g.cache(null).size() <= max; + int size = g.cache(null).size(); + + assertTrue("Unexpected cache size [size=" + size + ", max=" + max + ']', size <= max); info(policy(0)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java index 16ea5e4..dfea37a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java @@ -23,6 +23,7 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteServices; import org.apache.ignite.Ignition; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceContext; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -31,6 +32,13 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; * Tests that {@link GridServiceProcessor} completes deploy/undeploy futures during node stop. */ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + /** * @throws Exception If failed. */ @@ -43,6 +51,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest { Thread t = new Thread(new Runnable() { @Override public void run() { + Thread.currentThread().setName("deploy-thread"); + IgniteServices svcs = ignite.services(); IgniteServices services = svcs.withAsync(); @@ -69,13 +79,19 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest { Ignition.stopAll(true); - assertTrue("Deploy future isn't completed", finishLatch.await(15, TimeUnit.SECONDS)); + boolean wait = finishLatch.await(15, TimeUnit.SECONDS); + + if (!wait) + U.dumpThreads(log); + + assertTrue("Deploy future isn't completed", wait); } /** * Simple map service. */ public interface TestService { + // No-op. } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java index 62f4c1a..e885f48 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java @@ -78,8 +78,6 @@ public class IgniteMessagingWithClientTest extends GridCommonAbstractTest implem * @throws Exception If failed. */ public void testMessageSendWithClientJoin() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-996"); - startGrid(0); Ignite ignite1 = startGrid(1); http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointAbstractSelfTest.java index 06cbf1c..c087d38 100644 --- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointAbstractSelfTest.java @@ -36,6 +36,7 @@ import org.apache.ignite.compute.ComputeTaskSessionScope; import org.apache.ignite.compute.ComputeTaskSplitAdapter; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.TaskSessionResource; @@ -55,7 +56,7 @@ public abstract class GridSessionCheckpointAbstractSelfTest extends GridCommonAb private static final int SPLIT_COUNT = 5; /** */ - private static CountDownLatch taskLatch; + private static volatile CountDownLatch taskLatch; /** */ protected GridSessionCheckpointAbstractSelfTest() { http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 5475f25..5af0596 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -980,7 +980,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } } - assertTrue("GridTcpDiscoveryMulticastIpFinder should register port." , found); + assertTrue("TcpDiscoveryMulticastIpFinder should register port." , found); } } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java index 23f2edc..e2c7e26 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.IgniteComputeEmptyClusterGroupTest; import org.apache.ignite.internal.IgniteComputeTopologyExceptionTest; import org.apache.ignite.internal.IgniteExecutorServiceTest; import org.apache.ignite.internal.IgniteExplicitImplicitDeploymentSelfTest; +import org.apache.ignite.internal.TaskNodeRestartTest; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfTest; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest; import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest; @@ -144,6 +145,7 @@ public class IgniteComputeGridTestSuite { suite.addTestSuite(IgniteComputeEmptyClusterGroupTest.class); suite.addTestSuite(IgniteComputeTopologyExceptionTest.class); suite.addTestSuite(GridTaskFailoverAffinityRunTest.class); + suite.addTestSuite(TaskNodeRestartTest.class); return suite; }
