This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2618249b583 [fix][test] Fix flaky
ReplicatorTest.testReplicatorClearBacklog NPE on inFlightTask (#25691)
2618249b583 is described below
commit 2618249b583f4fa914c4d68682527e3c04de241e
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 5 19:18:15 2026 -0700
[fix][test] Fix flaky ReplicatorTest.testReplicatorClearBacklog NPE on
inFlightTask (#25691)
---
.../java/org/apache/pulsar/broker/service/ReplicatorTest.java | 9 +++++++--
.../persistent/BrokerServicePersistInternalMethodInvoker.java | 5 +++++
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 7f699419187..9753e68dfb1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
+import static
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.newInFlightTaskCtx;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleGaugeValue;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.mockito.ArgumentMatchers.eq;
@@ -663,7 +664,9 @@ public class ReplicatorTest extends ReplicatorTestBase {
PersistentTopic topic = (PersistentTopic)
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
PersistentReplicator replicator = (PersistentReplicator) spy(
topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0)));
- replicator.readEntriesFailed(new
ManagedLedgerException.InvalidCursorPositionException("failed"), null);
+ Object inFlightTaskCtx = newInFlightTaskCtx(replicator,
PositionFactory.create(1, 1), 1);
+ replicator.readEntriesFailed(new
ManagedLedgerException.InvalidCursorPositionException("failed"),
+ inFlightTaskCtx);
replicator.clearBacklog().get();
Thread.sleep(100);
replicator.updateRates(); // for code-coverage
@@ -693,7 +696,9 @@ public class ReplicatorTest extends ReplicatorTestBase {
PersistentTopic topic = (PersistentTopic)
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
PersistentReplicator replicator = (PersistentReplicator) spy(
topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0)));
- replicator.readEntriesFailed(new
ManagedLedgerException.InvalidCursorPositionException("failed"), null);
+ Object inFlightTaskCtx = newInFlightTaskCtx(replicator,
PositionFactory.create(1, 1), 1);
+ replicator.readEntriesFailed(new
ManagedLedgerException.InvalidCursorPositionException("failed"),
+ inFlightTaskCtx);
replicator.clearBacklog().get();
Thread.sleep(100);
replicator.updateRates(); // for code-coverage
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
index 8c26aece764..c52b9c6896b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.Position;
import org.awaitility.Awaitility;
public class BrokerServicePersistInternalMethodInvoker {
@@ -42,4 +43,8 @@ public class BrokerServicePersistInternalMethodInvoker {
return true;
});
}
+
+ public static Object newInFlightTaskCtx(PersistentReplicator replicator,
Position readPos, int readingEntries) {
+ return new PersistentReplicator.InFlightTask(readPos, readingEntries,
replicator.getReplicatorId());
+ }
}