rdhabalia commented on a change in pull request #8685:
URL: https://github.com/apache/pulsar/pull/8685#discussion_r530165563



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, 
schemaCompatibilityStrategy);
     }
 
+    @Override
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+        incrementTopicEpochIfNeeded(producer)
+                .thenAccept(epoch -> {
+                    lock.readLock().lock();
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) {
+                            log.warn("[{}] Attempting to add producer to a 
terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was 
already terminated");
+                        }
+                        internalAddProducer(producer);
+
+                        USAGE_COUNT_UPDATER.incrementAndGet(this);
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Added producer -- count: {}", 
topic, producer.getProducerName(),
+                                    USAGE_COUNT_UPDATER.get(this));
+                        }
+
+                        future.complete(epoch);
+                    } catch (Throwable e) {
+                        future.completeExceptionally(e);
+                    } finally {
+                        lock.readLock().unlock();
+                    }
+                }).exceptionally(ex -> {
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+
+        return future;
+    }
+
+    protected CompletableFuture<Optional<Long>> 
incrementTopicEpochIfNeeded(Producer producer) {
+        lock.writeLock().lock();
+        try {
+            switch (producer.getAccessMode()) {
+            case Shared:
+                if (hasExclusiveProducer) {
+                   return FutureUtil.failedFuture(new 
ProducerBusyException("Topic has an existing exclusive producer"));
+                } else {
+                    // Normal producer getting added, we don't need a new epoch
+                    return CompletableFuture.completedFuture(topicEpoch);
+                }
+
+            case Exclusive:
+                 if (hasExclusiveProducer || !producers.isEmpty()) {
+                    return FutureUtil.failedFuture(new 
ProducerFencedException("Topic has existing producers"));
+                 } else if (producer.getTopicEpoch().isPresent() && 
producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)){
+                     // If a producer reconnects, but all the topic epoch has 
already moved forward, this producer needs to
+                     // be fenced, because a new producer had been present in 
between.
+                     return FutureUtil.failedFuture(new 
ProducerFencedException("Topic epoch has already moved"));

Review comment:
       Can we add a debug log here that includes the epoch?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, 
schemaCompatibilityStrategy);
     }
 
+    @Override
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+        incrementTopicEpochIfNeeded(producer)
+                .thenAccept(epoch -> {
+                    lock.readLock().lock();
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) {
+                            log.warn("[{}] Attempting to add producer to a 
terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was 
already terminated");
+                        }
+                        internalAddProducer(producer);

Review comment:
       Is it possible to add the producer type to `topic-stats` when the 
producer is exclusive, for information and debugging?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, 
schemaCompatibilityStrategy);
     }
 
+    @Override
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+        incrementTopicEpochIfNeeded(producer)
+                .thenAccept(epoch -> {
+                    lock.readLock().lock();
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) {
+                            log.warn("[{}] Attempting to add producer to a 
terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was 
already terminated");
+                        }
+                        internalAddProducer(producer);
+
+                        USAGE_COUNT_UPDATER.incrementAndGet(this);
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Added producer -- count: {}", 
topic, producer.getProducerName(),
+                                    USAGE_COUNT_UPDATER.get(this));
+                        }
+
+                        future.complete(epoch);
+                    } catch (Throwable e) {
+                        future.completeExceptionally(e);
+                    } finally {
+                        lock.readLock().unlock();
+                    }
+                }).exceptionally(ex -> {
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+
+        return future;
+    }
+
+    protected CompletableFuture<Optional<Long>> 
incrementTopicEpochIfNeeded(Producer producer) {
+        lock.writeLock().lock();
+        try {
+            switch (producer.getAccessMode()) {
+            case Shared:
+                if (hasExclusiveProducer) {
+                   return FutureUtil.failedFuture(new 
ProducerBusyException("Topic has an existing exclusive producer"));
+                } else {
+                    // Normal producer getting added, we don't need a new epoch
+                    return CompletableFuture.completedFuture(topicEpoch);
+                }
+
+            case Exclusive:
+                 if (hasExclusiveProducer || !producers.isEmpty()) {
+                    return FutureUtil.failedFuture(new 
ProducerFencedException("Topic has existing producers"));
+                 } else if (producer.getTopicEpoch().isPresent() && 
producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)){
+                     // If a producer reconnects, but all the topic epoch has 
already moved forward, this producer needs to
+                     // be fenced, because a new producer had been present in 
between.
+                     return FutureUtil.failedFuture(new 
ProducerFencedException("Topic epoch has already moved"));
+                } else {
+                    // There are currently no existing producers
+                    hasExclusiveProducer = true;
+
+                    CompletableFuture<Optional<Long>> future = 
incrementTopicEpoch(topicEpoch).thenApply(epoch -> {
+                        topicEpoch = Optional.of(epoch);
+                        return topicEpoch;
+                    });
+
+                    future.exceptionally(ex -> {
+                        hasExclusiveProducer = false;
+                        return null;
+                    });
+                    return future;
+                }
+
+           // case WaitForExclusive:

Review comment:
       I have one suggestion. I am sure you must have thought about it, but 
can't we rename `WaitForExclusive` to `FailOver`? It would be consistent with 
the subscription type name, would need no explanation, and would be easy to 
understand.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -451,7 +554,44 @@ private boolean isUserProvidedProducerName(Producer 
producer){
         return producer.isUserProvidedProducerName() && 
!producer.getProducerName().startsWith(replicatorPrefix);
     }
 
-    protected abstract void handleProducerRemoved(Producer producer);
+
+    @Override
+    public void removeProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        if (producers.remove(producer.getProducerName(), producer)) {
+            handleProducerRemoved(producer);

Review comment:
       Shouldn't we take a lock here? Otherwise it may create a race 
condition, and a producer with `WaitForExclusive` may wait forever.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, 
schemaCompatibilityStrategy);
     }
 
+    @Override
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+        incrementTopicEpochIfNeeded(producer)
+                .thenAccept(epoch -> {
+                    lock.readLock().lock();
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) {
+                            log.warn("[{}] Attempting to add producer to a 
terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was 
already terminated");
+                        }
+                        internalAddProducer(producer);
+
+                        USAGE_COUNT_UPDATER.incrementAndGet(this);
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Added producer -- count: {}", 
topic, producer.getProducerName(),
+                                    USAGE_COUNT_UPDATER.get(this));
+                        }
+
+                        future.complete(epoch);
+                    } catch (Throwable e) {
+                        future.completeExceptionally(e);
+                    } finally {
+                        lock.readLock().unlock();
+                    }
+                }).exceptionally(ex -> {
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+
+        return future;
+    }
+
+    protected CompletableFuture<Optional<Long>> 
incrementTopicEpochIfNeeded(Producer producer) {
+        lock.writeLock().lock();

Review comment:
       Can we avoid locking for the normal producer use case?
   ```
   if (producers.isEmpty() && producer.getAccessMode() == Shared) {
   return CompletableFuture.completedFuture(topicEpoch);
   }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to