Repository: ignite Updated Branches: refs/heads/ignite-1.5.1-2 fad8b7a5a -> 7049f52ee
ignite-1.5 Corrected fix for hang on metadata update. Fix for ignite-647 (issues with dynamic cache start when fair affinity is used). Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/383f317d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/383f317d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/383f317d Branch: refs/heads/ignite-1.5.1-2 Commit: 383f317d03aca8903aeaa00da903366911103cef Parents: fe14099 Author: sboikov <sboi...@gridgain.com> Authored: Thu Dec 24 13:12:23 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Dec 24 13:12:23 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 3 +- .../binary/CacheObjectBinaryProcessorImpl.java | 3 + .../dht/atomic/GridDhtAtomicCache.java | 89 ++++++++---------- .../GridDhtPartitionsExchangeFuture.java | 20 +++- ...ridNearOptimisticTxPrepareFutureAdapter.java | 10 +- .../ignite/IgniteCacheAffinitySelfTest.java | 7 -- .../fair/FairAffinityDynamicCacheSelfTest.java | 17 +--- .../cache/CrossCacheTxRandomOperationsTest.java | 2 - ...yMetadataUpdateChangingTopologySelfTest.java | 97 +++++++++++++------- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 3 +- .../TcpDiscoveryMulticastIpFinderSelfTest.java | 21 ++++- .../IgniteCacheRestartTestSuite2.java | 3 + .../stream/mqtt/IgniteMqttStreamerTest.java | 33 ++++--- 13 files changed, 173 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 380c163..ff02e70 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1961,7 +1961,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (req.initiatingNodeId() == null) desc.staticallyConfigured(true); - desc.receivedOnDiscovery(true); + if (joiningNodeId.equals(ctx.localNodeId())) + desc.receivedOnDiscovery(true); DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc); http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 7586a42..bcc2ab7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -491,6 +491,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm AffinityTopologyVersion topVer = ctx.cache().context().lockedTopologyVersion(null); + if (topVer == null) + topVer = ctx.cache().context().exchange().readyAffinityVersion(); + BinaryObjectException err = metaDataCache.invoke(topVer, key, new MetadataProcessor(mergedMeta)); if (err != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 634a9ea..393413e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1290,59 +1290,48 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheReturn retVal = null; - IgniteTxManager tm = ctx.tm(); + if (keys.size() > 1 && // Several keys ... + writeThrough() && !req.skipStore() && // and store is enabled ... + !ctx.store().isLocal() && // and this is not local store ... + !ctx.dr().receiveEnabled() // and no DR. + ) { + // This method can only be used when there are no replicated entries in the batch. + UpdateBatchResult updRes = updateWithBatch(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + completionCb, + ctx.isDrEnabled(), + taskName, + expiry, + sndPrevVal); - // Needed for metadata cache transaction. - boolean set = tm.setTxTopologyHint(req.topologyVersion()); + deleted = updRes.deleted(); + dhtFut = updRes.dhtFuture(); - try { - if (keys.size() > 1 && // Several keys ... - writeThrough() && !req.skipStore() && // and store is enabled ... - !ctx.store().isLocal() && // and this is not local store ... - !ctx.dr().receiveEnabled() // and no DR. - ) { - // This method can only be used when there are no replicated entries in the batch. - UpdateBatchResult updRes = updateWithBatch(node, - hasNear, - req, - res, - locked, - ver, - dhtFut, - completionCb, - ctx.isDrEnabled(), - taskName, - expiry, - sndPrevVal); - - deleted = updRes.deleted(); - dhtFut = updRes.dhtFuture(); - - if (req.operation() == TRANSFORM) - retVal = updRes.invokeResults(); - } - else { - UpdateSingleResult updRes = updateSingle(node, - hasNear, - req, - res, - locked, - ver, - dhtFut, - completionCb, - ctx.isDrEnabled(), - taskName, - expiry, - sndPrevVal); - - retVal = updRes.returnValue(); - deleted = updRes.deleted(); - dhtFut = updRes.dhtFuture(); - } + if (req.operation() == TRANSFORM) + retVal = updRes.invokeResults(); } - finally { - if (set) - tm.setTxTopologyHint(null); + else { + UpdateSingleResult updRes = updateSingle(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + completionCb, + ctx.isDrEnabled(), + taskName, + expiry, + sndPrevVal); + + retVal = updRes.returnValue(); + deleted = updRes.deleted(); + dhtFut = updRes.dhtFuture(); } if (retVal == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 854726f..a10294f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -329,6 +329,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @return {@code True} if cache was added during this exchange. */ public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) { + if (cacheStarted(cacheId)) + return true; + + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer); + } + + /** + * @param cacheId Cache ID. + * @return {@code True} if non-client cache was added during this exchange. + */ + private boolean cacheStarted(int cacheId) { if (!F.isEmpty(reqs)) { for (DynamicCacheChangeRequest req : reqs) { if (req.start() && !req.clientStartOnly()) { @@ -338,9 +351,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer); + return false; } /** @@ -419,7 +430,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT // If local node did not initiate exchange or local node is the only cache node in grid. Collection<ClusterNode> affNodes = CU.affinityNodes(cacheCtx, exchId.topologyVersion()); - return !exchId.nodeId().equals(cctx.localNodeId()) || + return cacheStarted(cacheCtx.cacheId()) || + !exchId.nodeId().equals(cctx.localNodeId()) || (affNodes.size() == 1 && affNodes.contains(cctx.localNode())); } http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index fa7020b..fe6180a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -52,10 +52,16 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT // Obtain the topology version to use. long threadId = Thread.currentThread().getId(); - AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); + AffinityTopologyVersion topVer = null; + + if (tx.system()) + topVer = tx.topologyVersionSnapshot(); + + if (topVer == null) + topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); // If there is another system transaction in progress, use it's topology version to prevent deadlock. - if (topVer == null && tx != null && tx.system()) + if (topVer == null && tx.system()) topVer = cctx.tm().lockedTopologyVersion(threadId, tx); if (topVer != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java index 3d76268..5b08f62 100644 --- a/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java @@ -92,17 +92,10 @@ public class IgniteCacheAffinitySelfTest extends IgniteCacheAbstractTest { return new NearCacheConfiguration(); } - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - fail("Enable when https://issues.apache.org/jira/browse/IGNITE-647 is fixed."); - } - /** * @throws Exception if failed. */ public void testAffinity() throws Exception { - fail("Enable when https://issues.apache.org/jira/browse/IGNITE-647 is fixed."); - checkAffinity(); stopGrid(gridCount() - 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java index ef67495..4299935 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java @@ -37,22 +37,11 @@ public class FairAffinityDynamicCacheSelfTest extends GridCommonAbstractTest { /** */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - /** */ - public FairAffinityDynamicCacheSelfTest(){ - super(false); - } - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(IP_FINDER); - - cfg.getTransactionConfiguration().setTxSerializableEnabled(true); - - cfg.setDiscoverySpi(disco); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); return cfg; } @@ -71,8 +60,6 @@ public class FairAffinityDynamicCacheSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testStartStopCache() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-647"); - CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); cacheCfg.setCacheMode(CacheMode.PARTITIONED); @@ -94,6 +81,6 @@ public class FairAffinityDynamicCacheSelfTest extends GridCommonAbstractTest { } }); - destFut.get(2000L); + destFut.get(5000L); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java index d88f12f..2577d93 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java @@ -126,8 +126,6 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testCrossCacheTxOperationsFairAffinity() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-647"); - txOperations(PARTITIONED, FULL_SYNC, true, true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java index c95c586..9eaa848 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java @@ -25,10 +25,12 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -48,7 +50,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; /** * Tests specific scenario when binary metadata should be updated from a system thread @@ -105,7 +106,7 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm IgniteCache<Object, Object> cache = ignite(0).cache("cache").withAsync(); - cache.putAll(F.asMap(key1, "val1", key2, new TestValue())); + cache.putAll(F.asMap(key1, "val1", key2, new TestValue1())); try { Thread.sleep(500); @@ -118,8 +119,47 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm } }); + Thread.sleep(1000); + + spi.stopBlock(); + + cache.future().get(); + + fut.get(); + } + finally { + stopGrid(4); + } + } + + /** + * @throws Exception If failed. + */ + public void testNoDeadlockInvoke() throws Exception { + int key1 = primaryKey(ignite(1).cache("cache")); + int key2 = primaryKey(ignite(2).cache("cache")); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite(1).configuration().getCommunicationSpi(); + + spi.blockMessages(GridNearTxPrepareResponse.class, ignite(0).cluster().localNode().id()); + + IgniteCache<Object, Object> cache = ignite(0).cache("cache").withAsync(); + + cache.invokeAll(F.asSet(key1, key2), new TestEntryProcessor()); + + try { Thread.sleep(500); + IgniteInternalFuture<Void> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(4); + + return null; + } + }); + + Thread.sleep(1000); + spi.stopBlock(); cache.future().get(); @@ -145,12 +185,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm /** */ private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>(); - /** */ - private Class<?> recordCls; - - /** */ - private List<Object> recordedMsgs = new ArrayList<>(); - /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { @@ -158,9 +192,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm Object msg0 = ((GridIoMessage)msg).message(); synchronized (this) { - if (recordCls != null && msg0.getClass().equals(recordCls)) - recordedMsgs.add(msg0); - Set<UUID> blockNodes = blockCls.get(msg0.getClass()); if (F.contains(blockNodes, node.id())) { @@ -178,28 +209,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm } /** - * @param recordCls Message class to record. - */ - void record(@Nullable Class<?> recordCls) { - synchronized (this) { - this.recordCls = recordCls; - } - } - - /** - * @return Recorded messages. - */ - List<Object> recordedMessages() { - synchronized (this) { - List<Object> msgs = recordedMsgs; - - recordedMsgs = new ArrayList<>(); - - return msgs; - } - } - - /** * @param cls Message class. * @param nodeId Node ID. */ @@ -241,7 +250,27 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm /** * */ - private static class TestValue { + static class TestEntryProcessor implements CacheEntryProcessor<Object, Object, Object> { + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<Object, Object> e, Object... arguments) { + e.setValue(new TestValue2()); + + return null; + } + } + + /** + * + */ + private static class TestValue1 { + /** Field1. */ + private String field1; + } + + /** + * + */ + private static class TestValue2 { /** Field1. */ private String field1; } http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index 38e3d98..9e78fb9 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -179,8 +179,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS @Override public boolean apply() { return recoveryDesc.messagesFutures().isEmpty(); } - }, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() + 7000 : - 10_000); + }, 10_000); assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0, recoveryDesc.messagesFutures().size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java index b39be56..90fdb0a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java @@ -101,11 +101,11 @@ public class TcpDiscoveryMulticastIpFinderSelfTest assertEquals(1, addrs1.size()); assertEquals(2, addrs2.size()); - assertEquals(3, addrs3.size()); + assertTrue("Unexpected number of addresses: " + addrs3, addrs3.size() == 2 || addrs3.size() == 3); - assertEquals(3, ipFinder1.getRegisteredAddresses().size()); - assertEquals(3, ipFinder2.getRegisteredAddresses().size()); - assertEquals(3, ipFinder3.getRegisteredAddresses().size()); + checkRequestAddresses(ipFinder1, 3); + checkRequestAddresses(ipFinder2, 3); + checkRequestAddresses(ipFinder3, 3); } finally { if (ipFinder1 != null) @@ -118,4 +118,17 @@ public class TcpDiscoveryMulticastIpFinderSelfTest ipFinder3.close(); } } + + /** + * @param ipFinder IP finder. + * @param exp Expected number of addresses. + */ + private void checkRequestAddresses(TcpDiscoveryMulticastIpFinder ipFinder, int exp) { + for (int i = 0; i < 10; i++) { + if (ipFinder.getRegisteredAddresses().size() == exp) + return; + } + + assertEquals(exp, ipFinder.getRegisteredAddresses().size()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java index c9e9467..de87e99 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java @@ -21,6 +21,7 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.GridCachePutAllFailoverSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicPutAllFailoverSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCachePutAllRestartTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteBinaryMetadataUpdateNodeRestartTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicNodeRestartTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheAtomicReplicatedNodeRestartSelfTest; @@ -42,6 +43,8 @@ public class IgniteCacheRestartTestSuite2 extends TestSuite { suite.addTestSuite(IgniteCachePutAllRestartTest.class); suite.addTestSuite(GridCachePutAllFailoverSelfTest.class); + suite.addTestSuite(IgniteBinaryMetadataUpdateNodeRestartTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java index 891866d..92a530d 100644 --- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java +++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java @@ -87,7 +87,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { private MqttStreamer<Integer, String> streamer; /** The UUID of the currently active remote listener. */ - private UUID remoteListener; + private UUID remoteLsnr; /** The Ignite data streamer. */ private IgniteDataStreamer<Integer, String> dataStreamer; @@ -105,7 +105,8 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - @Before @SuppressWarnings("unchecked") + @Before + @SuppressWarnings("unchecked") public void beforeTest() throws Exception { grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration()); @@ -121,13 +122,13 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { broker.setPersistenceAdapter(null); broker.setPersistenceFactory(null); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry policy = new PolicyEntry(); + PolicyMap plcMap = new PolicyMap(); + PolicyEntry plc = new PolicyEntry(); - policy.setQueuePrefetch(1); + plc.setQueuePrefetch(1); - broker.setDestinationPolicy(policyMap); - broker.getDestinationPolicy().setDefaultEntry(policy); + broker.setDestinationPolicy(plcMap); + broker.getDestinationPolicy().setDefaultEntry(plc); broker.setSchedulerSupport(false); // add the MQTT transport connector to the broker @@ -194,7 +195,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testConnectionStatusWithBrokerDisconnection() throws Exception { - // configure streamer + fail("https://issues.apache.org/jira/browse/IGNITE-2255"); + + // Configure streamer. streamer.setSingleTupleExtractor(singleTupleExtractor()); streamer.setTopic(SINGLE_TOPIC_NAME); streamer.setBlockUntilConnected(true); @@ -202,8 +205,10 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { streamer.start(); - // action time: repeat 5 times; make sure the connection state is kept correctly every time + // Action time: repeat 5 times; make sure the connection state is kept correctly every time. for (int i = 0; i < 5; i++) { + log.info("Iteration: " + i); + assertTrue(streamer.isConnected()); broker.stop(); @@ -355,7 +360,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } /** - * @throws Exception + * @throws Exception If failed. */ public void testSingleTopic_NoQoS_Reconnect() throws Exception { // configure streamer @@ -557,7 +562,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { // Listen to cache PUT events and expect as many as messages as test data items final CountDownLatch latch = new CountDownLatch(expect); - IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() { + IgniteBiPredicate<UUID, CacheEvent> cb = new IgniteBiPredicate<UUID, CacheEvent>() { @Override public boolean apply(UUID uuid, CacheEvent evt) { latch.countDown(); @@ -565,8 +570,8 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { } }; - remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)) - .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT); + remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(null)) + .remoteListen(cb, null, EVT_CACHE_OBJECT_PUT); return latch; } @@ -586,7 +591,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { assertEquals(cnt, cache.size(CachePeekMode.ALL)); // remove the event listener - grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener); + grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteLsnr); } /**