This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eed171d759b5935e9bc13914838ba0e329dccad2
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Aug 27 02:18:41 2021 -0700

    Producer getting producer busy is removing existing producer from list 
(#11804)
    
    ### Motivation
    When a producer is getting error because of ProducerBusy (existing producer 
with the same name), it will trigger a producer close operation that will 
eventually lead to the existing producer getting removed from the topic map 
(even though that producer is still writing on the topic).
    
    The problem is the producer close is triggering a removal from the map:
    
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
    
    Line 683 in 43ded59
    
     if (producers.remove(producer.getProducerName(), producer)) {
    Even though we check for producer equality, the Producer.equals() is only 
comparing the producer name, so the old instance gets removed from the map.
    
    Instead, the equality of producer needs to be based on the connection id 
(local & remote addresses and unique id), plus the producer id within that 
connection.
    
    * Producer getting producer busy is removing existing producer from list
    
    * Fixed test
    
    (cherry picked from commit 6aef83f3b77c343b9ea3edc1c07dbaf6bac9bd59)
---
 .../org/apache/pulsar/broker/service/Producer.java |  5 ++-
 .../apache/pulsar/broker/service/ServerCnx.java    | 18 +++++++++
 .../broker/service/PersistentTopicE2ETest.java     | 45 ++++++++++++++++++++++
 3 files changed, 67 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 8c35e66..12697f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -147,7 +147,10 @@ public class Producer {
     public boolean equals(Object obj) {
         if (obj instanceof Producer) {
             Producer other = (Producer) obj;
-            return Objects.equals(producerName, other.producerName) && 
Objects.equals(topic, other.topic);
+            return Objects.equals(producerName, other.producerName)
+                    && Objects.equals(topic, other.topic)
+                    && producerId == other.producerId
+                    && Objects.equals(cnx, other.cnx);
         }
 
         return false;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6a5ee25..84bba9c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -41,6 +41,7 @@ import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -1531,6 +1532,23 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ServerCnx other = (ServerCnx) o;
+        return Objects.equals(ctx().channel().id(), 
other.ctx().channel().id());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(ctx().channel().id());
+    }
+
+    @Override
     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
         checkArgument(state == State.Connected);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index ce48402..c39c692 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1786,4 +1786,49 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
         assertEquals(msg.getValue(), "test");
         assertEquals(msg.getEventTime(), 5);
     }
+
+    @Test
+    public void testProducerBusy() throws Exception {
+        final String topicName = "prop/ns-abc/producer-busy-" + 
System.nanoTime();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .producerName("xxx")
+                .create();
+
+        
assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+
+        for (int i =0; i < 5; i++) {
+            try {
+                pulsarClient.newProducer(Schema.STRING)
+                        .topic(topicName)
+                        .producerName("xxx")
+                        .create();
+                fail("Should have failed");
+            } catch (ProducerBusyException e) {
+                // Expected
+            }
+
+            
assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+        }
+
+        // Try from different connection
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder()
+                .serviceUrl(getPulsar().getBrokerServiceUrl())
+                .build();
+
+        try {
+            client2.newProducer(Schema.STRING)
+                    .topic(topicName)
+                    .producerName("xxx")
+                    .create();
+            fail("Should have failed");
+        } catch (ProducerBusyException e) {
+            // Expected
+        }
+
+        
assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+    }
 }

Reply via email to