This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 95b88ff8fc [ISSUE #8442][RIP-70-3] Extract adaptive lock mechanism
(#8663)
95b88ff8fc is described below
commit 95b88ff8fcfecbd1942e1a35460f7417ee620673
Author: hqbfz <[email protected]>
AuthorDate: Wed Oct 23 19:17:37 2024 +0800
[ISSUE #8442][RIP-70-3] Extract adaptive lock mechanism (#8663)
* extract the adaptive lock
* extract the adaptive lock
* feat(): perfect the adaptive lock
* feat(): perfect the adaptive lock
* Optimized code type
* Optimized code type
* Optimized code type
* fix fail test
* Optimize the adaptive locking mechanism logic
* Optimize the adaptive locking mechanism logic
* feat:Adaptive locking mechanism adjustment
* feat:Adaptive locking mechanism adjustment
* feat:Adaptive locking mechanism adjustment
* Optimize the adaptive locking mechanism logic
* Optimize the adaptive locking mechanism logic
* Optimize the adaptive locking mechanism logic
* feat:Supports the hot activation of ABS locks
* feat:Supports the hot activation of ABS locks
* feat:Supports the hot activation of ABS locks
* feat:Supports the hot activation of ABS locks
* Optimize code style
* Optimize code style
* Optimize code style
* Optimize code style
* Optimize code style
* Optimize code style
* Updated the locking mechanism name
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* delete unused import
* Optimize the logic of switching to spin locks
* Revert "Optimize the logic of switching to spin locks"
This reverts commit 1d7bac5c2fea0531af01d4c57c843084ba4fea61.
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimize the logic of switching to spin locks
* Optimized locking logic
* Optimized locking logic
* Optimized locking logic
* fix test
* fix test
* fix test
* fix test
* Optimize code style
* Optimize code style
* fix test
* fix test
* optimize client rebalancing logic
---------
Co-authored-by: wanghuaiyuan <[email protected]>
---
.../java/org/apache/rocketmq/store/CommitLog.java | 7 +-
.../rocketmq/store/config/MessageStoreConfig.java | 27 +++
.../store/lock/AdaptiveBackOffSpinLock.java | 35 ++++
.../store/lock/AdaptiveBackOffSpinLockImpl.java | 207 +++++++++++++++++++++
.../rocketmq/store/lock/BackOffReentrantLock.java | 33 ++++
.../rocketmq/store/lock/BackOffSpinLock.java | 110 +++++++++++
.../rocketmq/store/lock/AdaptiveLockTest.java | 86 +++++++++
7 files changed, 504 insertions(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 153215c98a..63022520e2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -62,6 +62,7 @@ import
org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.exception.StoreException;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+import org.apache.rocketmq.store.lock.AdaptiveBackOffSpinLockImpl;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.util.LibC;
import org.rocksdb.RocksDBException;
@@ -130,7 +131,11 @@ public class CommitLog implements Swappable {
return new
PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig());
}
};
- this.putMessageLock =
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new
PutMessageReentrantLock() : new PutMessageSpinLock();
+
+ PutMessageLock adaptiveBackOffSpinLock = new
AdaptiveBackOffSpinLockImpl();
+
+ this.putMessageLock =
messageStore.getMessageStoreConfig().getUseABSLock() ? adaptiveBackOffSpinLock :
+
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new
PutMessageReentrantLock() : new PutMessageSpinLock();
this.flushDiskWatcher = new FlushDiskWatcher();
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 8effe35bab..e31c03dd22 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -445,6 +445,17 @@ public class MessageStoreConfig {
*/
private String bottomMostCompressionTypeForConsumeQueueStore = "zstd";
+ /**
+ * Spin number in the retreat strategy of spin lock
+ * Default is 1000
+ */
+ private int spinLockCollisionRetreatOptimalDegree = 1000;
+
+ /**
+ * Use AdaptiveBackOffLock
+ **/
+ private boolean useABSLock = false;
+
public boolean isRocksdbCQDoubleWriteEnable() {
return rocksdbCQDoubleWriteEnable;
}
@@ -1898,4 +1909,20 @@ public class MessageStoreConfig {
public void setBottomMostCompressionTypeForConsumeQueueStore(String
bottomMostCompressionTypeForConsumeQueueStore) {
this.bottomMostCompressionTypeForConsumeQueueStore =
bottomMostCompressionTypeForConsumeQueueStore;
}
+
+ /**
+ * Returns the spin number used by the retreat strategy of the spin lock.
+ *
+ * @return the current value of {@code spinLockCollisionRetreatOptimalDegree}
+ * (the field is initialised to 1000)
+ */
+ public int getSpinLockCollisionRetreatOptimalDegree() {
+ return spinLockCollisionRetreatOptimalDegree;
+ }
+
+ /**
+ * Sets the spin number used by the retreat strategy of the spin lock.
+ * The value is stored as given; no range check is performed here.
+ *
+ * @param spinLockCollisionRetreatOptimalDegree new spin number
+ */
+ public void setSpinLockCollisionRetreatOptimalDegree(int
spinLockCollisionRetreatOptimalDegree) {
+ this.spinLockCollisionRetreatOptimalDegree =
spinLockCollisionRetreatOptimalDegree;
+ }
+
+ /**
+ * Enables or disables use of the adaptive back-off spin lock.
+ *
+ * @param useABSLock {@code true} to use the adaptive lock (the field defaults to false)
+ */
+ public void setUseABSLock(boolean useABSLock) {
+ this.useABSLock = useABSLock;
+ }
+
+ /**
+ * Tells whether the adaptive back-off spin lock is enabled.
+ *
+ * @return the current value of {@code useABSLock}
+ */
+ public boolean getUseABSLock() {
+ return useABSLock;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLock.java
b/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLock.java
new file mode 100644
index 0000000000..96200bcc15
--- /dev/null
+++
b/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLock.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.lock;
+
+import org.apache.rocketmq.store.PutMessageLock;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+public interface AdaptiveBackOffSpinLock extends PutMessageLock {
+ /**
+ * Configuration update hook. The default implementation is empty, so
+ * implementations without tunable settings do not need to override it.
+ *
+ * @param messageStoreConfig store configuration to read the new settings from
+ */
+ default void update(MessageStoreConfig messageStoreConfig) {
+ }
+
+ /**
+ * Locking mechanism switching hook. The default implementation is empty.
+ */
+ default void swap() {
+ }
+}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLockImpl.java
b/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLockImpl.java
new file mode 100644
index 0000000000..b4abb08271
--- /dev/null
+++
b/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLockImpl.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.lock;
+
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Adaptive put-message lock that delegates to either a {@link BackOffSpinLock} or a
+ * {@link BackOffReentrantLock} and switches between the two at run time.
+ *
+ * <p>Every {@link #lock()} call is counted in one of two slots chosen by the parity of the
+ * current wall-clock second. Every {@link #unlock()} call (while {@link #isOpen()} is true)
+ * runs {@link #swap()}, which inspects the <em>other</em> slot, tunes the spin count of the
+ * spin lock, and replaces the delegate when the collected figures cross the thresholds below.
+ *
+ * <p>A delegate replacement is guarded by {@code state}: it is set to {@code false} while a
+ * replacement is in progress, and the replacement only proceeds once {@code currentThreadNum}
+ * (threads between {@code lock()} and the end of {@code unlock()}) has dropped to zero.
+ */
+public class AdaptiveBackOffSpinLockImpl implements AdaptiveBackOffSpinLock {
+    // Used to determine the switchover between a mutex lock and a spin lock:
+    // the mutex is abandoned once (tps * threads) falls to this fraction of swapCriticalPoint.
+    private final static float SWAP_SPIN_LOCK_RATIO = 0.8f;
+
+    // It is used to adjust the spin number K of the back-off spin lock.
+    // When (retreat number / TPS) <= 1 / (BASE_SWAP_LOCK_RATIO * SPIN_LOCK_ADAPTIVE_RATIO),
+    // K is decreased.
+    private final static int SPIN_LOCK_ADAPTIVE_RATIO = 4;
+
+    // It is used to adjust the spin number K of the back-off spin lock.
+    // When (retreat number / TPS) >= 1 / BASE_SWAP_LOCK_RATIO, K is increased.
+    private final static int BASE_SWAP_LOCK_RATIO = 320;
+
+    private final static String BACK_OFF_SPIN_LOCK = "SpinLock";
+
+    private final static String REENTRANT_LOCK = "ReentrantLock";
+
+    // Current delegate. Volatile because it is replaced by whichever thread wins the swap
+    // and read by every other thread.
+    private volatile AdaptiveBackOffSpinLock adaptiveLock;
+
+    // true = normal operation, false = a delegate replacement is in progress
+    private final AtomicBoolean state = new AtomicBoolean(true);
+
+    private Map<String, AdaptiveBackOffSpinLock> locks;
+
+    // Per-slot count of lock() calls; a slot is set to -1 once swap() has consumed it.
+    private final List<AtomicInteger> tpsTable;
+
+    // Per-slot set of threads that called lock(); only the key set is meaningful.
+    private final List<Map<Thread, Byte>> threadTable;
+
+    // (tps * threads) observed at the moment the spin lock was given up for the mutex.
+    private volatile int swapCriticalPoint;
+
+    // Number of threads currently between lock() admission and the end of unlock().
+    private final AtomicInteger currentThreadNum = new AtomicInteger(0);
+
+    // When false, unlock() no longer triggers swap().
+    private final AtomicBoolean isOpen = new AtomicBoolean(true);
+
+    /**
+     * Creates the wrapper with both delegates registered and the spin lock active.
+     */
+    public AdaptiveBackOffSpinLockImpl() {
+        this.locks = new HashMap<>();
+        this.locks.put(REENTRANT_LOCK, new BackOffReentrantLock());
+        this.locks.put(BACK_OFF_SPIN_LOCK, new BackOffSpinLock());
+
+        this.threadTable = new ArrayList<>(2);
+        this.threadTable.add(new ConcurrentHashMap<>());
+        this.threadTable.add(new ConcurrentHashMap<>());
+
+        this.tpsTable = new ArrayList<>(2);
+        this.tpsTable.add(new AtomicInteger(0));
+        this.tpsTable.add(new AtomicInteger(0));
+
+        adaptiveLock = this.locks.get(BACK_OFF_SPIN_LOCK);
+    }
+
+    /**
+     * Records the call in the current slot, waits for any delegate replacement to finish,
+     * then acquires the current delegate.
+     */
+    @Override
+    public void lock() {
+        int slot = LocalTime.now().getSecond() % 2;
+        this.threadTable.get(slot).putIfAbsent(Thread.currentThread(), Byte.MAX_VALUE);
+        this.tpsTable.get(slot).getAndIncrement();
+
+        // Admission handshake with swap(): announce ourselves first, then re-check state.
+        // Checking state before incrementing (as a single pass) leaves a window in which the
+        // swapping thread sees currentThreadNum == 0, replaces the delegate, and this thread
+        // still acquires the old delegate while others acquire the new one.
+        while (true) {
+            while (!this.state.get()) {
+                // busy-wait until the replacement in progress has finished
+            }
+            this.currentThreadNum.incrementAndGet();
+            if (this.state.get()) {
+                break;
+            }
+            // A replacement started in between: withdraw so it can proceed, then retry.
+            this.currentThreadNum.decrementAndGet();
+        }
+        this.adaptiveLock.lock();
+    }
+
+    /**
+     * Releases the current delegate and, if adaptation is open, evaluates a swap.
+     */
+    @Override
+    public void unlock() {
+        this.adaptiveLock.unlock();
+        currentThreadNum.decrementAndGet();
+        if (isOpen.get()) {
+            swap();
+        }
+    }
+
+    /**
+     * Forwards the configuration to the currently active delegate only.
+     *
+     * @param messageStoreConfig store configuration to read the new settings from
+     */
+    @Override
+    public void update(MessageStoreConfig messageStoreConfig) {
+        this.adaptiveLock.update(messageStoreConfig);
+    }
+
+    /**
+     * Consumes the statistics of the previous slot and, depending on them, raises or lowers
+     * the spin count of the spin lock or replaces the active delegate.
+     */
+    @Override
+    public void swap() {
+        if (!this.state.get()) {
+            return;
+        }
+        boolean needSwap = false;
+        // The slot that lock() is NOT writing to during the current second.
+        int slot = 1 - LocalTime.now().getSecond() % 2;
+        int tps = this.tpsTable.get(slot).get() + 1;
+        int threadNum = this.threadTable.get(slot).size();
+        this.tpsTable.get(slot).set(-1);
+        this.threadTable.get(slot).clear();
+        // The counter was still -1, i.e. this slot has already been consumed.
+        if (tps == 0) {
+            return;
+        }
+
+        // Read the delegate once so the instanceof test and the cast see the same object.
+        AdaptiveBackOffSpinLock current = this.adaptiveLock;
+        if (current instanceof BackOffSpinLock) {
+            BackOffSpinLock lock = (BackOffSpinLock) current;
+            // Avoid frequent adjustment of K, and make a reasonable range through experiments
+            // reasonable range :
+            //   (retreat number / TPS) > 1 / (BASE_SWAP_LOCK_RATIO * SPIN_LOCK_ADAPTIVE_RATIO) &&
+            //   (retreat number / TPS) < 1 / BASE_SWAP_LOCK_RATIO
+            if (lock.getNumberOfRetreat(slot) * BASE_SWAP_LOCK_RATIO >= tps) {
+                if (lock.isAdapt()) {
+                    lock.adapt(true);
+                } else {
+                    // It is used to switch between mutex lock and spin lock
+                    this.swapCriticalPoint = tps * threadNum;
+                    needSwap = true;
+                }
+            } else if (lock.getNumberOfRetreat(slot) * BASE_SWAP_LOCK_RATIO * SPIN_LOCK_ADAPTIVE_RATIO <= tps) {
+                lock.adapt(false);
+            }
+            lock.setNumberOfRetreat(slot, 0);
+        } else {
+            if (tps * threadNum <= this.swapCriticalPoint * SWAP_SPIN_LOCK_RATIO) {
+                needSwap = true;
+            }
+        }
+
+        if (needSwap) {
+            if (this.state.compareAndSet(true, false)) {
+                // Ensures that no threads are in contention locks as well as in critical zones
+                int currentThreadNum;
+                do {
+                    currentThreadNum = this.currentThreadNum.get();
+                } while (currentThreadNum != 0);
+
+                try {
+                    if (this.adaptiveLock instanceof BackOffSpinLock) {
+                        this.adaptiveLock = this.locks.get(REENTRANT_LOCK);
+                    } else {
+                        this.adaptiveLock = this.locks.get(BACK_OFF_SPIN_LOCK);
+                        ((BackOffSpinLock) this.adaptiveLock).adapt(false);
+                    }
+                } catch (Exception ignored) {
+                    // Best effort: a failed replacement must not propagate out of unlock().
+                } finally {
+                    // Always reopen admission, otherwise every lock() caller would spin forever.
+                    this.state.compareAndSet(false, true);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns a snapshot of the registered delegates.
+     *
+     * @return a new list holding the values of the lock map
+     */
+    public List<AdaptiveBackOffSpinLock> getLocks() {
+        // Map.values() is a Collection view, not a List; casting it throws ClassCastException.
+        return new ArrayList<>(this.locks.values());
+    }
+
+    public void setLocks(Map<String, AdaptiveBackOffSpinLock> locks) {
+        this.locks = locks;
+    }
+
+    public boolean getState() {
+        return this.state.get();
+    }
+
+    public void setState(boolean state) {
+        this.state.set(state);
+    }
+
+    public AdaptiveBackOffSpinLock getAdaptiveLock() {
+        return adaptiveLock;
+    }
+
+    public List<AtomicInteger> getTpsTable() {
+        return tpsTable;
+    }
+
+    public void setSwapCriticalPoint(int swapCriticalPoint) {
+        this.swapCriticalPoint = swapCriticalPoint;
+    }
+
+    public int getSwapCriticalPoint() {
+        return swapCriticalPoint;
+    }
+
+    public boolean isOpen() {
+        return this.isOpen.get();
+    }
+
+    public void setOpen(boolean open) {
+        this.isOpen.set(open);
+    }
+}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/lock/BackOffReentrantLock.java
b/store/src/main/java/org/apache/rocketmq/store/lock/BackOffReentrantLock.java
new file mode 100644
index 0000000000..90e416419b
--- /dev/null
+++
b/store/src/main/java/org/apache/rocketmq/store/lock/BackOffReentrantLock.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.lock;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Mutex-backed member of the adaptive lock family: a thin wrapper that delegates to a
+ * {@link ReentrantLock}. The {@code update} and {@code swap} hooks keep the empty defaults
+ * inherited from {@link AdaptiveBackOffSpinLock}.
+ */
+public class BackOffReentrantLock implements AdaptiveBackOffSpinLock {
+    // Built with the no-arg constructor, i.e. the non-fair variant. Final so that every
+    // lock()/unlock() pair is guaranteed to operate on the same lock instance.
+    private final ReentrantLock putMessageNormalLock = new ReentrantLock();
+
+    /** Blocks until the underlying {@link ReentrantLock} has been acquired. */
+    @Override
+    public void lock() {
+        putMessageNormalLock.lock();
+    }
+
+    /** Releases the underlying {@link ReentrantLock}. */
+    @Override
+    public void unlock() {
+        putMessageNormalLock.unlock();
+    }
+}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/lock/BackOffSpinLock.java
b/store/src/main/java/org/apache/rocketmq/store/lock/BackOffSpinLock.java
new file mode 100644
index 0000000000..f754970a05
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/lock/BackOffSpinLock.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.lock;
+
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Spin lock with a back-off ("retreat") step: a thread tries a compare-and-set up to
+ * {@code optimalDegree} times, and if all attempts fail it records one retreat, gives up
+ * its time slice via {@code Thread.sleep(0)}, and starts over.
+ *
+ * <p>Retreats are counted in two slots selected by the parity of the current wall-clock
+ * second, so that a caller can read and reset one slot while the other is being written.
+ */
+public class BackOffSpinLock implements AdaptiveBackOffSpinLock {
+
+    private final static int INITIAL_DEGREE = 1000;
+
+    private final static int MAX_OPTIMAL_DEGREE = 10000;
+
+    // true = free, false = held
+    private final AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
+
+    // Number of CAS attempts per round. Volatile because lock() and isAdapt() read it
+    // without holding the monitor that adapt() writes it under.
+    private volatile int optimalDegree;
+
+    private final List<AtomicInteger> numberOfRetreat;
+
+    public BackOffSpinLock() {
+        this.optimalDegree = INITIAL_DEGREE;
+
+        numberOfRetreat = new ArrayList<>(2);
+        numberOfRetreat.add(new AtomicInteger(0));
+        numberOfRetreat.add(new AtomicInteger(0));
+    }
+
+    /**
+     * Acquires the lock, spinning and backing off until it succeeds. The call is not
+     * interruptible; if the thread is interrupted while backing off, the interrupt status
+     * is restored before returning.
+     */
+    @Override
+    public void lock() {
+        // Always attempt the CAS at least once per round: with a non-positive degree
+        // (possible through update()/setOptimalDegree) the loop would otherwise never try
+        // to take the lock and would spin forever.
+        int spinDegree = Math.max(1, this.optimalDegree);
+        boolean interrupted = false;
+        try {
+            while (true) {
+                for (int i = 0; i < spinDegree; i++) {
+                    if (this.putMessageSpinLock.compareAndSet(true, false)) {
+                        return;
+                    }
+                }
+                numberOfRetreat.get(LocalTime.now().getSecond() % 2).getAndIncrement();
+                try {
+                    Thread.sleep(0);
+                } catch (InterruptedException e) {
+                    // sleep() cleared the flag; remember it and keep trying to acquire.
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /** Marks the lock as free again. */
+    @Override
+    public void unlock() {
+        this.putMessageSpinLock.compareAndSet(false, true);
+    }
+
+    /**
+     * Replaces the spin count with the value from the store configuration.
+     *
+     * @param messageStoreConfig configuration providing
+     *                           {@code getSpinLockCollisionRetreatOptimalDegree()}
+     */
+    @Override
+    public void update(MessageStoreConfig messageStoreConfig) {
+        this.optimalDegree = messageStoreConfig.getSpinLockCollisionRetreatOptimalDegree();
+    }
+
+    public int getOptimalDegree() {
+        return this.optimalDegree;
+    }
+
+    public void setOptimalDegree(int optimalDegree) {
+        this.optimalDegree = optimalDegree;
+    }
+
+    /**
+     * @return {@code true} while the spin count is still below {@code MAX_OPTIMAL_DEGREE},
+     *         i.e. while {@code adapt(true)} can still raise it
+     */
+    public boolean isAdapt() {
+        return optimalDegree < MAX_OPTIMAL_DEGREE;
+    }
+
+    /**
+     * Adjusts the spin count by one step.
+     *
+     * @param isRise {@code true} to raise the count (doubling while that stays within
+     *               {@code MAX_OPTIMAL_DEGREE}, otherwise by {@code INITIAL_DEGREE}, capped at
+     *               the maximum); {@code false} to lower it by {@code INITIAL_DEGREE}, never
+     *               going below {@code INITIAL_DEGREE}
+     */
+    public synchronized void adapt(boolean isRise) {
+        if (isRise) {
+            if (optimalDegree >= MAX_OPTIMAL_DEGREE) {
+                // Already at (or configured above) the ceiling: leave the value untouched.
+                return;
+            }
+            if (optimalDegree * 2 <= MAX_OPTIMAL_DEGREE) {
+                optimalDegree *= 2;
+            } else {
+                // Cap at the ceiling so that isAdapt() eventually turns false; without the
+                // cap a value between 9001 and 9999 could never change again.
+                optimalDegree = Math.min(optimalDegree + INITIAL_DEGREE, MAX_OPTIMAL_DEGREE);
+            }
+        } else {
+            if (optimalDegree >= 2 * INITIAL_DEGREE) {
+                optimalDegree -= INITIAL_DEGREE;
+            }
+        }
+    }
+
+    public int getNumberOfRetreat(int pos) {
+        return numberOfRetreat.get(pos).get();
+    }
+
+    public void setNumberOfRetreat(int pos, int size) {
+        this.numberOfRetreat.get(pos).set(size);
+    }
+}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/lock/AdaptiveLockTest.java
b/store/src/test/java/org/apache/rocketmq/store/lock/AdaptiveLockTest.java
new file mode 100644
index 0000000000..ac1b3c60cc
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/lock/AdaptiveLockTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.lock;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+// Exercises AdaptiveBackOffSpinLockImpl end to end: tuning of the spin count, the switch
+// to BackOffReentrantLock under repeated contention, and the switch back afterwards.
+public class AdaptiveLockTest {
+
+ AdaptiveBackOffSpinLockImpl adaptiveLock;
+
+ // A fresh wrapper per test; the first assertion below expects it to start on BackOffSpinLock.
+ @Before
+ public void init() {
+ adaptiveLock = new AdaptiveBackOffSpinLockImpl();
+ }
+
+ // NOTE(review): the expected outcomes depend on wall-clock timing (each round holds the
+ // lock for Thread.sleep(1000)); presumably this lines the rounds up with the per-second
+ // statistics slots of the lock - confirm before changing any of the sleeps.
+ @Test
+ public void testAdaptiveLock() throws InterruptedException {
+ assertTrue(adaptiveLock.getAdaptiveLock() instanceof BackOffSpinLock);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ // Round 1: hold the lock on this thread while a second thread contends for it.
+ adaptiveLock.lock();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ adaptiveLock.lock();
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ adaptiveLock.unlock();
+ countDownLatch.countDown();
+ }
+ }).start();
+ Thread.sleep(1000);
+ adaptiveLock.unlock();
+ // Checked before waiting for the contender to finish: the spin count is expected to be 2000.
+ assertEquals(2000, ((BackOffSpinLock)
adaptiveLock.getAdaptiveLock()).getOptimalDegree());
+ countDownLatch.await();
+
+ // Six more contended rounds of the same shape.
+ for (int i = 0; i <= 5; i++) {
+ CountDownLatch countDownLatch1 = new CountDownLatch(1);
+ adaptiveLock.lock();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ adaptiveLock.lock();
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ adaptiveLock.unlock();
+ countDownLatch1.countDown();
+ }
+ }).start();
+ Thread.sleep(1000);
+ adaptiveLock.unlock();
+ countDownLatch1.await();
+ }
+ // After those rounds the wrapper is expected to have moved to the mutex.
+ assertTrue(adaptiveLock.getAdaptiveLock() instanceof
BackOffReentrantLock);
+
+ // One uncontended round is expected to move it back to the spin lock.
+ adaptiveLock.lock();
+ Thread.sleep(1000);
+ adaptiveLock.unlock();
+ assertTrue(adaptiveLock.getAdaptiveLock() instanceof BackOffSpinLock);
+ }
+}