This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 12ab7e0 IGNITE-12451 Introduce deadlock detection for atomic cache
putAll operations - Fixes #8268.
12ab7e0 is described below
commit 12ab7e0de6cb5a579f7350079aeb53ea8706b0f2
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Wed Oct 28 15:02:27 2020 +0300
IGNITE-12451 Introduce deadlock detection for atomic cache putAll
operations - Fixes #8268.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../processors/cache/LockedEntriesInfo.java | 150 +++++++++++++++++++++
.../distributed/dht/atomic/GridDhtAtomicCache.java | 39 ++----
.../cache/local/atomic/GridLocalAtomicCache.java | 38 ++----
...acheAtomicConcurrentUnorderedUpdateAllTest.java | 133 ++++++++++++++++++
.../testsuites/IgniteCacheMvccTestSuite1.java | 2 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
6 files changed, 309 insertions(+), 55 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LockedEntriesInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LockedEntriesInfo.java
new file mode 100644
index 0000000..039c6bd
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LockedEntriesInfo.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Helper class to acquire java level locks on unordered set of entries and
avoid deadlocks.
+ */
+public class LockedEntriesInfo {
+ /** Deadlock detection timeout in milliseconds. */
+ private static final long DEADLOCK_DETECTION_TIMEOUT = 500L;
+
+ /** Locked entries info for each thread. */
+ private final Map<Long, LockedEntries> lockedEntriesPerThread = new
ConcurrentHashMap<>();
+
+ /**
+ * Attempt to lock all provided entries avoiding deadlocks.
+ *
+ * @param entries Entries to lock.
+ * @return {@code True} if entries were successfully locked, {@code false}
if possible deadlock detected or
+ * some entries are obsolete (lock attempt should be retried in this
case).
+ */
+ public boolean tryLockEntries(GridCacheEntryEx[] entries) {
+ long threadId = Thread.currentThread().getId();
+
+ LockedEntries lockedEntries = new LockedEntries(entries);
+
+ lockedEntriesPerThread.put(threadId, lockedEntries);
+
+ boolean wasInterrupted = false;
+
+ try {
+ for (int i = 0; i < entries.length; i++) {
+ GridCacheEntryEx entry = entries[i];
+
+ if (entry == null)
+ continue;
+
+ boolean retry = false;
+
+ while (true) {
+ if (entry.tryLockEntry(DEADLOCK_DETECTION_TIMEOUT))
+ break; // Successfully locked.
+ else {
+ wasInterrupted |= Thread.interrupted(); // Clear
thread interruption flag.
+
+ if (hasLockCollisions(entry, lockedEntries)) {
+ // Possible deadlock detected, unlock all locked
entries and retry again.
+ retry = true;
+
+ break;
+ }
+ // Possible deadlock not detected, just retry lock on
current entry.
+ }
+ }
+
+ if (!retry && entry.obsolete()) {
+ entry.unlockEntry();
+
+ retry = true;
+ }
+
+ if (retry) {
+ lockedEntries.lockedIdx = -1;
+
+ // Unlock all previously locked.
+ for (int j = 0; j < i; j++) {
+ if (entries[j] != null)
+ entries[j].unlockEntry();
+ }
+
+ return false;
+ }
+
+ lockedEntries.lockedIdx = i;
+ }
+
+ return true;
+ }
+ finally {
+ if (wasInterrupted)
+ Thread.currentThread().interrupt();
+
+ // Already acuired all locks or released all locks here, deadlock
is not possible by this thread anymore,
+ // can safely delete locks information.
+ lockedEntriesPerThread.remove(threadId);
+ }
+ }
+
+ /**
+ * @param entry Entry.
+ * @param curLockedEntries Current locked entries info.
+ * @return {@code True} if another thread holds lock for this entry and
started to lock entries earlier.
+ */
+ private boolean hasLockCollisions(GridCacheEntryEx entry, LockedEntries
curLockedEntries) {
+ for (Map.Entry<Long, LockedEntries> other :
lockedEntriesPerThread.entrySet()) {
+ LockedEntries otherLockedEntries = other.getValue();
+
+ if (otherLockedEntries == curLockedEntries ||
otherLockedEntries.ts > curLockedEntries.ts)
+ // Skip current thread and threads started to lock after the
current thread.
+ continue;
+
+ GridCacheEntryEx[] otherThreadLocks = otherLockedEntries.entries;
+
+ int otherThreadLockedIdx = otherLockedEntries.lockedIdx;
+
+ // Visibility guarantees provided by volatile lockedIdx field.
+ for (int i = 0; i <= otherThreadLockedIdx; i++) {
+ if (otherThreadLocks[i] == entry)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /** Per-thread locked entries info. */
+ private static class LockedEntries {
+ /** Timestamp of lock. */
+ private final long ts = System.nanoTime();
+
+ /** Entries to lock. */
+ private final GridCacheEntryEx[] entries;
+
+ /** Current locked entry index. */
+ private volatile int lockedIdx = -1;
+
+ /** */
+ private LockedEntries(GridCacheEntryEx[] entries) {
+ this.entries = entries;
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 5249aa9..fec3967 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -65,6 +65,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.LockedEntriesInfo;
import
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -168,6 +169,9 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
}
};
+ /** Locked entries info for each thread. */
+ private final LockedEntriesInfo lockedEntriesInfo = new
LockedEntriesInfo();
+
/** Update reply closure. */
@GridToStringExclude
private UpdateReplyClosure updateReplyClos;
@@ -3111,44 +3115,17 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
}
}
else {
- List<GridDhtCacheEntry> locked = new ArrayList<>(req.size());
+ GridDhtCacheEntry[] locked = new GridDhtCacheEntry[req.size()];
while (true) {
for (int i = 0; i < req.size(); i++) {
GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
- locked.add(entry);
- }
-
- boolean retry = false;
-
- for (int i = 0; i < locked.size(); i++) {
- GridCacheMapEntry entry = locked.get(i);
-
- if (entry == null)
- continue;
-
- entry.lockEntry();
-
- if (entry.obsolete()) {
- // Unlock all locked.
- for (int j = 0; j <= i; j++) {
- if (locked.get(j) != null)
- locked.get(j).unlockEntry();
- }
-
- // Clear entries.
- locked.clear();
-
- // Retry.
- retry = true;
-
- break;
- }
+ locked[i] = entry;
}
- if (!retry)
- return locked;
+ if (lockedEntriesInfo.tryLockEntries(locked))
+ return Arrays.asList(locked);
}
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 6b2128d..e430a71 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -54,6 +54,7 @@ import
org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.LockedEntriesInfo;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -90,6 +91,9 @@ public class GridLocalAtomicCache<K, V> extends
GridLocalCache<K, V> {
/** */
private GridCachePreloader preldr;
+ /** Locked entries info for each thread. */
+ private final LockedEntriesInfo lockedEntriesInfo = new
LockedEntriesInfo();
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -1476,11 +1480,13 @@ public class GridLocalAtomicCache<K, V> extends
GridLocalCache<K, V> {
* @return Collection of locked entries.
*/
private List<GridCacheEntryEx> lockEntries(Collection<? extends K> keys) {
- List<GridCacheEntryEx> locked = new ArrayList<>(keys.size());
+ GridCacheEntryEx[] locked = new GridCacheEntryEx[keys.size()];
boolean nullKeys = false;
while (true) {
+ int i = 0;
+
for (K key : keys) {
if (key == null) {
nullKeys = true;
@@ -1490,40 +1496,24 @@ public class GridLocalAtomicCache<K, V> extends
GridLocalCache<K, V> {
GridCacheEntryEx entry = entryEx(ctx.toCacheKeyObject(key));
- locked.add(entry);
+ locked[i++] = entry;
}
if (nullKeys)
break;
- for (int i = 0; i < locked.size(); i++) {
- GridCacheEntryEx entry = locked.get(i);
-
- entry.lockEntry();
-
- if (entry.obsolete()) {
- // Unlock all locked.
- for (int j = 0; j <= i; j++)
- locked.get(j).unlockEntry();
-
- // Clear entries.
- locked.clear();
-
- // Retry.
- break;
- }
- }
-
- if (!locked.isEmpty())
- return locked;
+ if (lockedEntriesInfo.tryLockEntries(locked))
+ return Arrays.asList(locked);
}
assert nullKeys;
AffinityTopologyVersion topVer =
ctx.affinity().affinityTopologyVersion();
- for (GridCacheEntryEx entry : locked)
- entry.touch();
+ for (GridCacheEntryEx entry : locked) {
+ if (entry != null)
+ entry.touch();
+ }
throw new NullPointerException("Null key.");
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicConcurrentUnorderedUpdateAllTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicConcurrentUnorderedUpdateAllTest.java
new file mode 100644
index 0000000..6863c949
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicConcurrentUnorderedUpdateAllTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.configuration.Factory;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/** Test concurrent putAll/removeAll operations with unordered set of keys on
atomic caches. */
+@RunWith(Parameterized.class)
+public class IgniteCacheAtomicConcurrentUnorderedUpdateAllTest extends
GridCommonAbstractTest {
+ /** */
+ private static final int NODES_CNT = 3;
+
+ /** */
+ private static final int THREADS_CNT = 20;
+
+ /** */
+ private static final String CACHE_NAME = "test-cache";
+
+ /** */
+ private static final int CACHE_SIZE = 1_000;
+
+ /** Parameters. */
+ @Parameterized.Parameters(name = "cacheMode={0}, writeThrough={1}")
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(
+ new Object[] {CacheMode.PARTITIONED, Boolean.FALSE},
+ new Object[] {CacheMode.PARTITIONED, Boolean.TRUE},
+ new Object[] {CacheMode.REPLICATED, Boolean.FALSE},
+ new Object[] {CacheMode.REPLICATED, Boolean.TRUE},
+ new Object[] {CacheMode.LOCAL, Boolean.FALSE},
+ new Object[] {CacheMode.LOCAL, Boolean.TRUE}
+ );
+ }
+
+ /** Cache mode. */
+ @Parameterized.Parameter()
+ public CacheMode cacheMode;
+
+ /** Write through. */
+ @Parameterized.Parameter(1)
+ public Boolean writeThrough;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testConcurrentUpdateAll() throws Exception {
+ Ignite ignite = startGridsMultiThreaded(NODES_CNT);
+
+ Factory<CacheStore<Object, Object>> cacheStoreFactory = writeThrough ?
+ new MapCacheStoreStrategy.MapStoreFactory() : null;
+
+ IgniteCache<Object, Object> cache = ignite.createCache(new
CacheConfiguration<>(CACHE_NAME)
+
.setWriteThrough(writeThrough).setCacheStoreFactory(cacheStoreFactory)
+ .setCacheMode(cacheMode).setAtomicityMode(ATOMIC).setBackups(1));
+
+ CyclicBarrier barrier = new CyclicBarrier(THREADS_CNT);
+
+ AtomicInteger threadCnt = new AtomicInteger();
+
+ GridTestUtils.runMultiThreaded(() -> {
+ int threadIdx = threadCnt.incrementAndGet();
+
+ IgniteCache<Object, Object> cache0 =
grid(ThreadLocalRandom.current().nextInt(NODES_CNT)).cache(CACHE_NAME);
+
+ Map<Object, Object> map = new LinkedHashMap<>();
+
+ if (threadIdx % 2 == 0) {
+ for (int i = 0; i < CACHE_SIZE; i++)
+ map.put(i, i);
+ } else {
+ for (int i = CACHE_SIZE - 1; i >= 0; i--)
+ map.put(i, i);
+ }
+
+ for (int i = 0; i < 20; i++) {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ cache0.putAll(map);
+
+ cache0.removeAll(map.keySet());
+
+ log.info("Thread " + threadIdx + " iteration " + i + "
finished");
+ }
+ }, THREADS_CNT, "update-all-runner");
+
+ assertEquals(0, cache.size());
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
index 52ca648..e0589f7 100755
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite1.java
@@ -63,6 +63,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheReplicatedLocalStore
import org.apache.ignite.internal.processors.cache.GridCacheStopSelfTest;
import
org.apache.ignite.internal.processors.cache.GridCacheTcpClientDiscoveryMultiThreadedTest;
import
org.apache.ignite.internal.processors.cache.GridDataStorageConfigurationConsistencySelfTest;
+import
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicConcurrentUnorderedUpdateAllTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicInvokeTest;
import
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicLocalInvokeTest;
import
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicLocalWithStoreInvokeTest;
@@ -151,6 +152,7 @@ public class IgniteCacheMvccTestSuite1 {
ignoredTests.add(IgniteCacheAtomicWithStoreInvokeTest.class);
ignoredTests.add(IgniteCacheAtomicLocalInvokeTest.class);
ignoredTests.add(IgniteCacheAtomicLocalWithStoreInvokeTest.class);
+
ignoredTests.add(IgniteCacheAtomicConcurrentUnorderedUpdateAllTest.class);
ignoredTests.add(GridCachePartitionedLocalStoreSelfTest.class);
ignoredTests.add(GridCacheReplicatedLocalStoreSelfTest.class);
ignoredTests.add(CacheStoreReadFromBackupTest.class);
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 5876bee..f89759a 100755
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -102,6 +102,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheTtlManagerSelfTest;
import
org.apache.ignite.internal.processors.cache.GridCacheTxPartitionedLocalStoreSelfTest;
import
org.apache.ignite.internal.processors.cache.GridCacheTxUsersAffinityMapperSelfTest;
import
org.apache.ignite.internal.processors.cache.GridDataStorageConfigurationConsistencySelfTest;
+import
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicConcurrentUnorderedUpdateAllTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicInvokeTest;
import
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicLocalInvokeTest;
import
org.apache.ignite.internal.processors.cache.IgniteCacheAtomicLocalWithStoreInvokeTest;
@@ -215,6 +216,7 @@ public class IgniteCacheTestSuite {
GridTestUtils.addTestIfNeeded(suite,
IgniteCacheAtomicWithStoreInvokeTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteCacheAtomicLocalInvokeTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteCacheAtomicLocalWithStoreInvokeTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
IgniteCacheAtomicConcurrentUnorderedUpdateAllTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheTxInvokeTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
CacheEntryProcessorNonSerializableTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
CacheEntryProcessorExternalizableFailedTest.class, ignoredTests);