codelipenghui commented on a change in pull request #5571: Add epoch for connection handler to handle create producer timeout.
URL: https://github.com/apache/pulsar/pull/5571#discussion_r344017172
##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -258,16 +258,59 @@ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
     @Override
     public void resetPublishCountAndEnableReadIfRequired() {
         if (this.publishRateLimiter.resetPublishCount()) {
-            enableProduerRead();
+            enableProducerRead();
         }
     }
     /**
      * it sets cnx auto-readable if producer's cnx is disabled due to publish-throttling
      */
-    protected void enableProduerRead() {
+    protected void enableProducerRead() {
         if (producers != null) {
-            producers.forEach(producer -> producer.getCnx().enableCnxAutoRead());
+            producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
+        }
+    }
+
+    protected void checkTopicFenced() throws BrokerServiceException {
+        if (isFenced) {
+            log.warn("[{}] Attempting to add producer to a fenced topic", topic);
+            throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable");
+        }
+    }
+
+    protected void internalAddProducer(Producer producer) throws BrokerServiceException {
+        if (isProducersExceeded()) {
+            log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
+            throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
+        }
+
+        Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
+        if (existProducer != null) {
+            tryOverwriteOldProducer(existProducer, producer);
+        }
+    }
+
+    private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
+            throws BrokerServiceException {
+        boolean canOverwrite = false;
+        if (oldProducer.equals(newProducer) && !oldProducer.isUserProvidedProducerName()
+                && !newProducer.isUserProvidedProducerName() && newProducer.getEpoch() > oldProducer.getEpoch()) {
+            oldProducer.close();
+            canOverwrite = true;
+        }
+        if (canOverwrite) {
Review comment:
Maybe this `if` can be simplified to:
```
if (!canOverwrite || !producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
    throw new BrokerServiceException.NamingException(
            "Producer with name '" + newProducer.getProducerName() + "' is already connected");
}
```
```
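For context, here is a minimal, self-contained sketch of the pattern the suggestion relies on: `putIfAbsent` followed by a conditional `replace(key, oldValue, newValue)` on a `ConcurrentHashMap`. It is not the broker code. `ProducerRegistrySketch` and `Entry` are hypothetical stand-ins for the topic and `Producer`, `IllegalStateException` stands in for `BrokerServiceException.NamingException`, and the user-provided-name checks and `oldProducer.close()` from the diff are left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the topic's producer map; not part of Pulsar.
class ProducerRegistrySketch {

    // Hypothetical stand-in for Producer: only the name and epoch are modeled.
    static final class Entry {
        final String name;
        final long epoch;

        Entry(String name, long epoch) {
            this.name = name;
            this.epoch = epoch;
        }
    }

    private final ConcurrentMap<String, Entry> producers = new ConcurrentHashMap<>();

    // Registers the candidate, or throws if an entry with the same name
    // is present and cannot be overwritten.
    void add(Entry candidate) {
        Entry existing = producers.putIfAbsent(candidate.name, candidate);
        if (existing == null) {
            return; // no entry with this name was registered before
        }
        // Assumption for this sketch: a strictly newer epoch may overwrite.
        boolean canOverwrite = candidate.epoch > existing.epoch;
        // replace(key, old, new) succeeds only if the key is still mapped to
        // `existing`, so a concurrent change to the entry also ends up here.
        if (!canOverwrite || !producers.replace(candidate.name, existing, candidate)) {
            throw new IllegalStateException(
                    "Producer with name '" + candidate.name + "' is already connected");
        }
    }

    long currentEpoch(String name) {
        return producers.get(name).epoch;
    }

    public static void main(String[] args) {
        ProducerRegistrySketch registry = new ProducerRegistrySketch();
        registry.add(new Entry("producer-1", 1));
        registry.add(new Entry("producer-1", 2)); // newer epoch replaces the old entry
        System.out.println(registry.currentEpoch("producer-1"));
    }
}
```

Folding both conditions into one `if` gives a single failure path: the exception is thrown either when overwriting is not allowed or when the map entry changed between `putIfAbsent` and `replace`.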