Repository: ignite
Updated Branches:
  refs/heads/master aed482df5 -> 3fc5d578d


IGNITE-7090 Semaphore Stuck when no acquirers to assign permit - Fixes #3443.

Signed-off-by: dspavlov <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3fc5d578
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3fc5d578
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3fc5d578

Branch: refs/heads/master
Commit: 3fc5d578db37d0d1e61daff0cdfc3d71729ffa41
Parents: aed482d
Author: Tim Onyschak <[email protected]>
Authored: Sat Mar 31 15:58:25 2018 +0300
Committer: dspavlov <[email protected]>
Committed: Sat Mar 31 15:58:25 2018 +0300

----------------------------------------------------------------------
 .../datastructures/DataStructuresProcessor.java |  32 +++++
 .../SemaphoreFailoverNoWaitingAcquirerTest.java | 134 +++++++++++++++++++
 .../IgniteCacheDataStructuresSelfTestSuite.java |   2 +
 3 files changed, 168 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3fc5d578/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index a37797c..b942dc2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.datastructures;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
@@ -1235,6 +1236,37 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter implemen
 
                 GridCacheSemaphoreEx sem0 = new GridCacheSemaphoreImpl(name, 
key, cache);
 
+                // Check cluster state against semaphore state: permits recorded for nodes that are no longer in the cluster are added back to the count.
+                if (val != null && failoverSafe) {
+                    GridCacheSemaphoreState semState = 
(GridCacheSemaphoreState) val;
+
+                    boolean updated = false;
+
+                    Map<UUID,Integer> waiters = semState.getWaiters();
+
+                    Integer permit = ((GridCacheSemaphoreState) 
val).getCount();
+
+                    for (UUID nodeId : new HashSet<>(waiters.keySet())) {
+
+                        ClusterNode node = ctx.cluster().get().node(nodeId);
+
+                        if (node == null) {
+
+                            permit += waiters.get(nodeId);
+
+                            waiters.remove(nodeId);
+
+                            updated = true;
+                        }
+                    }
+                    if (updated) {
+                        semState.setWaiters(waiters);
+                        semState.setCount(permit);
+
+                        retVal = semState;
+                    }
+                }
+
                 return new T2<>(sem0, retVal);
             }
         }, cfg, name, SEMAPHORE, create, GridCacheSemaphoreEx.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3fc5d578/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/SemaphoreFailoverNoWaitingAcquirerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/SemaphoreFailoverNoWaitingAcquirerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/SemaphoreFailoverNoWaitingAcquirerTest.java
new file mode 100644
index 0000000..862d240
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/SemaphoreFailoverNoWaitingAcquirerTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.AtomicConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Tests that a permit of a semaphore can be acquired from another node after the
+ * node that acquired it (the initial semaphore owner) has been closed, when no other
+ * acquirer was waiting on the semaphore at that moment.
+ * <p>
+ * Regression test for IGNITE-7090. The scenario is run once with the atomics cache in
+ * {@code PARTITIONED} mode (with one backup) and once in {@code REPLICATED} mode.
+ */
+public class SemaphoreFailoverNoWaitingAcquirerTest extends 
GridCommonAbstractTest {
+    /** IP finder assigned to the discovery SPI of every node configured by this test. */
+    protected static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Number of grids started by {@link #doTest()}. */
+    private static final int GRID_CNT = 3;
+
+    /**
+     * Atomics cache mode. Set by each test method before calling {@link #doTest()} and
+     * read by {@link #atomicConfiguration()}.
+     */
+    private CacheMode atomicsCacheMode;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On top of the parent configuration, installs a {@link TcpDiscoverySpi} that uses
+     * {@link #ipFinder} and the atomic configuration from {@link #atomicConfiguration()},
+     * which is asserted to be non-null.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(spi);
+
+        AtomicConfiguration atomicCfg = atomicConfiguration();
+
+        assertNotNull(atomicCfg);
+
+        cfg.setAtomicConfiguration(atomicCfg);
+
+        return cfg;
+    }
+
+    /**
+     * Runs the scenario of {@link #doTest()} with the atomics cache in
+     * {@code PARTITIONED} mode.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReleasePermitsPartitioned() throws Exception {
+        atomicsCacheMode = PARTITIONED;
+
+        doTest();
+    }
+
+    /**
+     * Runs the scenario of {@link #doTest()} with the atomics cache in
+     * {@code REPLICATED} mode.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReleasePermitsReplicated() throws Exception {
+        atomicsCacheMode = REPLICATED;
+
+        doTest();
+    }
+
+    /**
+     * Test scenario shared by both cache modes:
+     * <ol>
+     * <li>start {@link #GRID_CNT} grids;</li>
+     * <li>on grid 0 obtain semaphore {@code "sem"} with one permit and acquire that
+     * permit, checking the available permit count before (1) and after (0);</li>
+     * <li>close grid 0 and wait for partition map exchange;</li>
+     * <li>on grid 1 obtain the semaphore with the same name and assert that one permit
+     * can be acquired within 5000 ms.</li>
+     * </ol>
+     * All grids are stopped in a {@code finally} block.
+     * <p>
+     * NOTE(review): both boolean arguments passed to {@code semaphore(...)} are
+     * {@code true}; presumably these are the fail-over-safe and create flags of
+     * {@code Ignite.semaphore} - confirm against the API.
+     *
+     * @throws Exception If failed.
+     */
+    private void doTest() throws Exception {
+        try {
+            startGrids(GRID_CNT);
+
+            Ignite ignite = grid(0);
+
+            IgniteSemaphore sem = ignite.semaphore("sem", 1, true, true);
+
+            assertEquals(1, sem.availablePermits());
+
+            sem.acquire(1);
+
+            assertEquals(0, sem.availablePermits());
+
+            ignite.close();
+
+            awaitPartitionMapExchange();
+            IgniteSemaphore sem2 = grid(1).semaphore("sem", 1, true, true);
+
+            assertTrue("Could not aquire after 'restart'",sem2.tryAcquire(1, 
5000, TimeUnit.MILLISECONDS));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Builds the atomic configuration for the current {@link #atomicsCacheMode}; one
+     * backup is configured when the mode is {@code PARTITIONED}.
+     *
+     * @return Atomic configuration.
+     */
+    protected AtomicConfiguration atomicConfiguration() {
+        AtomicConfiguration atomicCfg = new AtomicConfiguration();
+
+        atomicCfg.setCacheMode(atomicsCacheMode);
+
+        if (atomicsCacheMode == PARTITIONED)
+            atomicCfg.setBackups(1);
+
+        return atomicCfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3fc5d578/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
index 414f463..9583143 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
@@ -26,6 +26,7 @@ import 
org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDa
 import 
org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDiscoveryDataStructuresTest;
 import 
org.apache.ignite.internal.processors.cache.datastructures.IgniteDataStructureUniqueNameTest;
 import 
org.apache.ignite.internal.processors.cache.datastructures.IgniteDataStructureWithJobTest;
+import 
org.apache.ignite.internal.processors.cache.datastructures.SemaphoreFailoverNoWaitingAcquirerTest;
 import 
org.apache.ignite.internal.processors.cache.datastructures.SemaphoreFailoverSafeReleasePermitsTest;
 import 
org.apache.ignite.internal.processors.cache.datastructures.local.GridCacheLocalAtomicQueueApiSelfTest;
 import 
org.apache.ignite.internal.processors.cache.datastructures.local.GridCacheLocalAtomicSetSelfTest;
@@ -133,6 +134,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends 
TestSuite {
         suite.addTest(new TestSuite(IgniteDataStructureWithJobTest.class));
         suite.addTest(new TestSuite(IgnitePartitionedSemaphoreSelfTest.class));
         suite.addTest(new 
TestSuite(SemaphoreFailoverSafeReleasePermitsTest.class));
+        suite.addTest(new 
TestSuite(SemaphoreFailoverNoWaitingAcquirerTest.class));
         // TODO IGNITE-3141, enabled when fixed.
         // suite.addTest(new TestSuite(IgnitePartitionedLockSelfTest.class));
 

Reply via email to