This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 39f4ccdc2c9 [fix] [broker] Fix config replicationStartAt does not work when set it to earliest (#23719)
39f4ccdc2c9 is described below

commit 39f4ccdc2c9fe60e9a35bcced81bf915fa3df294
Author: fengyubiao <[email protected]>
AuthorDate: Fri Dec 13 09:42:17 2024 +0800

    [fix] [broker] Fix config replicationStartAt does not work when set it to earliest (#23719)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java  | 8 ++++++--
 .../org/apache/pulsar/broker/service/OneWayReplicatorTest.java    | 4 ++--
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index eb48ceee72d..0b2c9d8c7bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2066,9 +2066,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         final CompletableFuture<Void> future = new CompletableFuture<>();
 
         String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
+        String replicationStartAt = getBrokerService().getPulsar().getConfiguration().getReplicationStartAt();
         final InitialPosition initialPosition;
-        if (MessageId.earliest.toString()
-                .equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) {
+        // "MessageId.earliest.toString()" is "-1:-1:-1", which is not suggested, just guarantee compatibility with the
+        //  previous version.
+        // "InitialPosition.Earliest.name()" is "Earliest", which is suggested.
+        if (MessageId.earliest.toString().equalsIgnoreCase(replicationStartAt)
+                || InitialPosition.Earliest.name().equalsIgnoreCase(replicationStartAt)) {
             initialPosition = InitialPosition.Earliest;
         } else {
             initialPosition = InitialPosition.Latest;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index a8f8d7ecbbd..d3356a60695 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -1023,9 +1023,9 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
         disableReplication(topic1);
 
         // 2.Update config: start at "earliest".
-        admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
+        admin1.brokers().updateDynamicConfiguration("replicationStartAt", "earliest");
         Awaitility.await().untilAsserted(() -> {
-            pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
+            assertEquals(pulsar1.getConfiguration().getReplicationStartAt(), "earliest");
         });
 
         final String topic2 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");

Reply via email to