ignite-4450 Need release locks for failing nodes during exchange processing.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/41dddb87 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/41dddb87 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/41dddb87 Branch: refs/heads/ignite-2.0 Commit: 41dddb87da53bbe72594b0b4bb9e2a396a57b986 Parents: c864fe4 Author: sboikov <[email protected]> Authored: Thu Dec 22 11:26:18 2016 +0300 Committer: sboikov <[email protected]> Committed: Thu Dec 22 11:26:18 2016 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 6 +- .../CacheLockReleaseNodeLeaveTest.java | 135 +++++++++++++++++++ 2 files changed, 139 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/41dddb87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 4f34401..2cfc0d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -805,8 +805,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } catch (IgniteFutureTimeoutCheckedException ignored) { if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) { - U.warn(log, "Failed to wait for locks release future. " + - "Dumping pending objects that might be the cause: " + cctx.localNodeId()); + U.warn(log, "Failed to wait for locks release future. Dumping pending objects that might be the " + + "cause [topVer=" + topologyVersion() + ", nodeId=" + cctx.localNodeId() + "]: "); U.warn(log, "Locked keys:"); @@ -1556,6 +1556,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (isDone() || !enterBusy()) return; + cctx.mvcc().removeExplicitNodeLocks(node.id(), topologyVersion()); + try { onDiscoveryEvent(new IgniteRunnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/ignite/blob/41dddb87/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java index e84fd3f..969b991 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java @@ -20,9 +20,13 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.concurrent.Callable; import java.util.concurrent.locks.Lock; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -158,4 +162,135 @@ public class CacheLockReleaseNodeLeaveTest extends GridCommonAbstractTest { fut2.get(5, SECONDS); } + + /** + * @throws Exception If failed. + */ + public void testLockRelease2() throws Exception { + final Ignite ignite0 = startGrid(0); + + Ignite ignite1 = startGrid(1); + + Lock lock = ignite1.cache(null).lock("key"); + lock.lock(); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(2); + + return null; + } + }); + + final AffinityTopologyVersion topVer = new AffinityTopologyVersion(2, 0); + + // Wait when affinity change exchange start. + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + AffinityTopologyVersion topVer0 = + ((IgniteKernal)ignite0).context().cache().context().exchange().topologyVersion(); + + return topVer.compareTo(topVer0) < 0; + } + }, 10_000); + + assertTrue(wait); + + assertFalse(fut.isDone()); + + ignite1.close(); + + fut.get(10_000); + + Ignite ignite2 = ignite(2); + + lock = ignite2.cache(null).lock("key"); + lock.lock(); + lock.unlock(); + } + + /** + * @throws Exception If failed. + */ + public void testLockRelease3() throws Exception { + startGrid(0); + + Ignite ignite1 = startGrid(1); + + awaitPartitionMapExchange(); + + Lock lock = ignite1.cache(null).lock("key"); + lock.lock(); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(2); + + return null; + } + }); + + assertFalse(fut.isDone()); + + ignite1.close(); + + fut.get(10_000); + + Ignite ignite2 = ignite(2); + + lock = ignite2.cache(null).lock("key"); + lock.lock(); + lock.unlock(); + } + + /** + * @throws Exception If failed. + */ + public void testTxLockRelease2() throws Exception { + final Ignite ignite0 = startGrid(0); + + Ignite ignite1 = startGrid(1); + + IgniteCache cache = ignite1.cache(null); + ignite1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + cache.get(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(2); + + return null; + } + }); + + final AffinityTopologyVersion topVer = new AffinityTopologyVersion(2, 0); + + // Wait when affinity change exchange start. + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + AffinityTopologyVersion topVer0 = + ((IgniteKernal)ignite0).context().cache().context().exchange().topologyVersion(); + + return topVer.compareTo(topVer0) < 0; + } + }, 10_000); + + assertTrue(wait); + + assertFalse(fut.isDone()); + + ignite1.close(); + + fut.get(10_000); + + Ignite ignite2 = ignite(2); + + cache = ignite2.cache(null); + + try (Transaction tx = ignite2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.get(1); + + tx.commit(); + } + } }
