This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 395b55cd373 [improve][broker] Extract duplication in AbstractTopic#incrementTopicEpochIfNeeded (#24520) 395b55cd373 is described below commit 395b55cd37392f1e6454a33277ca3db1dde50cbe Author: Ruimin MA <maruimin...@gmail.com> AuthorDate: Tue Jul 29 16:16:01 2025 +0800 [improve][broker] Extract duplication in AbstractTopic#incrementTopicEpochIfNeeded (#24520) --- .../pulsar/broker/service/AbstractTopic.java | 154 +++++++-------------- 1 file changed, 51 insertions(+), 103 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 61c5f954d58..520ece34f66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -806,84 +806,28 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { "Topic has an existing exclusive producer: " + exclusiveProducerName)); } else if (!producers.isEmpty()) { return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing shared producers")); - } else if (producer.getTopicEpoch().isPresent() - && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) { - // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs - // to be fenced, because a new producer had been present in between. - return FutureUtil.failedFuture(new ProducerFencedException( - String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d", - topicEpoch.get(), producer.getTopicEpoch().get()))); - } else { - // There are currently no existing producers - hasExclusiveProducer = true; - exclusiveProducerName = producer.getProducerName(); - - CompletableFuture<Long> future; - if (producer.getTopicEpoch().isPresent()) { - future = setTopicEpoch(producer.getTopicEpoch().get()); - } else { - future = incrementTopicEpoch(topicEpoch); - } - future.exceptionally(ex -> { - hasExclusiveProducer = false; - exclusiveProducerName = null; - return null; + } + return handleTopicEpochForExclusiveProducer(producer); + case ExclusiveWithFencing: + if (hasExclusiveProducer || !producers.isEmpty()) { + // clear all waiting producers + // otherwise closing any producer will trigger the promotion + // of the next pending producer + List<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducersCopy = + new ArrayList<>(waitingExclusiveProducers); + waitingExclusiveProducers.clear(); + waitingExclusiveProducersCopy.forEach((Pair<Producer, + CompletableFuture<Optional<Long>>> handle) -> { + log.info("[{}] Failing waiting producer {}", topic, handle.getKey()); + handle.getValue().completeExceptionally(new ProducerFencedException("Fenced out")); + handle.getKey().close(true); }); - - return future.thenApply(epoch -> { - topicEpoch = Optional.of(epoch); - return topicEpoch; + producers.forEach((k, currentProducer) -> { + log.info("[{}] Fencing out producer {}", topic, currentProducer); + currentProducer.close(true); }); } - case ExclusiveWithFencing: - if (hasExclusiveProducer || !producers.isEmpty()) { - // clear all waiting producers - // otherwise closing any producer will trigger the promotion - // of the next pending producer - List<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducersCopy = - new ArrayList<>(waitingExclusiveProducers); - waitingExclusiveProducers.clear(); - waitingExclusiveProducersCopy.forEach((Pair<Producer, - CompletableFuture<Optional<Long>>> handle) -> { - log.info("[{}] Failing waiting producer {}", topic, handle.getKey()); - handle.getValue().completeExceptionally(new ProducerFencedException("Fenced out")); - handle.getKey().close(true); - }); - producers.forEach((k, currentProducer) -> { - log.info("[{}] Fencing out producer {}", topic, currentProducer); - currentProducer.close(true); - }); - } - if (producer.getTopicEpoch().isPresent() - && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) { - // If a producer reconnects, but all the topic epoch has already moved forward, - // this producer needs to be fenced, because a new producer had been present in between. - hasExclusiveProducer = false; - return FutureUtil.failedFuture(new ProducerFencedException( - String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d", - topicEpoch.get(), producer.getTopicEpoch().get()))); - } else { - // There are currently no existing producers - hasExclusiveProducer = true; - exclusiveProducerName = producer.getProducerName(); - - CompletableFuture<Long> future; - if (producer.getTopicEpoch().isPresent()) { - future = setTopicEpoch(producer.getTopicEpoch().get()); - } else { - future = incrementTopicEpoch(topicEpoch); - } - future.exceptionally(ex -> { - hasExclusiveProducer = false; - exclusiveProducerName = null; - return null; - }); - - return future.thenApply(epoch -> { - topicEpoch = Optional.of(epoch); - return topicEpoch; - }); - } + return handleTopicEpochForExclusiveProducer(producer); case WaitForExclusive: { if (hasExclusiveProducer || !producers.isEmpty()) { CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); @@ -891,35 +835,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { waitingExclusiveProducers.add(Pair.of(producer, future)); producerQueuedFuture.complete(null); return future; - } else if (producer.getTopicEpoch().isPresent() - && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) { - // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs - // to be fenced, because a new producer had been present in between. - return FutureUtil.failedFuture(new ProducerFencedException( - String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d", - topicEpoch.get(), producer.getTopicEpoch().get()))); - } else { - // There are currently no existing producers - hasExclusiveProducer = true; - exclusiveProducerName = producer.getProducerName(); - - CompletableFuture<Long> future; - if (producer.getTopicEpoch().isPresent()) { - future = setTopicEpoch(producer.getTopicEpoch().get()); - } else { - future = incrementTopicEpoch(topicEpoch); - } - future.exceptionally(ex -> { - hasExclusiveProducer = false; - exclusiveProducerName = null; - return null; - }); - - return future.thenApply(epoch -> { - topicEpoch = Optional.of(epoch); - return topicEpoch; - }); } + return handleTopicEpochForExclusiveProducer(producer); } default: @@ -935,6 +852,37 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { } } + private CompletableFuture<Optional<Long>> handleTopicEpochForExclusiveProducer(Producer producer) { + if (producer.getTopicEpoch().isPresent() + && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) { + // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs + // to be fenced, because a new producer had been present in between. + return FutureUtil.failedFuture(new ProducerFencedException( + String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d", + topicEpoch.get(), producer.getTopicEpoch().get()))); + } + // There are currently no existing producers + hasExclusiveProducer = true; + exclusiveProducerName = producer.getProducerName(); + + CompletableFuture<Long> future; + if (producer.getTopicEpoch().isPresent()) { + future = setTopicEpoch(producer.getTopicEpoch().get()); + } else { + future = incrementTopicEpoch(topicEpoch); + } + future.exceptionally(ex -> { + hasExclusiveProducer = false; + exclusiveProducerName = null; + return null; + }); + + return future.thenApply(epoch -> { + topicEpoch = Optional.of(epoch); + return topicEpoch; + }); + } + protected abstract CompletableFuture<Long> setTopicEpoch(long newEpoch); protected abstract CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch);