This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9dba0d56088303aa5a6b3967f1250d3efa45841b
Author: Heesung Sohn <[email protected]>
AuthorDate: Thu Jun 13 12:26:40 2024 -0700

    [fix][broker] Asynchronously return brokerRegistry.lookupAsync when checking if broker is active (ExtensibleLoadManagerImpl only) (#22899)
    
    (cherry picked from commit c2702e9bc46c444cbc99f4b64cb453c622b56c26)
---
 .../channel/ServiceUnitStateChannelImpl.java       | 84 ++++++++++++----------
 .../channel/ServiceUnitStateChannelTest.java       | 51 ++++++++++---
 2 files changed, 89 insertions(+), 46 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 76c4c1dc51a..fc806adc0d6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -484,7 +484,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             String serviceUnit,
             ServiceUnitState state,
             Optional<String> owner) {
-        return deferGetOwnerRequest(serviceUnit)
+        return dedupeGetOwnerRequest(serviceUnit)
                 .thenCompose(newOwner -> {
                     if (newOwner == null) {
                         return CompletableFuture.completedFuture(null);
@@ -622,7 +622,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
         EventType eventType = Assign;
         eventCounters.get(eventType).getTotal().incrementAndGet();
-        CompletableFuture<String> getOwnerRequest = 
deferGetOwnerRequest(serviceUnit);
+        CompletableFuture<String> getOwnerRequest = 
dedupeGetOwnerRequest(serviceUnit);
 
         pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker, 
getNextVersionId(serviceUnit)))
                 .whenComplete((__, ex) -> {
@@ -927,44 +927,54 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         return broker.equals(brokerId);
     }
 
-    private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) 
{
+    private CompletableFuture<String> deferGetOwner(String serviceUnit) {
+        var future = new 
CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
+                        TimeUnit.MILLISECONDS)
+                .exceptionally(e -> {
+                    var ownerAfter = getOwner(serviceUnit);
+                    log.warn("{} failed to wait for owner for serviceUnit:{}; 
Trying to "
+                                    + "return the current owner:{}",
+                            brokerId, serviceUnit, ownerAfter, e);
+                    if (ownerAfter == null) {
+                        throw new IllegalStateException(e);
+                    }
+                    return ownerAfter.orElse(null);
+                });
+        if (debug()) {
+            log.info("{} is waiting for owner for serviceUnit:{}", brokerId, 
serviceUnit);
+        }
+        return future;
+    }
+
+    private CompletableFuture<String> dedupeGetOwnerRequest(String 
serviceUnit) {
 
         var requested = new MutableObject<CompletableFuture<String>>();
         try {
-            return getOwnerRequests
-                    .computeIfAbsent(serviceUnit, k -> {
-                        var ownerBefore = getOwner(serviceUnit);
-                        if (ownerBefore != null && ownerBefore.isPresent()) {
-                            // Here, we do a quick active check first with the 
computeIfAbsent lock
-                            
brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty())
-                                    .ifPresent(__ -> requested.setValue(
-                                            
CompletableFuture.completedFuture(ownerBefore.get())));
-
-                            if (requested.getValue() != null) {
-                                return requested.getValue();
-                            }
-                        }
-
-
-                        CompletableFuture<String> future =
-                                new 
CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
-                                                TimeUnit.MILLISECONDS)
-                                        .exceptionally(e -> {
-                                            var ownerAfter = 
getOwner(serviceUnit);
-                                            log.warn("{} failed to wait for 
owner for serviceUnit:{}; Trying to "
-                                                            + "return the 
current owner:{}",
-                                                    brokerId, serviceUnit, 
ownerAfter, e);
-                                            if (ownerAfter == null) {
-                                                throw new 
IllegalStateException(e);
-                                            }
-                                            return ownerAfter.orElse(null);
-                                        });
-                        if (debug()) {
-                            log.info("{} is waiting for owner for 
serviceUnit:{}", brokerId, serviceUnit);
-                        }
-                        requested.setValue(future);
-                        return future;
-                    });
+            return getOwnerRequests.computeIfAbsent(serviceUnit, k -> {
+                var ownerBefore = getOwner(serviceUnit);
+                if (ownerBefore != null && ownerBefore.isPresent()) {
+                    // Here, we do the broker active check first with the 
computeIfAbsent lock
+                    
requested.setValue(brokerRegistry.lookupAsync(ownerBefore.get())
+                            .thenCompose(brokerLookupData -> {
+                                if (brokerLookupData.isPresent()) {
+                                    // The owner broker is active.
+                                    // Immediately return the request.
+                                    return 
CompletableFuture.completedFuture(ownerBefore.get());
+                                } else {
+                                    // The owner broker is inactive.
+                                    // The leader broker should be cleaning up 
the orphan service units.
+                                    // Defer this request til the leader 
notifies the new ownerships.
+                                    return deferGetOwner(serviceUnit);
+                                }
+                            }));
+                } else {
+                    // The owner broker has not been declared yet.
+                    // The ownership should be in the middle of transferring 
or assigning.
+                    // Defer this request til the inflight ownership change is 
complete.
+                    requested.setValue(deferGetOwner(serviceUnit));
+                }
+                return requested.getValue();
+            });
         } finally {
             var future = requested.getValue();
             if (future != null) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index dd08400ea85..e060148ceef 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -1620,32 +1620,63 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     @Test(priority = 19)
     public void testActiveGetOwner() throws Exception {
 
-
-        // set the bundle owner is the broker
+        // case 1: the bundle owner is empty
         String broker = brokerId2;
         String bundle = "public/owned/0xfffffff0_0xffffffff";
+        overrideTableViews(bundle, null);
+        assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get());
+
+        // case 2: the bundle ownership is transferring, and the dst broker is 
not the channel owner
+        overrideTableViews(bundle,
+                new ServiceUnitStateData(Releasing, broker, brokerId1, 1));
+        assertEquals(Optional.of(broker), 
channel1.getOwnerAsync(bundle).get());
+
+
+        // case 3: the bundle ownership is transferring, and the dst broker is 
the channel owner
+        overrideTableViews(bundle,
+                new ServiceUnitStateData(Assigning, brokerId1, brokerId2, 1));
+        assertTrue(!channel1.getOwnerAsync(bundle).isDone());
+
+        // case 4: the bundle ownership is found
         overrideTableViews(bundle,
                 new ServiceUnitStateData(Owned, broker, null, 1));
         var owner = channel1.getOwnerAsync(bundle).get(5, 
TimeUnit.SECONDS).get();
         assertEquals(owner, broker);
 
-        // simulate the owner is inactive
+        // case 5: the owner lookup gets delayed
         var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
-        doReturn(CompletableFuture.completedFuture(Optional.empty()))
-                .when(spyRegistry).lookupAsync(eq(broker));
         FieldUtils.writeDeclaredField(channel1,
                 "brokerRegistry", spyRegistry , true);
         FieldUtils.writeDeclaredField(channel1,
                 "inFlightStateWaitingTimeInMillis", 1000, true);
+        var delayedFuture = new CompletableFuture();
+        doReturn(delayedFuture).when(spyRegistry).lookupAsync(eq(broker));
+        CompletableFuture.runAsync(() -> {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();;
+            }
+            delayedFuture.complete(Optional.of(broker));
+        });
 
-
-        // verify getOwnerAsync times out because the owner is inactive now.
+        // verify the owner eventually returns in 
inFlightStateWaitingTimeInMillis.
         long start = System.currentTimeMillis();
+        assertEquals(broker, channel1.getOwnerAsync(bundle).get().get());
+        long elapsed = System.currentTimeMillis() - start;
+        assertTrue(elapsed < 1000);
+
+        // case 6: the owner is inactive
+        doReturn(CompletableFuture.completedFuture(Optional.empty()))
+                .when(spyRegistry).lookupAsync(eq(broker));
+
+        // verify getOwnerAsync times out
+        start = System.currentTimeMillis();
         var ex = expectThrows(ExecutionException.class, () -> 
channel1.getOwnerAsync(bundle).get());
         assertTrue(ex.getCause() instanceof IllegalStateException);
         assertTrue(System.currentTimeMillis() - start >= 1000);
 
-        // simulate ownership cleanup(no selected owner) by the leader channel
+        // case 7: the ownership cleanup(no new owner) by the leader channel
         doReturn(CompletableFuture.completedFuture(Optional.empty()))
                 .when(loadManager).selectAsync(any(), any());
         var leaderChannel = channel1;
@@ -1669,7 +1700,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         waitUntilState(channel2, bundle, Init);
 
         assertTrue(System.currentTimeMillis() - start < 20_000);
-        // simulate ownership cleanup(brokerId1 selected owner) by the leader 
channel
+
+        // case 8: simulate ownership cleanup(brokerId1 as the new owner) by 
the leader channel
         overrideTableViews(bundle,
                 new ServiceUnitStateData(Owned, broker, null, 1));
         doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1)))
@@ -1694,6 +1726,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
     }
 
+
     private static ConcurrentHashMap<String, 
CompletableFuture<Optional<String>>> getOwnerRequests(
             ServiceUnitStateChannel channel) throws IllegalAccessException {
         return (ConcurrentHashMap<String, CompletableFuture<Optional<String>>>)

Reply via email to