merlimat commented on a change in pull request #8685:
URL: https://github.com/apache/pulsar/pull/8685#discussion_r530523232
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
.checkConsumerCompatibility(id, schema,
schemaCompatibilityStrategy);
}
+ @Override
+ public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+ checkArgument(producer.getTopic() == this);
+
+ CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+ incrementTopicEpochIfNeeded(producer)
+ .thenAccept(epoch -> {
+ lock.readLock().lock();
+ try {
+ brokerService.checkTopicNsOwnership(getName());
+ checkTopicFenced();
+ if (isTerminated()) {
+ log.warn("[{}] Attempting to add producer to a
terminated topic", topic);
+ throw new TopicTerminatedException("Topic was
already terminated");
+ }
+ internalAddProducer(producer);
+
+ USAGE_COUNT_UPDATER.incrementAndGet(this);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Added producer -- count: {}",
topic, producer.getProducerName(),
+ USAGE_COUNT_UPDATER.get(this));
+ }
+
+ future.complete(epoch);
+ } catch (Throwable e) {
+ future.completeExceptionally(e);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }).exceptionally(ex -> {
+ future.completeExceptionally(ex);
+ return null;
+ });
+
+ return future;
+ }
+
+ protected CompletableFuture<Optional<Long>>
incrementTopicEpochIfNeeded(Producer producer) {
+ lock.writeLock().lock();
+ try {
+ switch (producer.getAccessMode()) {
+ case Shared:
+ if (hasExclusiveProducer) {
+ return FutureUtil.failedFuture(new
ProducerBusyException("Topic has an existing exclusive producer"));
+ } else {
+ // Normal producer getting added, we don't need a new epoch
+ return CompletableFuture.completedFuture(topicEpoch);
+ }
+
+ case Exclusive:
+ if (hasExclusiveProducer || !producers.isEmpty()) {
+ return FutureUtil.failedFuture(new
ProducerFencedException("Topic has existing producers"));
+ } else if (producer.getTopicEpoch().isPresent() &&
producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)){
+ // If a producer reconnects, but all the topic epoch has
already moved forward, this producer needs to
+ // be fenced, because a new producer had been present in
between.
+ return FutureUtil.failedFuture(new
ProducerFencedException("Topic epoch has already moved"));
+ } else {
+ // There are currently no existing producers
+ hasExclusiveProducer = true;
+
+ CompletableFuture<Optional<Long>> future =
incrementTopicEpoch(topicEpoch).thenApply(epoch -> {
+ topicEpoch = Optional.of(epoch);
+ return topicEpoch;
+ });
+
+ future.exceptionally(ex -> {
+ hasExclusiveProducer = false;
+ return null;
+ });
+ return future;
+ }
+
+ // case WaitForExclusive:
Review comment:
The problem I see is that it's not exactly the same thing and that can
confuse people:
1. The semantic of creating a `WaitForExclusive` is different because the
`newProducer()....create()` call is hanging until that particular producer is
selected, unlike in consumers where it's created immediately but it will not
receive messages.
2. The use case is mostly different from "failover" for reliability
purpose, since you can use `WaitForExclusive` to do leader election
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]