Repository: ignite Updated Branches: refs/heads/ignite-1758 98de9d49a -> c15eb405f
ignite-1758 debug Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c15eb405 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c15eb405 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c15eb405 Branch: refs/heads/ignite-1758 Commit: c15eb405f97e4c0fd3b34af3a8fd08b3dfe711c5 Parents: 98de9d4 Author: sboikov <[email protected]> Authored: Mon Nov 2 13:51:22 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 2 13:51:22 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 5 +- .../messages/TcpDiscoveryDiscardMessage.java | 1 + ...gniteClientReconnectMassiveShutdownTest.java | 69 +++++++++++--------- 3 files changed, 43 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c15eb405/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index d5de093..0fe2881 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3241,6 +3241,9 @@ class ServerImpl extends TcpDiscoveryImpl { } } else { + if (isLocalNodeCoordinator()) + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); + if (isLocNodeRouter) { ClientMessageWorker wrk = clientMsgWorkers.get(nodeId); @@ -3251,7 +3254,7 @@ class ServerImpl extends TcpDiscoveryImpl { locNodeId + ", clientNodeId=" + nodeId + ']'); } else { - if (ring.hasRemoteNodes() && !locNodeId.equals(msg.verifierNodeId())) + if (ring.hasRemoteNodes() && !isLocalNodeCoordinator()) sendMessageAcrossRing(msg); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c15eb405/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java index 145f19e..4b4eb9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java @@ -40,6 +40,7 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage { * * @param creatorNodeId Creator node ID. * @param msgId Message ID. + * @param customMsgDiscard Flag indicating whether the ID to discard is for a custom message or not. */ public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, boolean customMsgDiscard) { super(creatorNodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/c15eb405/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java index b31fd49..30eb691 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java @@ -164,55 +164,62 @@ public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstract IgniteInternalFuture<?> clientsFut = multithreadedAsync( new Callable<Object>() { @Override public Object call() throws Exception { - int idx = clientIdx.take(); + try { + int idx = clientIdx.take(); - Ignite ignite = grid(idx); + Ignite ignite = grid(idx); - Thread.currentThread().setName("client-thread-" + ignite.name()); + Thread.currentThread().setName("client-thread-" + ignite.name()); - assertTrue(ignite.configuration().isClientMode()); + assertTrue(ignite.configuration().isClientMode()); - IgniteCache<String, Integer> cache = ignite.cache(null); + IgniteCache<String, Integer> cache = ignite.cache(null); - assertNotNull(cache); + assertNotNull(cache); - IgniteTransactions txs = ignite.transactions(); + IgniteTransactions txs = ignite.transactions(); - Random rand = new Random(); + Random rand = new Random(); - latch.countDown(); + latch.countDown(); - while (!done.get()) { - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000)); + while (!done.get()) { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000)); - tx.commit(); - } - catch (ClusterTopologyException ex) { - ex.retryReadyFuture().get(); - } - catch (IgniteException | CacheException e) { - if (X.hasCause(e, IgniteClientDisconnectedException.class)) { - IgniteClientDisconnectedException cause = X.cause(e, - IgniteClientDisconnectedException.class); + tx.commit(); + } + catch (ClusterTopologyException ex) { + ex.retryReadyFuture().get(); + } + catch (IgniteException | CacheException e) { + if (X.hasCause(e, IgniteClientDisconnectedException.class)) { + IgniteClientDisconnectedException cause = X.cause(e, + IgniteClientDisconnectedException.class); - assert cause != null; + assert cause != null; - cause.reconnectFuture().get(); - } - else if (X.hasCause(e, ClusterTopologyException.class)) { - ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); + cause.reconnectFuture().get(); + } + else if (X.hasCause(e, ClusterTopologyException.class)) { + ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); - assert cause != null; + assert cause != null; - cause.retryReadyFuture().get(); + cause.retryReadyFuture().get(); + } + else + throw e; } - else - throw e; } + + return null; } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); - return null; + throw e; + } } }, CLIENT_GRID_CNT, "client-thread");
