This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 206ea82 Fix NPEs in PersistentReplicator (#9763)
206ea82 is described below
commit 206ea82e012af34c5bb7f824826205019f3dd4ff
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Mar 1 22:00:38 2021 +0200
Fix NPEs in PersistentReplicator (#9763)
---
.../service/persistent/PersistentReplicator.java | 22 +++++++++++++---------
1 file changed, 13 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 2d1a60f..e1f6949 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -71,7 +71,7 @@ public class PersistentReplicator extends AbstractReplicator
private final PersistentTopic topic;
private final String replicatorName;
private final ManagedLedger ledger;
- protected ManagedCursor cursor;
+ protected volatile ManagedCursor cursor;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
@@ -190,12 +190,14 @@ public class PersistentReplicator extends AbstractReplicator
@Override
protected void disableReplicatorRead() {
- // deactivate cursor after successfully close the producer
- this.cursor.setInactive();
+ if (this.cursor != null) {
+ // deactivate cursor after successfully close the producer
+ this.cursor.setInactive();
+ }
}
@Override
- protected synchronized CompletableFuture<Void> openCursorAsync() {
+ protected CompletableFuture<Void> openCursorAsync() {
log.info("[{}][{} -> {}] Starting open cursor for replicator",
topicName, localCluster, remoteCluster);
if (cursor != null) {
log.info("[{}][{} -> {}] Using the exists cursor for replicator",
topicName, localCluster, remoteCluster);
@@ -441,10 +443,12 @@ public class PersistentReplicator extends AbstractReplicator
}
public void updateCursorState() {
- if (producer != null && producer.isConnected()) {
- this.cursor.setActive();
- } else {
- this.cursor.setInactive();
+ if (this.cursor != null) {
+ if (producer != null && producer.isConnected()) {
+ this.cursor.setActive();
+ } else {
+ this.cursor.setInactive();
+ }
}
}
@@ -688,7 +692,7 @@ public class PersistentReplicator extends AbstractReplicator
}
public ReplicatorStats getStats() {
- stats.replicationBacklog = cursor.getNumberOfEntriesInBacklog(false);
+ stats.replicationBacklog = cursor != null ? cursor.getNumberOfEntriesInBacklog(false) : 0;
stats.connected = producer != null && producer.isConnected();
stats.replicationDelayInSeconds = getReplicationDelayInSeconds();