This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 12ab7e0  IGNITE-12451 Introduce deadlock detection for atomic cache 
putAll operations - Fixes #8268.
12ab7e0 is described below

commit 12ab7e0de6cb5a579f7350079aeb53ea8706b0f2
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Wed Oct 28 15:02:27 2020 +0300

    IGNITE-12451 Introduce deadlock detection for atomic cache putAll 
operations - Fixes #8268.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../processors/cache/LockedEntriesInfo.java        | 150 +++++++++++++++++++++
 .../distributed/dht/atomic/GridDhtAtomicCache.java |  39 ++----
 .../cache/local/atomic/GridLocalAtomicCache.java   |  38 ++----
 ...acheAtomicConcurrentUnorderedUpdateAllTest.java | 133 ++++++++++++++++++
 .../testsuites/IgniteCacheMvccTestSuite1.java      |   2 +
 .../ignite/testsuites/IgniteCacheTestSuite.java    |   2 +
 6 files changed, 309 insertions(+), 55 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LockedEntriesInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LockedEntriesInfo.java
new file mode 100644
index 0000000..039c6bd
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LockedEntriesInfo.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Helper class to acquire java level locks on unordered set of entries and 
avoid deadlocks.
+ */
+public class LockedEntriesInfo {
+    /** Deadlock detection timeout in milliseconds. */
+    private static final long DEADLOCK_DETECTION_TIMEOUT = 500L;
+
+    /** Locked entries info for each thread. */
+    private final Map<Long, LockedEntries> lockedEntriesPerThread = new 
ConcurrentHashMap<>();
+
+    /**
+     * Attempt to lock all provided entries avoiding deadlocks.
+     *
+     * @param entries Entries to lock.
+     * @return {@code True} if entries were successfully locked, {@code false} 
if possible deadlock detected or
+     *      some entries are obsolete (lock attempt should be retried in this 
case).
+     */
+    public boolean tryLockEntries(GridCacheEntryEx[] entries) {
+        long threadId = Thread.currentThread().getId();
+
+        LockedEntries lockedEntries = new LockedEntries(entries);
+
+        lockedEntriesPerThread.put(threadId, lockedEntries);
+
+        boolean wasInterrupted = false;
+
+        try {
+            for (int i = 0; i < entries.length; i++) {
+                GridCacheEntryEx entry = entries[i];
+
+                if (entry == null)
+                    continue;
+
+                boolean retry = false;
+
+                while (true) {
+                    if (entry.tryLockEntry(DEADLOCK_DETECTION_TIMEOUT))
+                        break; // Successfully locked.
+                    else {
+                        wasInterrupted |= Thread.interrupted(); // Clear 
thread interruption flag.
+
+                        if (hasLockCollisions(entry, lockedEntries)) {
+                            // Possible deadlock detected, unlock all locked 
entries and retry again.
+                            retry = true;
+
+                            break;
+                        }
+                        // Possible deadlock not detected, just retry lock on 
current entry.
+                    }
+                }
+
+                if (!retry && entry.obsolete()) {
+                    entry.unlockEntry();
+
+                    retry = true;
+                }
+
+                if (retry) {
+                    lockedEntries.lockedIdx = -1;
+
+                    // Unlock all previously locked.
+                    for (int j = 0; j < i; j++) {
+                        if (entries[j] != null)
+                            entries[j].unlockEntry();
+                    }
+
+                    return false;
+                }
+
+                lockedEntries.lockedIdx = i;
+            }
+
+            return true;
+        }
+        finally {
+            if (wasInterrupted)
+                Thread.currentThread().interrupt();
+
+            // Already acquired all locks or released all locks here, deadlock 
is not possible by this thread anymore,
+            // can safely delete locks information.
+            lockedEntriesPerThread.remove(threadId);
+        }
+    }
+
+    /**
+     * @param entry Entry.
+     * @param curLockedEntries Current locked entries info.
+     * @return {@code True} if another thread holds lock for this entry and 
started to lock entries earlier.
+     */
+    private boolean hasLockCollisions(GridCacheEntryEx entry, LockedEntries 
curLockedEntries) {
+        for (Map.Entry<Long, LockedEntries> other : 
lockedEntriesPerThread.entrySet()) {
+            LockedEntries otherLockedEntries = other.getValue();
+
+            if (otherLockedEntries == curLockedEntries || 
otherLockedEntries.ts > curLockedEntries.ts)
+                // Skip current thread and threads started to lock after the 
current thread.
+                continue;
+
+            GridCacheEntryEx[] otherThreadLocks = otherLockedEntries.entries;
+
+            int otherThreadLockedIdx = otherLockedEntries.lockedIdx;
+
+            // Visibility guarantees provided by volatile lockedIdx field.
+            for (int i = 0; i <= otherThreadLockedIdx; i++) {
+                if (otherThreadLocks[i] == entry)
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /** Per-thread locked entries info. */
+    private static class LockedEntries {
+        /** Timestamp of lock. */
+        private final long ts = System.nanoTime();
+
+        /** Entries to lock. */
+        private final GridCacheEntryEx[] entries;
+
+        /** Current locked entry index. */
+        private volatile int lockedIdx = -1;
+
+        /** */
+        private LockedEntries(GridCacheEntryEx[] entries) {
+            this.entries = entries;
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 5249aa9..fec3967 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -65,6 +65,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.LockedEntriesInfo;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -168,6 +169,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             }
         };
 
+    /** Locked entries info for each thread. */
+    private final LockedEntriesInfo lockedEntriesInfo = new 
LockedEntriesInfo();
+
     /** Update reply closure. */
     @GridToStringExclude
     private UpdateReplyClosure updateReplyClos;
@@ -3111,44 +3115,17 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             }
         }
         else {
-            List<GridDhtCacheEntry> locked = new ArrayList<>(req.size());
+            GridDhtCacheEntry[] locked = new GridDhtCacheEntry[req.size()];
 
             while (true) {
                 for (int i = 0; i < req.size(); i++) {
                     GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
 
-                    locked.add(entry);
-                }
-
-                boolean retry = false;
-
-                for (int i = 0; i < locked.size(); i++) {
-                    GridCacheMapEntry entry = locked.get(i);
-
-                    if (entry == null)
-                        continue;
-
-                    entry.lockEntry();
-
-                    if (entry.obsolete()) {
-                        // Unlock all locked.
-                        for (int j = 0; j <= i; j++) {
-                            if (locked.get(j) != null)
-                                locked.get(j).unlockEntry();
-                        }
-
-                        // Clear entries.
-                        locked.clear();
-
-                        // Retry.
-                        retry = true;
-
-                        break;
-                    }
+                    locked[i] = entry;
                 }
 
-                if (!retry)
-                    return locked;
+                if (lockedEntriesInfo.tryLockEntries(locked))
+                    return Arrays.asList(locked);
             }
         }
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 6b2128d..e430a71 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -54,6 +54,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.LockedEntriesInfo;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -90,6 +91,9 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
     /** */
     private GridCachePreloader preldr;
 
+    /** Locked entries info for each thread. */
+    private final LockedEntriesInfo lockedEntriesInfo = new 
LockedEntriesInfo();
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -1476,11 +1480,13 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
      * @return Collection of locked entries.
      */
     private List<GridCacheEntryEx> lockEntries(Collection<? extends K> keys) {
-        List<GridCacheEntryEx> locked = new ArrayList<>(keys.size());
+        GridCacheEntryEx[] locked = new GridCacheEntryEx[keys.size()];
 
         boolean nullKeys = false;
 
         while (true) {
+            int i = 0;
+
             for (K key : keys) {
                 if (key == null) {
                     nullKeys = true;
@@ -1490,40 +1496,24 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
 
                 GridCacheEntryEx entry = entryEx(ctx.toCacheKeyObject(key));
 
-                locked.add(entry);
+                locked[i++] = entry;
             }
 
             if (nullKeys)
                 break;
 
-            for (int i = 0; i < locked.size(); i++) {
-                GridCacheEntryEx entry = locked.get(i);
-
-                entry.lockEntry();
-
-                if (entry.obsolete()) {
-                    // Unlock all locked.
-                    for (int j = 0; j <= i; j++)
-                        locked.get(j).unlockEntry();
-
-                    // Clear entries.
-                    locked.clear();
-
-                    // Retry.
-                    break;
-                }
-            }
-
-            if (!locked.isEmpty())
-                return locked;
+            if (lockedEntriesInfo.tryLockEntries(locked))
+                return Arrays.asList(locked);
         }
 
         assert nullKeys;
 
         AffinityTopologyVersion topVer = 
ctx.affinity().affinityTopologyVersion();
 
-        for (GridCacheEntryEx entry : locked)
-            entry.touch();
+        for (GridCacheEntryEx entry : locked) {
+            if (entry != null)
+                entry.touch();
+        }
 
         throw new NullPointerException("Null key.");
     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicConcurrentUnorderedUpdateAllTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicConcurrentUnorderedUpdateAllTest.java
new file mode 100644
index 0000000..6863c949
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicConcurrentUnorderedUpdateAllTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.configuration.Factory;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/** Test concurrent putAll/removeAll operations with unordered set of keys on 
atomic caches. */
+@RunWith(Parameterized.class)
+public class IgniteCacheAtomicConcurrentUnorderedUpdateAllTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final int NODES_CNT = 3;
+
+    /** */
+    private static final int THREADS_CNT = 20;
+
+    /** */
+    private static final String CACHE_NAME = "test-cache";
+
+    /** */
+    private static final int CACHE_SIZE = 1_000;
+
+    /** Parameters. */
+    @Parameterized.Parameters(name = "cacheMode={0}, writeThrough={1}")
+    public static Iterable<Object[]> data() {
+        return Arrays.asList(
+            new Object[] {CacheMode.PARTITIONED, Boolean.FALSE},
+            new Object[] {CacheMode.PARTITIONED, Boolean.TRUE},
+            new Object[] {CacheMode.REPLICATED, Boolean.FALSE},
+            new Object[] {CacheMode.REPLICATED, Boolean.TRUE},
+            new Object[] {CacheMode.LOCAL, Boolean.FALSE},
+            new Object[] {CacheMode.LOCAL, Boolean.TRUE}
+        );
+    }
+
+    /** Cache mode. */
+    @Parameterized.Parameter()
+    public CacheMode cacheMode;
+
+    /** Write through. */
+    @Parameterized.Parameter(1)
+    public Boolean writeThrough;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConcurrentUpdateAll() throws Exception {
+        Ignite ignite = startGridsMultiThreaded(NODES_CNT);
+
+        Factory<CacheStore<Object, Object>> cacheStoreFactory = writeThrough ?
+                new MapCacheStoreStrategy.MapStoreFactory() : null;
+
+        IgniteCache<Object, Object> cache = ignite.createCache(new 
CacheConfiguration<>(CACHE_NAME)
+            
.setWriteThrough(writeThrough).setCacheStoreFactory(cacheStoreFactory)
+            .setCacheMode(cacheMode).setAtomicityMode(ATOMIC).setBackups(1));
+
+        CyclicBarrier barrier = new CyclicBarrier(THREADS_CNT);
+
+        AtomicInteger threadCnt = new AtomicInteger();
+
+        GridTestUtils.runMultiThreaded(() -> {
+            int threadIdx = threadCnt.incrementAndGet();
+
+            IgniteCache<Object, Object> cache0 = 
grid(ThreadLocalRandom.current().nextInt(NODES_CNT)).cache(CACHE_NAME);
+
+            Map<Object, Object> map = new LinkedHashMap<>();
+
+            if (threadIdx % 2 == 0) {
+                for (int i = 0; i < CACHE_SIZE; i++)
+                    map.put(i, i);
+            } else {
+                for (int i = CACHE_SIZE - 1; i >= 0; i--)
+                    map.put(i, i);
+            }
+
+            for (int i = 0; i < 20; i++) {
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    fail(e.getMessage());
+                }
+
+                cache0.putAll(map);
+
+                cache0.removeAll(map.keySet());
+
+                log.info("Thread " + threadIdx + " iteration " + i + " 
finished");
+            }
+        }, THREADS_CNT, "update-all-runner");
+
+        assertEquals(0, cache.size());
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
index 52ca648..e0589f7 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
@@ -63,6 +63,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheReplicatedLocalStore
 import org.apache.ignite.internal.processors.cache.GridCacheStopSelfTest;
 import 
org.apache.ignite.internal.processors.cache.GridCacheTcpClientDiscoveryMultiThreadedTest;
 import 
org.apache.ignite.internal.processors.cache.GridDataStorageConfigurationConsistencySelfTest;
+import 
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicConcurrentUnorderedUpdateAllTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicInvokeTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicLocalInvokeTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicLocalWithStoreInvokeTest;
@@ -151,6 +152,7 @@ public class IgniteCacheMvccTestSuite1 {
         ignoredTests.add(IgniteCacheAtomicWithStoreInvokeTest.class);
         ignoredTests.add(IgniteCacheAtomicLocalInvokeTest.class);
         ignoredTests.add(IgniteCacheAtomicLocalWithStoreInvokeTest.class);
+        
ignoredTests.add(IgniteCacheAtomicConcurrentUnorderedUpdateAllTest.class);
         ignoredTests.add(GridCachePartitionedLocalStoreSelfTest.class);
         ignoredTests.add(GridCacheReplicatedLocalStoreSelfTest.class);
         ignoredTests.add(CacheStoreReadFromBackupTest.class);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 5876bee..f89759a 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -102,6 +102,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheTtlManagerSelfTest;
 import 
org.apache.ignite.internal.processors.cache.GridCacheTxPartitionedLocalStoreSelfTest;
 import 
org.apache.ignite.internal.processors.cache.GridCacheTxUsersAffinityMapperSelfTest;
 import 
org.apache.ignite.internal.processors.cache.GridDataStorageConfigurationConsistencySelfTest;
+import 
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicConcurrentUnorderedUpdateAllTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicInvokeTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicLocalInvokeTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicLocalWithStoreInvokeTest;
@@ -215,6 +216,7 @@ public class IgniteCacheTestSuite {
         GridTestUtils.addTestIfNeeded(suite, 
IgniteCacheAtomicWithStoreInvokeTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IgniteCacheAtomicLocalInvokeTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IgniteCacheAtomicLocalWithStoreInvokeTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
IgniteCacheAtomicConcurrentUnorderedUpdateAllTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheTxInvokeTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
CacheEntryProcessorNonSerializableTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
CacheEntryProcessorExternalizableFailedTest.class, ignoredTests);

Reply via email to