codelipenghui commented on code in PR #21220:
URL: https://github.com/apache/pulsar/pull/21220#discussion_r1378596083
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java:
##########
@@ -974,31 +975,40 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept
         Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
         if (existProducer != null) {
-            tryOverwriteOldProducer(existProducer, producer);
+            return tryOverwriteOldProducer(existProducer, producer);
         } else if (!producer.isRemote()) {
             USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
         }
+        return CompletableFuture.completedFuture(null);
     }
-    private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
-            throws BrokerServiceException {
+    private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) {
         if (newProducer.isSuccessorTo(oldProducer)) {
             oldProducer.close(false);
             if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
                 // Met concurrent update, throw exception here so that client can try reconnect later.
-                throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
-                        + "' replace concurrency error");
+                return CompletableFuture.failedFuture(new BrokerServiceException.NamingException("Producer with name '"
+                        + newProducer.getProducerName() + "' replace concurrency error"));
             } else {
                 handleProducerRemoved(oldProducer);
+                return CompletableFuture.completedFuture(null);
             }
         } else {
             // If a producer with the same name tries to use a new connection, async check the old connection is
             // available. The producers related the connection that not available are automatically cleaned up.
             if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
-                oldProducer.getCnx().checkConnectionLiveness();
+                return oldProducer.getCnx().checkConnectionLiveness().thenCompose(previousIsActive -> {
+                    if (previousIsActive) {
+                        return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
+                                "Producer with name '" + newProducer.getProducerName()
+                                        + "' is already connected to topic"));
+                    } else {
+                        return internalAddProducer(newProducer);
Review Comment:
It would be better to add a comment here explaining why the recursive call
to `internalAddProducer` is needed.
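
For illustration, here is a minimal sketch of why a retry after the liveness check seems necessary, assuming (as the existing code comment suggests) that a dead connection's producers are cleaned up automatically. `ProducerRegistry`, `Conn`, and `checkLiveness` are hypothetical stand-ins, not Pulsar classes; the real `AbstractTopic` logic may differ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a topic's producer map; not Pulsar's AbstractTopic.
class ProducerRegistry {
    // Hypothetical connection with a fixed liveness flag.
    static final class Conn {
        final boolean alive;
        Conn(boolean alive) { this.alive = alive; }
    }

    private final ConcurrentHashMap<String, Conn> producers = new ConcurrentHashMap<>();

    // Completes with true if the connection is alive; otherwise removes the
    // stale entry (mimicking automatic cleanup) and completes with false.
    private CompletableFuture<Boolean> checkLiveness(String name, Conn conn) {
        if (!conn.alive) {
            producers.remove(name, conn);
        }
        return CompletableFuture.completedFuture(conn.alive);
    }

    CompletableFuture<Void> add(String name, Conn conn) {
        Conn existing = producers.putIfAbsent(name, conn);
        if (existing == null) {
            return CompletableFuture.completedFuture(null);
        }
        return checkLiveness(name, existing).thenCompose(alive -> {
            if (alive) {
                return CompletableFuture.failedFuture(
                        new IllegalStateException("Producer '" + name + "' is already connected"));
            }
            // The first putIfAbsent lost to the stale entry, so the new producer
            // was never stored; retry now that the stale entry has been removed.
            return add(name, conn);
        });
    }

    Conn get(String name) { return producers.get(name); }
}
```

In this sketch, dropping the recursive `add` call would leave the map without the new producer even though the returned future completes successfully.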
Review Comment:
Also, this was a bug before this change, right?