merlimat commented on a change in pull request #9554:
URL: https://github.com/apache/pulsar/pull/9554#discussion_r574238697
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -109,6 +109,9 @@
protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;
+ // pointer to the exclusive producer
+ private volatile Producer exclusiveProducer;
Review comment:
Should we just keep a reference to the producer name instead? Otherwise the
error message will include the much longer toString() representation.
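
A minimal sketch of that suggestion, assuming the topic only needs the name for error messages. `SketchProducer` and `ExclusiveProducerRef` are hypothetical stand-ins written for illustration, not the broker's real classes:

```java
// Hypothetical stand-in for the broker-side Producer; only the parts needed here.
final class SketchProducer {
    private final String producerName;
    private final String remoteAddress;

    SketchProducer(String producerName, String remoteAddress) {
        this.producerName = producerName;
        this.remoteAddress = remoteAddress;
    }

    String getProducerName() {
        return producerName;
    }

    @Override
    public String toString() {
        // A full toString() tends to carry more than the name.
        return "Producer{name=" + producerName + ", client=" + remoteAddress + "}";
    }
}

// Hypothetical holder: keeps only the name of the exclusive producer.
final class ExclusiveProducerRef {
    private volatile String exclusiveProducerName;

    void setExclusive(SketchProducer producer) {
        this.exclusiveProducerName = producer.getProducerName();
    }

    String errorMessage() {
        // Concatenating the name keeps the message short and stable.
        return "Topic has an existing exclusive producer: " + exclusiveProducerName;
    }
}
```

With a name-only field, the message stays the same length regardless of what the producer's toString() prints.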
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -382,17 +385,17 @@ public String getReplicatorPrefix() {
switch (producer.getAccessMode()) {
case Shared:
if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
- return FutureUtil.failedFuture(new ProducerBusyException(
- "Topic has an existing exclusive producer: " + producers.keys().nextElement()));
+ return FutureUtil.failedFuture(
+ new ProducerFencedException("Topic has an existing exclusive producer: " + exclusiveProducer));
Review comment:
A shared producer will not get fenced; instead, it will get a busy error.
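
A rough sketch of the behaviour described in this comment, under the assumption that the topic tracks the exclusive producer by name. All types below (`SketchTopic`, `SketchProducerBusyException`) are hypothetical stand-ins for illustration, not the broker's real classes:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the broker's busy error.
class SketchProducerBusyException extends Exception {
    SketchProducerBusyException(String message) {
        super(message);
    }
}

// Hypothetical, heavily simplified topic: only the shared-producer check is modelled.
final class SketchTopic {
    // Name of the current exclusive producer, or null when there is none.
    private volatile String exclusiveProducerName;

    void setExclusiveProducer(String producerName) {
        this.exclusiveProducerName = producerName;
    }

    CompletableFuture<Void> addSharedProducer(String producerName) {
        String current = exclusiveProducerName;
        if (current != null) {
            // A shared producer is rejected with a busy error rather than being fenced.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new SketchProducerBusyException(
                    "Topic has an existing exclusive producer: " + current));
            return failed;
        }
        return CompletableFuture.completedFuture(null);
    }
}
```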
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -619,17 +629,21 @@ public void removeProducer(Producer producer) {
protected void handleProducerRemoved(Producer producer) {
// decrement usage only if this was a valid producer close
- long newCount = USAGE_COUNT_UPDATER.decrementAndGet(this);
- if (newCount == 0) {
+ USAGE_COUNT_UPDATER.decrementAndGet(this);
Review comment:
Since we had an issue with consumers here, can you add a test with a
consumer, to make sure we don’t break it in the future?