Repository: ignite Updated Branches: refs/heads/ignite-1537 118f29ae0 -> 70c182f8b
ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70c182f8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70c182f8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70c182f8 Branch: refs/heads/ignite-1537 Commit: 70c182f8be9c5a8aa6af3b3c6b501bb9ea19b78d Parents: 118f29a Author: sboikov <[email protected]> Authored: Wed Dec 23 09:18:13 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Dec 23 09:51:53 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 39 +++++++++++--------- .../dht/atomic/GridDhtAtomicCache.java | 1 + .../ignite/internal/util/lang/GridFunc.java | 1 + ...niteClientReconnectFailoverAbstractTest.java | 3 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 20 +++++++--- .../cache/GridCacheAbstractSelfTest.java | 3 +- .../GridServiceProcessorStopSelfTest.java | 21 ++++++----- 7 files changed, 54 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index bf7c7e4..42f8dae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -666,6 +666,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * * @param plc Policy. * @return Execution pool. + * @throws IgniteCheckedException If failed. */ private Executor pool(byte plc) throws IgniteCheckedException { switch (plc) { @@ -767,6 +768,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param msg Message. * @param plc Execution policy. * @param msgC Closure to call when message processing finished. + * @throws IgniteCheckedException If failed. */ private void processRegularMessage( final UUID nodeId, @@ -824,6 +826,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param msg Ordered message. * @param plc Execution policy. * @param msgC Closure to call when message processing finished ({@code null} for sync processing). + * @throws IgniteCheckedException If failed. */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") private void processOrderedMessage( @@ -1029,7 +1032,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param ordered Ordered flag. * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ private void send( @@ -1041,7 +1044,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa boolean ordered, long timeout, boolean skipOnTimeout, - IgniteInClosure<IgniteException> ackClosure + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { assert node != null; assert topic != null; @@ -1062,8 +1065,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa else processRegularMessage0(ioMsg, locNodeId); - if (ackClosure != null) - ackClosure.apply(null); + if (ackC != null) + ackC.apply(null); } else { if (topicOrd < 0) @@ -1071,7 +1074,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa try { if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) - ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure); + ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC); else getSpi().sendMessage(node, ioMsg); } @@ -1197,12 +1200,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, - IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure); + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC); } /** @@ -1233,12 +1236,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackClosure) + public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false, ackClosure); + send(node, topic, -1, msg, plc, false, 0, false, ackC); } /** @@ -1280,7 +1283,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. * @param skipOnTimeout Whether message can be skipped on timeout. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ public void sendOrderedMessage( @@ -1290,11 +1293,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa byte plc, long timeout, boolean skipOnTimeout, - IgniteInClosure<IgniteException> ackClosure + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); } /** @@ -1385,6 +1388,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Topic to subscribe to. * @param p Message predicate. */ + @SuppressWarnings("unchecked") public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) { if (p != null) { try { @@ -1406,6 +1410,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Topic to unsubscribe from. * @param p Message predicate. */ + @SuppressWarnings("unchecked") public void removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) { try { removeMessageListener(TOPIC_COMM_USER, @@ -1423,7 +1428,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. * @param skipOnTimeout Whether message can be skipped on timeout. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ public void sendOrderedMessage( @@ -1433,7 +1438,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa byte plc, long timeout, boolean skipOnTimeout, - IgniteInClosure<IgniteException> ackClosure + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; @@ -1442,7 +1447,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 6942d87..634a9ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1292,6 +1292,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { IgniteTxManager tm = ctx.tm(); + // Needed for metadata cache transaction. boolean set = tm.setTxTopologyHint(req.topologyVersion()); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 8d5a8e7..8eeca6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -3408,6 +3408,7 @@ public class GridFunc { * @return First element in given collection for which predicate evaluates to * {@code true} - or {@code null} if such element cannot be found. */ + @SafeVarargs @Nullable public static <V> V find(Iterable<? extends V> c, @Nullable V dfltVal, @Nullable IgnitePredicate<? super V>... p) { A.notNull(c, "c"); http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java index f050c72..7e217b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java @@ -117,7 +117,8 @@ public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteCl } return null; - } catch (Throwable e) { + } + catch (Throwable e) { log.error("Unexpected error in operation thread: " + e, e); stop.set(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 5b294cc..2c2ec3f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -3277,9 +3277,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. */ public void testPeekExpired() throws Exception { - IgniteCache<String, Integer> c = jcache(); + final IgniteCache<String, Integer> c = jcache(); - String key = primaryKeysForCache(c, 1).get(0); + final String key = primaryKeysForCache(c, 1).get(0); info("Using key: " + key); @@ -3295,6 +3295,12 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract Thread.sleep(ttl + 100); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return peek(c, key) == null; + } + }, 2000); + assert peek(c, key) == null; assert c.localSize() == 0 : "Cache is not empty."; @@ -3307,9 +3313,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract */ public void testPeekExpiredTx() throws Exception { if (txShouldBeUsed()) { - IgniteCache<String, Integer> c = jcache(); + final IgniteCache<String, Integer> c = jcache(); - String key = "1"; + final String key = "1"; int ttl = 500; try (Transaction tx = grid(0).transactions().txStart()) { @@ -3320,7 +3326,11 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract tx.commit(); } - Thread.sleep(ttl + 100); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return c.localPeek(key, ONHEAP) == null; + } + }, 2000); assertNull(c.localPeek(key, ONHEAP)); http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java index 52fbf4c..b3d1384 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java @@ -416,9 +416,8 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { * @param cache Cache projection. * @param key Key. * @return Value. - * @throws Exception If failed. */ - @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) throws Exception { + @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) { return offheapTiered(cache) ? cache.localPeek(key, CachePeekMode.SWAP, CachePeekMode.OFFHEAP) : cache.localPeek(key, CachePeekMode.ONHEAP); } http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java index dfea37a..92b18ab 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java @@ -17,15 +17,18 @@ package org.apache.ignite.internal.processors.service; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteServices; import org.apache.ignite.Ignition; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceContext; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** @@ -49,10 +52,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest { final Ignite ignite = startGrid(0); - Thread t = new Thread(new Runnable() { - @Override public void run() { - Thread.currentThread().setName("deploy-thread"); - + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { IgniteServices svcs = ignite.services(); IgniteServices services = svcs.withAsync(); @@ -67,13 +68,13 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest { catch (IgniteException e) { finishLatch.countDown(); } - catch (Throwable e) { - log.error("Service deployment error: ", e); + finally { + finishLatch.countDown(); } - } - }); - t.start(); + return null; + } + }, "deploy-thread"); depLatch.await(); @@ -85,6 +86,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest { U.dumpThreads(log); assertTrue("Deploy future isn't completed", wait); + + fut.get(); } /**
