This is an automated email from the ASF dual-hosted git repository.
rainyu pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new dcb0db7f33 fix(cluster): During the service provider's release period,
concurrent read routes from consumers were rejected (#15883)
dcb0db7f33 is described below
commit dcb0db7f33a9618d5ea78f3bce9995702a5a9a42
Author: Joile Mike <[email protected]>
AuthorDate: Mon Jan 12 00:00:09 2026 +0800
fix(cluster): During the service provider's release period, concurrent read
routes from consumers were rejected (#15883)
* fix(cluster): Changing invokerRefreshLock from ReentrantLock to
ReentrantReadWriteLock avoids concurrency issues, and using
invokerRefreshReadLock avoids lock blocking during high concurrency reads #15881
* Formatting
* Fix the issues raised by Copilot
* exception message
* test case
* use fair lock
---------
Co-authored-by: wangwei <[email protected]>
---
.../rpc/cluster/directory/AbstractDirectory.java | 66 +++--
.../AbstractDirectoryConcurrencyTest.java | 276 +++++++++++++++++++++
2 files changed, 319 insertions(+), 23 deletions(-)
diff --git
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
index be68810f4a..ab0b44e3eb 100644
---
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
+++
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
@@ -58,7 +58,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
@@ -125,7 +125,10 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
private volatile ScheduledFuture<?> connectivityCheckFuture;
- private final ReentrantLock invokerRefreshLock = new ReentrantLock();
+ private final ReentrantReadWriteLock invokerRefreshLock = new
ReentrantReadWriteLock(true);
+
+ private final ReentrantReadWriteLock.ReadLock invokerRefreshReadLock =
invokerRefreshLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock invokerRefreshWriteLock =
invokerRefreshLock.writeLock();
/**
* The max count of invokers for each reconnect task select to try to
reconnect.
@@ -208,27 +211,41 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
BitList<Invoker<T>> availableInvokers;
SingleRouterChain<T> singleChain = null;
try {
+ if (routerChain != null) {
+ routerChain.getLock().readLock().lock();
+ }
+ boolean lockAcquired = false;
try {
- if (routerChain != null) {
- routerChain.getLock().readLock().lock();
+ if (!invokerRefreshReadLock.tryLock(LockUtils.DEFAULT_TIMEOUT,
TimeUnit.MILLISECONDS)) {
+ throw new RpcException(
+ "Failed to acquire read lock on invokerRefreshLock
within timeout. " + "Timeout: "
+ + LockUtils.DEFAULT_TIMEOUT + "ms, " +
"Lock state: [readLockHeld="
+ + invokerRefreshLock.getReadLockCount() +
", writeLockHeld="
+ + invokerRefreshLock.isWriteLocked() + ",
writeLockHeldByCurrentThread="
+ +
invokerRefreshLock.isWriteLockedByCurrentThread() + "], Service: "
+ + getConsumerUrl().getServiceKey());
}
+ lockAcquired = true;
// use clone to avoid being modified at doList().
if (invokersInitialized) {
availableInvokers = validInvokers.clone();
} else {
availableInvokers = invokers.clone();
}
-
- if (routerChain != null) {
- singleChain = routerChain.getSingleChain(getConsumerUrl(),
availableInvokers, invocation);
- singleChain.getLock().readLock().lock();
- }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RpcException(
+ "Interrupted while acquiring read lock for invoker
access, cause: " + e.getMessage(), e);
} finally {
- if (routerChain != null) {
- routerChain.getLock().readLock().unlock();
+ if (lockAcquired) {
+ invokerRefreshReadLock.unlock();
}
}
+ if (routerChain != null) {
+ singleChain = routerChain.getSingleChain(getConsumerUrl(),
availableInvokers, invocation);
+ singleChain.getLock().readLock().lock();
+ }
List<Invoker<T>> routedResult = doList(singleChain,
availableInvokers, invocation);
if (routedResult.isEmpty()) {
// 2-2 - No provider available.
@@ -249,6 +266,9 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
if (singleChain != null) {
singleChain.getLock().readLock().unlock();
}
+ if (routerChain != null) {
+ routerChain.getLock().readLock().unlock();
+ }
}
}
@@ -298,7 +318,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
@Override
public void addInvalidateInvoker(Invoker<T> invoker) {
- LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT,
() -> {
// 1. remove this invoker from validInvokers list, this invoker
will not be listed in the next time
if (removeValidInvoker(invoker)) {
// 2. add this invoker to reconnect list
@@ -329,7 +349,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
// 1. pick invokers from invokersToReconnect
// limit max reconnectTaskTryCount, prevent this
task hang up all the connectivityExecutor
// for long time
- LockUtils.safeLock(invokerRefreshLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
+ LockUtils.safeLock(invokerRefreshWriteLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
if (invokersToReconnect.size() <
reconnectTaskTryCount) {
invokersToTry.addAll(invokersToReconnect);
} else {
@@ -348,7 +368,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
// 2. try to check the invoker's status
for (Invoker<T> invoker : invokersToTry) {
AtomicBoolean invokerExist = new
AtomicBoolean(false);
- LockUtils.safeLock(invokerRefreshLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
+ LockUtils.safeLock(invokerRefreshWriteLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
invokerExist.set(invokers.contains(invoker));
});
// Should not lock here, `invoker.isAvailable`
may need some time to check
@@ -362,7 +382,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
}
// 3. recover valid invoker
- LockUtils.safeLock(invokerRefreshLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
+ LockUtils.safeLock(invokerRefreshWriteLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
for (Invoker<T> tInvoker : needDeleteList) {
if (invokers.contains(tInvoker)) {
addValidInvoker(tInvoker);
@@ -388,7 +408,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
}
// 4. submit new task if it has more to recover
- LockUtils.safeLock(invokerRefreshLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
+ LockUtils.safeLock(invokerRefreshWriteLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
if (!invokersToReconnect.isEmpty()) {
checkConnectivity();
}
@@ -411,7 +431,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
* 4. all the invokers disappeared from total invokers should be removed
in the disabled invokers list
*/
public void refreshInvoker() {
- LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT,
() -> {
if (invokersInitialized) {
refreshInvokerInternal();
}
@@ -445,7 +465,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
@Override
public void addDisabledInvoker(Invoker<T> invoker) {
- LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT,
() -> {
if (invokers.contains(invoker)) {
disabledInvokers.add(invoker);
removeValidInvoker(invoker);
@@ -458,7 +478,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
@Override
public void recoverDisabledInvoker(Invoker<T> invoker) {
- LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT,
() -> {
if (disabledInvokers.remove(invoker)) {
try {
addValidInvoker(invoker);
@@ -526,7 +546,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
}
protected void setInvokers(BitList<Invoker<T>> invokers) {
- LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT,
() -> {
this.invokers = invokers;
refreshInvokerInternal();
this.invokersInitialized = true;
@@ -538,7 +558,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
protected void destroyInvokers() {
// set empty instead of clearing to support concurrent access.
- LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT,
() -> {
this.invokers = BitList.emptyList();
this.validInvokers = BitList.emptyList();
this.invokersInitialized = false;
@@ -547,7 +567,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
private boolean addValidInvoker(Invoker<T> invoker) {
AtomicBoolean result = new AtomicBoolean(false);
- LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT,
() -> {
result.set(this.validInvokers.add(invoker));
});
MetricsEventBus.publish(
@@ -557,7 +577,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
private boolean removeValidInvoker(Invoker<T> invoker) {
AtomicBoolean result = new AtomicBoolean(false);
- LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT,
() -> {
result.set(this.validInvokers.remove(invoker));
});
MetricsEventBus.publish(
diff --git
a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java
new file mode 100644
index 0000000000..0840e1bddb
--- /dev/null
+++
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.directory;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.RouterChain;
+import org.apache.dubbo.rpc.cluster.SingleRouterChain;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.Mockito.mock;
+
+class AbstractDirectoryConcurrencyTest {
+
+    private TestDirectory directory;
+    private URL url;
+    private ExecutorService executor;
+
+    @BeforeEach
+    void setUp() {
+        url = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880/com.foo.BarService");
+        directory = new TestDirectory(url);
+        // Cached pool: every submitted task gets its own thread, so readers and writers really run concurrently
+        // (a fixed pool smaller than the task count would queue the writers behind the readers).
+        executor = Executors.newCachedThreadPool();
+    }
+
+    @AfterEach
+    void tearDown() throws InterruptedException {
+        // Stop workers first: interrupting them lets a thread still holding a directory lock (e.g. after a
+        // failed assertion) release it before the directory is destroyed.
+        if (executor != null) {
+            executor.shutdownNow();
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+        }
+        if (directory != null) {
+            directory.destroy();
+        }
+    }
+
+    /**
+     * Verifies that concurrent {@code list()} calls do not serialize each other: all readers must be inside
+     * {@code doList()} at the same time. Each reader counts down {@code entered} and then waits for the rest;
+     * if {@code list()} let only one caller in at a time, this rendezvous would time out.
+     */
+    @Test
+    void testMultipleReadLocks() throws InterruptedException {
+        int threadCount = 5;
+        CountDownLatch entered = new CountDownLatch(threadCount);
+        CountDownLatch doneLatch = new CountDownLatch(threadCount);
+        AtomicBoolean allReadersOverlapped = new AtomicBoolean(true);
+        AtomicReference<Throwable> failure = new AtomicReference<>();
+
+        directory.setListAction(() -> {
+            entered.countDown();
+            try {
+                if (!entered.await(5, TimeUnit.SECONDS)) {
+                    allReadersOverlapped.set(false);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                allReadersOverlapped.set(false);
+            }
+        });
+
+        for (int i = 0; i < threadCount; i++) {
+            executor.submit(() -> {
+                try {
+                    directory.list(mock(Invocation.class));
+                } catch (Throwable t) {
+                    failure.compareAndSet(null, t);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        Assertions.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "All list calls should complete");
+        assertNoFailure(failure, "No exceptions should occur during concurrent reads");
+        Assertions.assertTrue(allReadersOverlapped.get(), "All readers should be inside list() concurrently");
+    }
+
+    /**
+     * Verifies that while the {@code invokerRefreshLock} write lock is held, {@code list()} does not complete,
+     * and that it completes normally once the write lock is released.
+     */
+    @Test
+    void testWriteBlocksRead() throws InterruptedException {
+        CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1);
+        CountDownLatch releaseWriteLockLatch = new CountDownLatch(1);
+        // Create the mock up front so mock creation time cannot be mistaken for lock blocking.
+        Invocation invocation = mock(Invocation.class);
+
+        // Resolve the private lock on the test thread so a reflection failure fails the test immediately
+        // instead of surfacing as an unexplained latch timeout.
+        Assertions.assertNotNull(directory.invokerRefreshLock(), "invokerRefreshLock should be accessible");
+
+        // Thread to hold the write lock until releaseWriteLockLatch is counted down
+        executor.submit(() -> directory.simulateWriteLock(writeLockAcquiredLatch, releaseWriteLockLatch));
+
+        Future<?> readFuture;
+        try {
+            Assertions.assertTrue(
+                    writeLockAcquiredLatch.await(5, TimeUnit.SECONDS), "Write lock should be acquired");
+
+            // Try to read in another thread
+            readFuture = executor.submit(() -> directory.list(invocation));
+
+            // Give the reader time to reach the lock; it must still be waiting while the write lock is held.
+            Thread.sleep(200);
+            Assertions.assertFalse(readFuture.isDone(), "Read operation should be blocked by write lock");
+        } finally {
+            // Always release the writer, even when an assertion above failed.
+            releaseWriteLockLatch.countDown();
+        }
+
+        try {
+            readFuture.get(5, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            Assertions.fail("Read execution failed", e);
+        }
+    }
+
+    /**
+     * Runs readers ({@code list()}) and writers ({@code setInvokers()}) at the same time and checks that every
+     * operation completes without an exception.
+     */
+    @Test
+    void testConcurrentReadAndWrite() throws InterruptedException {
+        int readThreads = 10;
+        int writeThreads = 2;
+        int iterations = 100;
+        CountDownLatch startGate = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(readThreads + writeThreads);
+        AtomicReference<Throwable> failure = new AtomicReference<>();
+
+        directory.setListAction(() -> {
+            // Simulate some work inside doList()
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                // Preserve the interrupt (e.g. from executor shutdown) instead of silently dropping it.
+                Thread.currentThread().interrupt();
+            }
+        });
+
+        // Start read threads
+        for (int i = 0; i < readThreads; i++) {
+            executor.submit(() -> {
+                try {
+                    startGate.await();
+                    Invocation invocation = mock(Invocation.class);
+                    for (int j = 0; j < iterations; j++) {
+                        directory.list(invocation);
+                    }
+                } catch (Throwable t) {
+                    failure.compareAndSet(null, t);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Start write threads
+        for (int i = 0; i < writeThreads; i++) {
+            executor.submit(() -> {
+                try {
+                    startGate.await();
+                    for (int j = 0; j < iterations; j++) {
+                        // Use setInvokers to trigger write lock
+                        directory.setInvokers(new BitList<>(Collections.emptyList()));
+                        Thread.sleep(2);
+                    }
+                } catch (Throwable t) {
+                    failure.compareAndSet(null, t);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Release readers and writers together so their operations overlap.
+        startGate.countDown();
+
+        Assertions.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "All operations should complete");
+        assertNoFailure(failure, "No exceptions should occur during concurrent read/write");
+    }
+
+    /** Fails the test with {@code message}, carrying the first recorded worker failure as the cause, if any. */
+    private static void assertNoFailure(AtomicReference<Throwable> failure, String message) {
+        Throwable cause = failure.get();
+        if (cause != null) {
+            Assertions.fail(message, cause);
+        }
+    }
+
+    /** Minimal directory that exposes protected hooks and lets tests inject behaviour into {@code doList()}. */
+    static class TestDirectory extends AbstractDirectory<Object> {
+        private volatile Runnable listAction = () -> {};
+
+        public TestDirectory(URL url) {
+            super(url);
+            // Initialize with a router chain to avoid NPE
+            setRouterChain(RouterChain.buildChain(Object.class, url));
+        }
+
+        /** Sets the action run on every {@code doList()} call. */
+        public void setListAction(Runnable listAction) {
+            this.listAction = listAction;
+        }
+
+        @Override
+        public Class<Object> getInterface() {
+            return Object.class;
+        }
+
+        @Override
+        public List<Invoker<Object>> getAllInvokers() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public boolean isAvailable() {
+            return true;
+        }
+
+        @Override
+        protected List<Invoker<Object>> doList(
+                SingleRouterChain<Object> singleRouterChain, BitList<Invoker<Object>> invokers, Invocation invocation)
+                throws RpcException {
+            listAction.run();
+            return Collections.emptyList();
+        }
+
+        /**
+         * Returns the private {@code AbstractDirectory.invokerRefreshLock} field via reflection, because
+         * {@code AbstractDirectory} offers no hook that runs test code while that lock is held.
+         *
+         * @throws IllegalStateException if the field cannot be read
+         */
+        java.util.concurrent.locks.ReadWriteLock invokerRefreshLock() {
+            try {
+                java.lang.reflect.Field lockField = AbstractDirectory.class.getDeclaredField("invokerRefreshLock");
+                lockField.setAccessible(true);
+                return (java.util.concurrent.locks.ReadWriteLock) lockField.get(this);
+            } catch (ReflectiveOperationException e) {
+                throw new IllegalStateException("Cannot access AbstractDirectory.invokerRefreshLock", e);
+            }
+        }
+
+        /**
+         * Holds the {@code invokerRefreshLock} write lock: counts down {@code acquired} once the lock is taken,
+         * then keeps it until {@code release} reaches zero or the thread is interrupted.
+         */
+        public void simulateWriteLock(CountDownLatch acquired, CountDownLatch release) {
+            java.util.concurrent.locks.ReadWriteLock lock = invokerRefreshLock();
+            lock.writeLock().lock();
+            try {
+                acquired.countDown();
+                release.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        // Expose setInvokers for test
+        @Override
+        public void setInvokers(BitList<Invoker<Object>> invokers) {
+            super.setInvokers(invokers);
+        }
+    }
+}