This is an automated email from the ASF dual-hosted git repository.

rainyu pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new dcb0db7f33 fix(cluster): During the service provider's release period, concurrent route reads from consumers were rejected (#15883)
dcb0db7f33 is described below

commit dcb0db7f33a9618d5ea78f3bce9995702a5a9a42
Author: Joile Mike <[email protected]>
AuthorDate: Mon Jan 12 00:00:09 2026 +0800

    fix(cluster): During the service provider's release period, concurrent route reads from consumers were rejected (#15883)
    
    * fix(cluster): Change invokerRefreshLock from a ReentrantLock to a ReentrantReadWriteLock to avoid concurrency issues, and acquire invokerRefreshReadLock on the read path so that highly concurrent reads do not block each other #15881
    
    * Formatting
    
    * Fix the issues raised by Copilot
    
    * exception message
    
    * test case
    
    * use fair lock
    
    ---------
    
    Co-authored-by: wangwei <[email protected]>
---
 .../rpc/cluster/directory/AbstractDirectory.java   |  66 +++--
 .../AbstractDirectoryConcurrencyTest.java          | 276 +++++++++++++++++++++
 2 files changed, 319 insertions(+), 23 deletions(-)

diff --git 
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
 
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
index be68810f4a..ab0b44e3eb 100644
--- 
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
+++ 
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
@@ -58,7 +58,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
@@ -125,7 +125,10 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
 
     private volatile ScheduledFuture<?> connectivityCheckFuture;
 
-    private final ReentrantLock invokerRefreshLock = new ReentrantLock();
+    private final ReentrantReadWriteLock invokerRefreshLock = new 
ReentrantReadWriteLock(true);
+
+    private final ReentrantReadWriteLock.ReadLock invokerRefreshReadLock = 
invokerRefreshLock.readLock();
+    private final ReentrantReadWriteLock.WriteLock invokerRefreshWriteLock = 
invokerRefreshLock.writeLock();
 
     /**
      * The max count of invokers for each reconnect task select to try to 
reconnect.
@@ -208,27 +211,41 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
         BitList<Invoker<T>> availableInvokers;
         SingleRouterChain<T> singleChain = null;
         try {
+            if (routerChain != null) {
+                routerChain.getLock().readLock().lock();
+            }
+            boolean lockAcquired = false;
             try {
-                if (routerChain != null) {
-                    routerChain.getLock().readLock().lock();
+                if (!invokerRefreshReadLock.tryLock(LockUtils.DEFAULT_TIMEOUT, 
TimeUnit.MILLISECONDS)) {
+                    throw new RpcException(
+                            "Failed to acquire read lock on invokerRefreshLock 
within timeout. " + "Timeout: "
+                                    + LockUtils.DEFAULT_TIMEOUT + "ms, " + 
"Lock state: [readLockHeld="
+                                    + invokerRefreshLock.getReadLockCount() + 
", writeLockHeld="
+                                    + invokerRefreshLock.isWriteLocked() + ", 
writeLockHeldByCurrentThread="
+                                    + 
invokerRefreshLock.isWriteLockedByCurrentThread() + "], Service: "
+                                    + getConsumerUrl().getServiceKey());
                 }
+                lockAcquired = true;
                 // use clone to avoid being modified at doList().
                 if (invokersInitialized) {
                     availableInvokers = validInvokers.clone();
                 } else {
                     availableInvokers = invokers.clone();
                 }
-
-                if (routerChain != null) {
-                    singleChain = routerChain.getSingleChain(getConsumerUrl(), 
availableInvokers, invocation);
-                    singleChain.getLock().readLock().lock();
-                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RpcException(
+                        "Interrupted while acquiring read lock for invoker 
access, cause: " + e.getMessage(), e);
             } finally {
-                if (routerChain != null) {
-                    routerChain.getLock().readLock().unlock();
+                if (lockAcquired) {
+                    invokerRefreshReadLock.unlock();
                 }
             }
 
+            if (routerChain != null) {
+                singleChain = routerChain.getSingleChain(getConsumerUrl(), 
availableInvokers, invocation);
+                singleChain.getLock().readLock().lock();
+            }
             List<Invoker<T>> routedResult = doList(singleChain, 
availableInvokers, invocation);
             if (routedResult.isEmpty()) {
                 // 2-2 - No provider available.
@@ -249,6 +266,9 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
             if (singleChain != null) {
                 singleChain.getLock().readLock().unlock();
             }
+            if (routerChain != null) {
+                routerChain.getLock().readLock().unlock();
+            }
         }
     }
 
@@ -298,7 +318,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
 
     @Override
     public void addInvalidateInvoker(Invoker<T> invoker) {
-        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+        LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, 
() -> {
             // 1. remove this invoker from validInvokers list, this invoker 
will not be listed in the next time
             if (removeValidInvoker(invoker)) {
                 // 2. add this invoker to reconnect list
@@ -329,7 +349,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
                             // 1. pick invokers from invokersToReconnect
                             // limit max reconnectTaskTryCount, prevent this 
task hang up all the connectivityExecutor
                             // for long time
-                            LockUtils.safeLock(invokerRefreshLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
+                            LockUtils.safeLock(invokerRefreshWriteLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
                                 if (invokersToReconnect.size() < 
reconnectTaskTryCount) {
                                     invokersToTry.addAll(invokersToReconnect);
                                 } else {
@@ -348,7 +368,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
                             // 2. try to check the invoker's status
                             for (Invoker<T> invoker : invokersToTry) {
                                 AtomicBoolean invokerExist = new 
AtomicBoolean(false);
-                                LockUtils.safeLock(invokerRefreshLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
+                                LockUtils.safeLock(invokerRefreshWriteLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
                                     
invokerExist.set(invokers.contains(invoker));
                                 });
                                 // Should not lock here, `invoker.isAvailable` 
may need some time to check
@@ -362,7 +382,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
                             }
 
                             // 3. recover valid invoker
-                            LockUtils.safeLock(invokerRefreshLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
+                            LockUtils.safeLock(invokerRefreshWriteLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
                                 for (Invoker<T> tInvoker : needDeleteList) {
                                     if (invokers.contains(tInvoker)) {
                                         addValidInvoker(tInvoker);
@@ -388,7 +408,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
                         }
 
                         // 4. submit new task if it has more to recover
-                        LockUtils.safeLock(invokerRefreshLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
+                        LockUtils.safeLock(invokerRefreshWriteLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
                             if (!invokersToReconnect.isEmpty()) {
                                 checkConnectivity();
                             }
@@ -411,7 +431,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
      * 4. all the invokers disappeared from total invokers should be removed 
in the disabled invokers list
      */
     public void refreshInvoker() {
-        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+        LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, 
() -> {
             if (invokersInitialized) {
                 refreshInvokerInternal();
             }
@@ -445,7 +465,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
 
     @Override
     public void addDisabledInvoker(Invoker<T> invoker) {
-        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+        LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, 
() -> {
             if (invokers.contains(invoker)) {
                 disabledInvokers.add(invoker);
                 removeValidInvoker(invoker);
@@ -458,7 +478,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
 
     @Override
     public void recoverDisabledInvoker(Invoker<T> invoker) {
-        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+        LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, 
() -> {
             if (disabledInvokers.remove(invoker)) {
                 try {
                     addValidInvoker(invoker);
@@ -526,7 +546,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
     }
 
     protected void setInvokers(BitList<Invoker<T>> invokers) {
-        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+        LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, 
() -> {
             this.invokers = invokers;
             refreshInvokerInternal();
             this.invokersInitialized = true;
@@ -538,7 +558,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
 
     protected void destroyInvokers() {
         // set empty instead of clearing to support concurrent access.
-        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+        LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, 
() -> {
             this.invokers = BitList.emptyList();
             this.validInvokers = BitList.emptyList();
             this.invokersInitialized = false;
@@ -547,7 +567,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
 
     private boolean addValidInvoker(Invoker<T> invoker) {
         AtomicBoolean result = new AtomicBoolean(false);
-        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+        LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, 
() -> {
             result.set(this.validInvokers.add(invoker));
         });
         MetricsEventBus.publish(
@@ -557,7 +577,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
 
     private boolean removeValidInvoker(Invoker<T> invoker) {
         AtomicBoolean result = new AtomicBoolean(false);
-        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+        LockUtils.safeLock(invokerRefreshWriteLock, LockUtils.DEFAULT_TIMEOUT, 
() -> {
             result.set(this.validInvokers.remove(invoker));
         });
         MetricsEventBus.publish(
diff --git 
a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java
 
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java
new file mode 100644
index 0000000000..0840e1bddb
--- /dev/null
+++ 
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectoryConcurrencyTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.directory;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.RouterChain;
+import org.apache.dubbo.rpc.cluster.SingleRouterChain;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Concurrency tests for the invoker-refresh locking in {@link AbstractDirectory}.
+ *
+ * <p>{@code list} takes the read side of {@code invokerRefreshLock} while it snapshots the invokers, and mutators
+ * such as {@code setInvokers} and {@code refreshInvoker} take the write side. These tests check three things:
+ * readers run concurrently, a write-lock holder blocks readers, and mixed readers and writers neither fail nor hang.
+ */
+class AbstractDirectoryConcurrencyTest {
+
+    /** Must be at least 12: testConcurrentReadAndWrite runs 10 readers and 2 writers at the same time. */
+    private static final int POOL_SIZE = 16;
+
+    private TestDirectory directory;
+    private URL url;
+    private ExecutorService executor;
+
+    @BeforeEach
+    void setUp() {
+        url = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880/com.foo.BarService");
+        directory = new TestDirectory(url);
+        executor = Executors.newFixedThreadPool(POOL_SIZE);
+    }
+
+    @AfterEach
+    void tearDown() throws InterruptedException {
+        // Stop the workers first. shutdownNow() interrupts a helper that may still hold the write lock, so
+        // destroy() (which may need the invoker-refresh lock) does not have to wait for it.
+        if (executor != null) {
+            executor.shutdownNow();
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+        }
+        if (directory != null) {
+            directory.destroy();
+        }
+    }
+
+    /**
+     * Several threads must be able to run {@code doList} at the same time. {@code list} releases the
+     * invoker-refresh read lock before calling {@code doList} but still holds the router-chain read locks. Every
+     * caller therefore parks inside {@code doList} until all of them have arrived, which proves the calls overlap.
+     */
+    @Test
+    void testMultipleReadLocks() throws InterruptedException {
+        int threadCount = 5;
+        CountDownLatch entered = new CountDownLatch(threadCount);
+        CountDownLatch latch = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(threadCount);
+        AtomicReference<Throwable> failure = new AtomicReference<>();
+
+        directory.setListAction(() -> {
+            entered.countDown();
+            try {
+                // Stay inside doList until the test thread has seen every caller arrive.
+                latch.await(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        });
+
+        for (int i = 0; i < threadCount; i++) {
+            executor.submit(() -> {
+                try {
+                    directory.list(mock(Invocation.class));
+                } catch (Exception e) {
+                    failure.compareAndSet(null, e);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        boolean allEntered = entered.await(5, TimeUnit.SECONDS);
+        // Release the callers before asserting so that a failure does not leave them parked.
+        latch.countDown();
+
+        Assertions.assertTrue(allEntered, "All list calls should be inside doList concurrently");
+        Assertions.assertTrue(doneLatch.await(5, TimeUnit.SECONDS), "All list calls should complete");
+        assertNoFailure(failure, "No exceptions should occur during concurrent reads");
+    }
+
+    /** A thread holding the invoker-refresh write lock must block {@code list} until the lock is released. */
+    @Test
+    void testWriteBlocksRead() throws Exception {
+        CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1);
+        CountDownLatch releaseWriteLockLatch = new CountDownLatch(1);
+        AtomicBoolean readBlocked = new AtomicBoolean(false);
+
+        // Thread to hold write lock
+        Future<?> writer =
+                executor.submit(() -> directory.simulateWriteLock(writeLockAcquiredLatch, releaseWriteLockLatch));
+
+        // Wait for write lock to be acquired
+        if (!writeLockAcquiredLatch.await(5, TimeUnit.SECONDS)) {
+            if (writer.isDone()) {
+                // Rethrows the helper's own failure (e.g. the lock field could not be read).
+                writer.get();
+            }
+            Assertions.fail("Helper thread should acquire the write lock");
+        }
+
+        // Try to read in another thread
+        Future<?> readFuture = executor.submit(() -> {
+            long start = System.currentTimeMillis();
+            directory.list(mock(Invocation.class));
+            long duration = System.currentTimeMillis() - start;
+            // If duration is >= 100ms, we assume it was blocked
+            readBlocked.set(duration >= 100);
+        });
+
+        // Sleep to ensure read thread tries to acquire lock and blocks
+        Thread.sleep(200);
+        boolean finishedWhileWriteLocked = readFuture.isDone();
+
+        // Release write lock
+        releaseWriteLockLatch.countDown();
+
+        Assertions.assertFalse(
+                finishedWhileWriteLocked, "Read operation should not finish while the write lock is held");
+        try {
+            readFuture.get(5, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            Assertions.fail("Read execution failed", e);
+        }
+
+        Assertions.assertTrue(readBlocked.get(), "Read operation should be blocked by write lock");
+    }
+
+    /**
+     * Readers and writers running against the directory at the same time must all finish without an exception. A
+     * start gate releases every task together so that reads and writes genuinely overlap.
+     */
+    @Test
+    void testConcurrentReadAndWrite() throws InterruptedException {
+        int readThreads = 10;
+        int writeThreads = 2;
+        int iterations = 100;
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(readThreads + writeThreads);
+        AtomicReference<Throwable> failure = new AtomicReference<>();
+
+        directory.setListAction(() -> {
+            // Simulate some work
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                // E.g. tearDown() cancelled the workers: keep the interrupt visible instead of swallowing it.
+                Thread.currentThread().interrupt();
+            }
+        });
+
+        // Start read threads
+        for (int i = 0; i < readThreads; i++) {
+            executor.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < iterations; j++) {
+                        directory.list(mock(Invocation.class));
+                    }
+                } catch (Exception e) {
+                    failure.compareAndSet(null, e);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Start write threads
+        for (int i = 0; i < writeThreads; i++) {
+            executor.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < iterations; j++) {
+                        // Use setInvokers to trigger write lock
+                        directory.setInvokers(new BitList<>(Collections.emptyList()));
+                        Thread.sleep(2);
+                    }
+                } catch (Exception e) {
+                    failure.compareAndSet(null, e);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Every task is now parked on the gate; release them together.
+        startLatch.countDown();
+
+        Assertions.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "All operations should complete");
+        assertNoFailure(failure, "No exceptions should occur during concurrent read/write");
+    }
+
+    /** Fails with {@code message} if a worker recorded an exception, attaching that exception as the cause. */
+    private static void assertNoFailure(AtomicReference<Throwable> failure, String message) {
+        Throwable error = failure.get();
+        if (error != null) {
+            Assertions.fail(message, error);
+        }
+    }
+
+    /** Minimal {@link AbstractDirectory} that lets tests hook into {@code doList} and hold the refresh lock. */
+    static class TestDirectory extends AbstractDirectory<Object> {
+        private Runnable listAction = () -> {};
+
+        public TestDirectory(URL url) {
+            super(url);
+            // Install a router chain so that list() goes through the router-chain locking path.
+            setRouterChain(RouterChain.buildChain(Object.class, url));
+        }
+
+        /** Sets the action run at the start of every {@code doList} call. */
+        public void setListAction(Runnable listAction) {
+            this.listAction = listAction;
+        }
+
+        @Override
+        public Class<Object> getInterface() {
+            return Object.class;
+        }
+
+        @Override
+        public List<Invoker<Object>> getAllInvokers() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public boolean isAvailable() {
+            return true;
+        }
+
+        @Override
+        protected List<Invoker<Object>> doList(
+                SingleRouterChain<Object> singleRouterChain, BitList<Invoker<Object>> invokers, Invocation invocation)
+                throws RpcException {
+            listAction.run();
+            return Collections.emptyList();
+        }
+
+        /**
+         * Holds the write side of {@code AbstractDirectory#invokerRefreshLock} until {@code release} opens or the
+         * calling thread is interrupted.
+         *
+         * @param acquired counted down once the write lock is held
+         * @param release  awaited while the write lock is held
+         * @throws IllegalStateException if the private lock field cannot be read
+         */
+        public void simulateWriteLock(CountDownLatch acquired, CountDownLatch release) {
+            java.util.concurrent.locks.Lock writeLock = invokerRefreshLock().writeLock();
+            writeLock.lock();
+            try {
+                acquired.countDown();
+                release.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } finally {
+                writeLock.unlock();
+            }
+        }
+
+        /** Reads the private {@code invokerRefreshLock} field of {@link AbstractDirectory} via reflection. */
+        private java.util.concurrent.locks.ReadWriteLock invokerRefreshLock() {
+            try {
+                java.lang.reflect.Field lockField = AbstractDirectory.class.getDeclaredField("invokerRefreshLock");
+                lockField.setAccessible(true);
+                return (java.util.concurrent.locks.ReadWriteLock) lockField.get(this);
+            } catch (ReflectiveOperationException e) {
+                throw new IllegalStateException("Cannot access AbstractDirectory#invokerRefreshLock", e);
+            }
+        }
+
+        // Expose setInvokers for test
+        @Override
+        public void setInvokers(BitList<Invoker<Object>> invokers) {
+            super.setInvokers(invokers);
+        }
+    }
+}

Reply via email to