This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e2cd4480d6 handle NPE when getLeader returns null (#15058)
2e2cd4480d6 is described below

commit 2e2cd4480d6ee7c6183826686a93a6e155ac3b7b
Author: Rui Fu <[email protected]>
AuthorDate: Fri Apr 8 10:07:36 2022 +0800

    handle NPE when getLeader returns null (#15058)
---
 .../java/org/apache/pulsar/functions/worker/LeaderService.java   | 4 ++--
 .../org/apache/pulsar/functions/worker/PulsarWorkerService.java  | 4 ++--
 .../java/org/apache/pulsar/functions/worker/WorkerUtils.java     | 9 +++++++++
 .../apache/pulsar/functions/worker/rest/api/FunctionsImpl.java   | 2 +-
 .../org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java  | 6 ++++++
 5 files changed, 20 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index bf406f4592b..132441b1be2 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -104,8 +104,8 @@ public class LeaderService implements AutoCloseable, 
ConsumerEventListener {
 
                 // attempt to acquire exclusive publishers to both the 
metadata topic and assignments topic
                 // we should keep trying to acquire exclusive producers as 
long as we are still the leader
-                Supplier<Boolean> checkIsStillLeader = () -> membershipManager
-                        
.getLeader().getWorkerId().equals(workerConfig.getWorkerId());
+                Supplier<Boolean> checkIsStillLeader = 
WorkerUtils.getIsStillLeaderSupplier(membershipManager,
+                        workerConfig.getWorkerId());
                 Producer<byte[]> scheduleManagerExclusiveProducer = null;
                 Producer<byte[]> functionMetaDataManagerExclusiveProducer = 
null;
                 try {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 730e5af4751..7d3f505b35a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -507,8 +507,8 @@ public class PulsarWorkerService implements WorkerService {
             log.info("/** Initializing Runtime Manager **/");
 
             MessageId lastAssignmentMessageId = 
functionRuntimeManager.initialize();
-            Supplier<Boolean> checkIsStillLeader =
-                    () -> 
membershipManager.getLeader().getWorkerId().equals(workerConfig.getWorkerId());
+            Supplier<Boolean> checkIsStillLeader = 
WorkerUtils.getIsStillLeaderSupplier(membershipManager,
+                    workerConfig.getWorkerId());
 
             // Setting references to managers in scheduler
             
schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 8519f0ca9e4..aa8ba57acdb 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
 import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -403,4 +404,12 @@ public final class WorkerUtils {
     public static class NotLeaderAnymore extends Exception {
 
     }
+
+    public static Supplier<Boolean> getIsStillLeaderSupplier(final 
MembershipManager membershipManager,
+                                                             final String 
workerId) {
+        return () -> {
+            WorkerInfo workerInfo = membershipManager.getLeader();
+            return workerInfo != null && 
workerInfo.getWorkerId().equals(workerId);
+        };
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 63b1b332684..168596d1a11 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -747,7 +747,7 @@ public class FunctionsImpl extends ComponentImpl implements 
Functions<PulsarWork
         // Redirect if we are not the leader
         if (!worker().getLeaderService().isLeader()) {
             WorkerInfo workerInfo = 
worker().getMembershipManager().getLeader();
-            if 
(workerInfo.getWorkerId().equals(worker().getWorkerConfig().getWorkerId())) {
+            if (workerInfo == null || 
workerInfo.getWorkerId().equals(worker().getWorkerConfig().getWorkerId())) {
                 throw new RestException(Response.Status.SERVICE_UNAVAILABLE,
                         "Leader not yet ready. Please retry again");
             }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 012b16863dd..7eef8ae7894 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -229,6 +229,9 @@ public class WorkerImpl implements 
Workers<PulsarWorkerService> {
             }
         } else {
             WorkerInfo workerInfo = 
worker().getMembershipManager().getLeader();
+            if (workerInfo == null) {
+                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Leader 
cannot be determined");
+            }
             URI redirect =
                     
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
             throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
@@ -337,6 +340,9 @@ public class WorkerImpl implements 
Workers<PulsarWorkerService> {
         // Use the leader-URI path in both cases for the redirect to the 
leader.
         String leaderPath = "admin/v2/worker/leader/drain";
         WorkerInfo workerInfo = worker().getMembershipManager().getLeader();
+        if (workerInfo == null) {
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, "Leader 
cannot be determined");
+        }
         URI redirect = UriBuilder.fromUri(uri)
                 .host(workerInfo.getWorkerHostname())
                 .port(workerInfo.getPort())

Reply via email to