This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ab69c3d4e04 [fix] [broker] Fix config replicationStartAt does not work
when set it to earliest (#23719)
ab69c3d4e04 is described below
commit ab69c3d4e04ad66e6a31823ca01a0e7b5fe14356
Author: fengyubiao <[email protected]>
AuthorDate: Fri Dec 13 09:42:17 2024 +0800
[fix] [broker] Fix config replicationStartAt does not work when set it to
earliest (#23719)
Co-authored-by: Lari Hotari <[email protected]>
(cherry picked from commit 39f4ccdc2c9fe60e9a35bcced81bf915fa3df294)
---
.../apache/pulsar/broker/service/persistent/PersistentTopic.java | 8 ++++++--
.../org/apache/pulsar/broker/service/OneWayReplicatorTest.java | 4 ++--
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 710b366a4b7..afa9dc1f7a3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1975,9 +1975,13 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
final CompletableFuture<Void> future = new CompletableFuture<>();
String name = PersistentReplicator.getReplicatorName(replicatorPrefix,
remoteCluster);
+ String replicationStartAt =
getBrokerService().getPulsar().getConfiguration().getReplicationStartAt();
final InitialPosition initialPosition;
- if (MessageId.earliest.toString()
-
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt()))
{
+ // "MessageId.earliest.toString()" is "-1:-1:-1"; this value is not recommended and is
+ // accepted only for compatibility with previous versions.
+ // "InitialPosition.Earliest.name()" is "Earliest", which is the recommended value.
+ if (MessageId.earliest.toString().equalsIgnoreCase(replicationStartAt)
+ ||
InitialPosition.Earliest.name().equalsIgnoreCase(replicationStartAt)) {
initialPosition = InitialPosition.Earliest;
} else {
initialPosition = InitialPosition.Latest;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 7c3e18a9860..2a4c61eb8eb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -993,9 +993,9 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
disableReplication(topic1);
// 2.Update config: start at "earliest".
- admin1.brokers().updateDynamicConfiguration("replicationStartAt",
MessageId.earliest.toString());
+ admin1.brokers().updateDynamicConfiguration("replicationStartAt",
"earliest");
Awaitility.await().untilAsserted(() -> {
-
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
+ assertEquals(pulsar1.getConfiguration().getReplicationStartAt(),
"earliest");
});
final String topic2 = BrokerTestUtil.newUniqueName("persistent://" +
ns1 + "/tp_");