This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eed171d759b5935e9bc13914838ba0e329dccad2 Author: Matteo Merli <[email protected]> AuthorDate: Fri Aug 27 02:18:41 2021 -0700 Producer getting producer busy is removing existing producer from list (#11804) ### Motivation When a producer is getting error because of ProducerBusy (existing producer with the same name), it will trigger a producer close operation that will eventually lead to the existing producer getting removed from the topic map (even though that producer is still writing on the topic). The problem is the producer close is triggering a removal from the map: pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Line 683 in 43ded59 if (producers.remove(producer.getProducerName(), producer)) { Even though we check for producer equality, the Producer.equals() is only comparing the producer name, so the old instance gets removed from the map. Instead, the equality of producer needs to be based on the connection id (local & remote addresses and unique id), plus the producer id within that connection. * Producer getting producer busy is removing existing producer from list * Fixed test (cherry picked from commit 6aef83f3b77c343b9ea3edc1c07dbaf6bac9bd59) --- .../org/apache/pulsar/broker/service/Producer.java | 5 ++- .../apache/pulsar/broker/service/ServerCnx.java | 18 +++++++++ .../broker/service/PersistentTopicE2ETest.java | 45 ++++++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 8c35e66..12697f6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -147,7 +147,10 @@ public class Producer { public boolean equals(Object obj) { if (obj instanceof Producer) { Producer other = (Producer) obj; - return Objects.equals(producerName, other.producerName) && Objects.equals(topic, other.topic); + return Objects.equals(producerName, other.producerName) + && Objects.equals(topic, other.topic) + && producerId == other.producerId + && Objects.equals(cnx, other.cnx); } return false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 6a5ee25..84bba9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -41,6 +41,7 @@ import java.util.Collections; import java.util.IdentityHashMap; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -1531,6 +1532,23 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { } @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ServerCnx other = (ServerCnx) o; + return Objects.equals(ctx().channel().id(), other.ctx().channel().id()); + } + + @Override + public int hashCode() { + return Objects.hash(ctx().channel().id()); + } + + @Override protected void handleCloseProducer(CommandCloseProducer closeProducer) { checkArgument(state == State.Connected); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index ce48402..c39c692 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1786,4 +1786,49 @@ public class PersistentTopicE2ETest extends BrokerTestBase { assertEquals(msg.getValue(), "test"); assertEquals(msg.getEventTime(), 5); } + + @Test + public void testProducerBusy() throws Exception { + final String topicName = "prop/ns-abc/producer-busy-" + System.nanoTime(); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + + for (int i =0; i < 5; i++) { + try { + pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + fail("Should have failed"); + } catch (ProducerBusyException e) { + // Expected + } + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + } + + // Try from different connection + @Cleanup + PulsarClient client2 = PulsarClient.builder() + .serviceUrl(getPulsar().getBrokerServiceUrl()) + .build(); + + try { + client2.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + fail("Should have failed"); + } catch (ProducerBusyException e) { + // Expected + } + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + } }
