This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2e2cd4480d6 handle NPE when getLeader returns null (#15058)
2e2cd4480d6 is described below
commit 2e2cd4480d6ee7c6183826686a93a6e155ac3b7b
Author: Rui Fu <[email protected]>
AuthorDate: Fri Apr 8 10:07:36 2022 +0800
handle NPE when getLeader returns null (#15058)
---
.../java/org/apache/pulsar/functions/worker/LeaderService.java | 4 ++--
.../org/apache/pulsar/functions/worker/PulsarWorkerService.java | 4 ++--
.../java/org/apache/pulsar/functions/worker/WorkerUtils.java | 9 +++++++++
.../apache/pulsar/functions/worker/rest/api/FunctionsImpl.java | 2 +-
.../org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java | 6 ++++++
5 files changed, 20 insertions(+), 5 deletions(-)
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index bf406f4592b..132441b1be2 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -104,8 +104,8 @@ public class LeaderService implements AutoCloseable,
ConsumerEventListener {
// attempt to acquire exclusive publishers to both the
metadata topic and assignments topic
// we should keep trying to acquire exclusive producers as
long as we are still the leader
- Supplier<Boolean> checkIsStillLeader = () -> membershipManager
-
.getLeader().getWorkerId().equals(workerConfig.getWorkerId());
+ Supplier<Boolean> checkIsStillLeader =
WorkerUtils.getIsStillLeaderSupplier(membershipManager,
+ workerConfig.getWorkerId());
Producer<byte[]> scheduleManagerExclusiveProducer = null;
Producer<byte[]> functionMetaDataManagerExclusiveProducer =
null;
try {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 730e5af4751..7d3f505b35a 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -507,8 +507,8 @@ public class PulsarWorkerService implements WorkerService {
log.info("/** Initializing Runtime Manager **/");
MessageId lastAssignmentMessageId =
functionRuntimeManager.initialize();
- Supplier<Boolean> checkIsStillLeader =
- () ->
membershipManager.getLeader().getWorkerId().equals(workerConfig.getWorkerId());
+ Supplier<Boolean> checkIsStillLeader =
WorkerUtils.getIsStillLeaderSupplier(membershipManager,
+ workerConfig.getWorkerId());
// Setting references to managers in scheduler
schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 8519f0ca9e4..aa8ba57acdb 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -403,4 +404,12 @@ public final class WorkerUtils {
public static class NotLeaderAnymore extends Exception {
}
+
+ public static Supplier<Boolean> getIsStillLeaderSupplier(final
MembershipManager membershipManager,
+ final String
workerId) {
+ return () -> {
+ WorkerInfo workerInfo = membershipManager.getLeader();
+ return workerInfo != null &&
workerInfo.getWorkerId().equals(workerId);
+ };
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 63b1b332684..168596d1a11 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -747,7 +747,7 @@ public class FunctionsImpl extends ComponentImpl implements
Functions<PulsarWork
// Redirect if we are not the leader
if (!worker().getLeaderService().isLeader()) {
WorkerInfo workerInfo =
worker().getMembershipManager().getLeader();
- if
(workerInfo.getWorkerId().equals(worker().getWorkerConfig().getWorkerId())) {
+ if (workerInfo == null ||
workerInfo.getWorkerId().equals(worker().getWorkerConfig().getWorkerId())) {
throw new RestException(Response.Status.SERVICE_UNAVAILABLE,
"Leader not yet ready. Please retry again");
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 012b16863dd..7eef8ae7894 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -229,6 +229,9 @@ public class WorkerImpl implements
Workers<PulsarWorkerService> {
}
} else {
WorkerInfo workerInfo =
worker().getMembershipManager().getLeader();
+ if (workerInfo == null) {
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, "Leader
cannot be determined");
+ }
URI redirect =
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
@@ -337,6 +340,9 @@ public class WorkerImpl implements
Workers<PulsarWorkerService> {
// Use the leader-URI path in both cases for the redirect to the
leader.
String leaderPath = "admin/v2/worker/leader/drain";
WorkerInfo workerInfo = worker().getMembershipManager().getLeader();
+ if (workerInfo == null) {
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, "Leader
cannot be determined");
+ }
URI redirect = UriBuilder.fromUri(uri)
.host(workerInfo.getWorkerHostname())
.port(workerInfo.getPort())