merlimat commented on a change in pull request #8685:
URL: https://github.com/apache/pulsar/pull/8685#discussion_r530520323



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, 
schemaCompatibilityStrategy);
     }
 
+    @Override
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+        incrementTopicEpochIfNeeded(producer)
+                .thenAccept(epoch -> {
+                    lock.readLock().lock();
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) {
+                            log.warn("[{}] Attempting to add producer to a 
terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was 
already terminated");
+                        }
+                        internalAddProducer(producer);
+
+                        USAGE_COUNT_UPDATER.incrementAndGet(this);
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Added producer -- count: {}", 
topic, producer.getProducerName(),
+                                    USAGE_COUNT_UPDATER.get(this));
+                        }
+
+                        future.complete(epoch);
+                    } catch (Throwable e) {
+                        future.completeExceptionally(e);
+                    } finally {
+                        lock.readLock().unlock();
+                    }
+                }).exceptionally(ex -> {
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+
+        return future;
+    }
+
+    protected CompletableFuture<Optional<Long>> 
incrementTopicEpochIfNeeded(Producer producer) {
+        lock.writeLock().lock();

Review comment:
       I'm not sure that we can avoid the locking, and I'm not sure that it's 
something to worry about in the context of adding a producer. 
   Actually, in the specific case there's still a race condition between 
updating the `producers` map, since the update to `hasExclusiveProducer` and 
the insertion into `producers` map should be atomic as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to