This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 395b55cd373 [improve][broker] Extract duplication in AbstractTopic#incrementTopicEpochIfNeeded (#24520)
395b55cd373 is described below

commit 395b55cd37392f1e6454a33277ca3db1dde50cbe
Author: Ruimin MA <maruimin...@gmail.com>
AuthorDate: Tue Jul 29 16:16:01 2025 +0800

    [improve][broker] Extract duplication in AbstractTopic#incrementTopicEpochIfNeeded (#24520)
---
 .../pulsar/broker/service/AbstractTopic.java       | 154 +++++++--------------
 1 file changed, 51 insertions(+), 103 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 61c5f954d58..520ece34f66 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -806,84 +806,28 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
                                     "Topic has an existing exclusive producer: 
" + exclusiveProducerName));
                 } else if (!producers.isEmpty()) {
                     return FutureUtil.failedFuture(new 
ProducerFencedException("Topic has existing shared producers"));
-                } else if (producer.getTopicEpoch().isPresent()
-                        && producer.getTopicEpoch().get() < 
topicEpoch.orElse(-1L)) {
-                    // If a producer reconnects, but all the topic epoch has 
already moved forward, this producer needs
-                    // to be fenced, because a new producer had been present 
in between.
-                    return FutureUtil.failedFuture(new ProducerFencedException(
-                            String.format("Topic epoch has already moved. 
Current epoch: %d, Producer epoch: %d",
-                                    topicEpoch.get(), 
producer.getTopicEpoch().get())));
-                } else {
-                    // There are currently no existing producers
-                    hasExclusiveProducer = true;
-                    exclusiveProducerName = producer.getProducerName();
-
-                    CompletableFuture<Long> future;
-                    if (producer.getTopicEpoch().isPresent()) {
-                        future = setTopicEpoch(producer.getTopicEpoch().get());
-                    } else {
-                        future = incrementTopicEpoch(topicEpoch);
-                    }
-                    future.exceptionally(ex -> {
-                        hasExclusiveProducer = false;
-                        exclusiveProducerName = null;
-                        return null;
+                }
+                return handleTopicEpochForExclusiveProducer(producer);
+            case ExclusiveWithFencing:
+                if (hasExclusiveProducer || !producers.isEmpty()) {
+                    // clear all waiting producers
+                    // otherwise closing any producer will trigger the 
promotion
+                    // of the next pending producer
+                    List<Pair<Producer, CompletableFuture<Optional<Long>>>> 
waitingExclusiveProducersCopy =
+                            new ArrayList<>(waitingExclusiveProducers);
+                    waitingExclusiveProducers.clear();
+                    waitingExclusiveProducersCopy.forEach((Pair<Producer,
+                                                           
CompletableFuture<Optional<Long>>> handle) -> {
+                        log.info("[{}] Failing waiting producer {}", topic, 
handle.getKey());
+                        handle.getValue().completeExceptionally(new 
ProducerFencedException("Fenced out"));
+                        handle.getKey().close(true);
                     });
-
-                    return future.thenApply(epoch -> {
-                        topicEpoch = Optional.of(epoch);
-                        return topicEpoch;
+                    producers.forEach((k, currentProducer) -> {
+                        log.info("[{}] Fencing out producer {}", topic, 
currentProducer);
+                        currentProducer.close(true);
                     });
                 }
-                case ExclusiveWithFencing:
-                    if (hasExclusiveProducer || !producers.isEmpty()) {
-                        // clear all waiting producers
-                        // otherwise closing any producer will trigger the 
promotion
-                        // of the next pending producer
-                        List<Pair<Producer, 
CompletableFuture<Optional<Long>>>> waitingExclusiveProducersCopy =
-                                new ArrayList<>(waitingExclusiveProducers);
-                        waitingExclusiveProducers.clear();
-                        waitingExclusiveProducersCopy.forEach((Pair<Producer,
-                                                               
CompletableFuture<Optional<Long>>> handle) -> {
-                            log.info("[{}] Failing waiting producer {}", 
topic, handle.getKey());
-                            handle.getValue().completeExceptionally(new 
ProducerFencedException("Fenced out"));
-                            handle.getKey().close(true);
-                        });
-                        producers.forEach((k, currentProducer) -> {
-                            log.info("[{}] Fencing out producer {}", topic, 
currentProducer);
-                            currentProducer.close(true);
-                        });
-                    }
-                    if (producer.getTopicEpoch().isPresent()
-                            && producer.getTopicEpoch().get() < 
topicEpoch.orElse(-1L)) {
-                        // If a producer reconnects, but all the topic epoch 
has already moved forward,
-                        // this producer needs to be fenced, because a new 
producer had been present in between.
-                        hasExclusiveProducer = false;
-                        return FutureUtil.failedFuture(new 
ProducerFencedException(
-                                String.format("Topic epoch has already moved. 
Current epoch: %d, Producer epoch: %d",
-                                        topicEpoch.get(), 
producer.getTopicEpoch().get())));
-                    } else {
-                        // There are currently no existing producers
-                        hasExclusiveProducer = true;
-                        exclusiveProducerName = producer.getProducerName();
-
-                        CompletableFuture<Long> future;
-                        if (producer.getTopicEpoch().isPresent()) {
-                            future = 
setTopicEpoch(producer.getTopicEpoch().get());
-                        } else {
-                            future = incrementTopicEpoch(topicEpoch);
-                        }
-                        future.exceptionally(ex -> {
-                            hasExclusiveProducer = false;
-                            exclusiveProducerName = null;
-                            return null;
-                        });
-
-                        return future.thenApply(epoch -> {
-                            topicEpoch = Optional.of(epoch);
-                            return topicEpoch;
-                        });
-                    }
+                return handleTopicEpochForExclusiveProducer(producer);
             case WaitForExclusive: {
                 if (hasExclusiveProducer || !producers.isEmpty()) {
                     CompletableFuture<Optional<Long>> future = new 
CompletableFuture<>();
@@ -891,35 +835,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
                     waitingExclusiveProducers.add(Pair.of(producer, future));
                     producerQueuedFuture.complete(null);
                     return future;
-                } else if (producer.getTopicEpoch().isPresent()
-                        && producer.getTopicEpoch().get() < 
topicEpoch.orElse(-1L)) {
-                    // If a producer reconnects, but all the topic epoch has 
already moved forward, this producer needs
-                    // to be fenced, because a new producer had been present 
in between.
-                    return FutureUtil.failedFuture(new ProducerFencedException(
-                            String.format("Topic epoch has already moved. 
Current epoch: %d, Producer epoch: %d",
-                                    topicEpoch.get(), 
producer.getTopicEpoch().get())));
-                } else {
-                    // There are currently no existing producers
-                    hasExclusiveProducer = true;
-                    exclusiveProducerName = producer.getProducerName();
-
-                    CompletableFuture<Long> future;
-                    if (producer.getTopicEpoch().isPresent()) {
-                        future = setTopicEpoch(producer.getTopicEpoch().get());
-                    } else {
-                        future = incrementTopicEpoch(topicEpoch);
-                    }
-                    future.exceptionally(ex -> {
-                        hasExclusiveProducer = false;
-                        exclusiveProducerName = null;
-                        return null;
-                    });
-
-                    return future.thenApply(epoch -> {
-                        topicEpoch = Optional.of(epoch);
-                        return topicEpoch;
-                    });
                 }
+                return handleTopicEpochForExclusiveProducer(producer);
             }
 
             default:
@@ -935,6 +852,37 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
         }
     }
 
+    private CompletableFuture<Optional<Long>> 
handleTopicEpochForExclusiveProducer(Producer producer) {
+        if (producer.getTopicEpoch().isPresent()
+                && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
+            // If a producer reconnects, but all the topic epoch has already 
moved forward, this producer needs
+            // to be fenced, because a new producer had been present in 
between.
+            return FutureUtil.failedFuture(new ProducerFencedException(
+                    String.format("Topic epoch has already moved. Current 
epoch: %d, Producer epoch: %d",
+                            topicEpoch.get(), 
producer.getTopicEpoch().get())));
+        }
+        // There are currently no existing producers
+        hasExclusiveProducer = true;
+        exclusiveProducerName = producer.getProducerName();
+
+        CompletableFuture<Long> future;
+        if (producer.getTopicEpoch().isPresent()) {
+            future = setTopicEpoch(producer.getTopicEpoch().get());
+        } else {
+            future = incrementTopicEpoch(topicEpoch);
+        }
+        future.exceptionally(ex -> {
+            hasExclusiveProducer = false;
+            exclusiveProducerName = null;
+            return null;
+        });
+
+        return future.thenApply(epoch -> {
+            topicEpoch = Optional.of(epoch);
+            return topicEpoch;
+        });
+    }
+
     protected abstract CompletableFuture<Long> setTopicEpoch(long newEpoch);
 
     protected abstract CompletableFuture<Long> 
incrementTopicEpoch(Optional<Long> currentEpoch);

Reply via email to