This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 39f4ccdc2c9 [fix] [broker] Fix config replicationStartAt does not work
when set it to earliest (#23719)
39f4ccdc2c9 is described below
commit 39f4ccdc2c9fe60e9a35bcced81bf915fa3df294
Author: fengyubiao <[email protected]>
AuthorDate: Fri Dec 13 09:42:17 2024 +0800
[fix] [broker] Fix config replicationStartAt does not work when set it to
earliest (#23719)
Co-authored-by: Lari Hotari <[email protected]>
---
.../apache/pulsar/broker/service/persistent/PersistentTopic.java | 8 ++++++--
.../org/apache/pulsar/broker/service/OneWayReplicatorTest.java | 4 ++--
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index eb48ceee72d..0b2c9d8c7bc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2066,9 +2066,13 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
final CompletableFuture<Void> future = new CompletableFuture<>();
String name = PersistentReplicator.getReplicatorName(replicatorPrefix,
remoteCluster);
+ String replicationStartAt =
getBrokerService().getPulsar().getConfiguration().getReplicationStartAt();
final InitialPosition initialPosition;
- if (MessageId.earliest.toString()
-
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt()))
{
+ // "MessageId.earliest.toString()" is "-1:-1:-1", which is not
suggested, just guarantee compatibility with the
+ // previous version.
+ // "InitialPosition.Earliest.name()" is "Earliest", which is suggested.
+ if (MessageId.earliest.toString().equalsIgnoreCase(replicationStartAt)
+ ||
InitialPosition.Earliest.name().equalsIgnoreCase(replicationStartAt)) {
initialPosition = InitialPosition.Earliest;
} else {
initialPosition = InitialPosition.Latest;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index a8f8d7ecbbd..d3356a60695 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -1023,9 +1023,9 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
disableReplication(topic1);
// 2.Update config: start at "earliest".
- admin1.brokers().updateDynamicConfiguration("replicationStartAt",
MessageId.earliest.toString());
+ admin1.brokers().updateDynamicConfiguration("replicationStartAt",
"earliest");
Awaitility.await().untilAsserted(() -> {
-
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
+ assertEquals(pulsar1.getConfiguration().getReplicationStartAt(),
"earliest");
});
final String topic2 = BrokerTestUtil.newUniqueName("persistent://" +
ns1 + "/tp_");