This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9b403f1a6b9e083671855a929401e64df6c7b8d9
Author: Rui Fu <[email protected]>
AuthorDate: Fri Apr 8 10:07:36 2022 +0800

    handle NPE when getLeader returns null (#15058)
    
    (cherry picked from commit 2e2cd4480d6ee7c6183826686a93a6e155ac3b7b)
---
 .../java/org/apache/pulsar/functions/worker/LeaderService.java |  3 ++-
 .../apache/pulsar/functions/worker/PulsarWorkerService.java    |  3 ++-
 .../java/org/apache/pulsar/functions/worker/WorkerUtils.java   | 10 ++++++++++
 .../apache/pulsar/functions/worker/rest/api/FunctionsImpl.java |  2 +-
 .../apache/pulsar/functions/worker/rest/api/WorkerImpl.java    |  6 +++++-
 5 files changed, 20 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index fb11fab785b..645cdcb2bae 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -103,7 +103,8 @@ public class LeaderService implements AutoCloseable, 
ConsumerEventListener {
 
                 // attempt to acquire exclusive publishers to both the 
metadata topic and assignments topic
                 // we should keep trying to acquire exclusive producers as 
long as we are still the leader
-                Supplier<Boolean> checkIsStillLeader = () -> 
membershipManager.getLeader().getWorkerId().equals(workerConfig.getWorkerId());
+                Supplier<Boolean> checkIsStillLeader = 
WorkerUtils.getIsStillLeaderSupplier(membershipManager,
+                        workerConfig.getWorkerId());
                 Producer<byte[]> scheduleManagerExclusiveProducer = null;
                 Producer<byte[]> functionMetaDataManagerExclusiveProducer = 
null;
                 try {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 93d426391b5..8eccb9d2751 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -512,7 +512,8 @@ public class PulsarWorkerService implements WorkerService {
             log.info("/** Initializing Runtime Manager **/");
 
             MessageId lastAssignmentMessageId = 
functionRuntimeManager.initialize();
-            Supplier<Boolean> checkIsStillLeader = () -> 
membershipManager.getLeader().getWorkerId().equals(workerConfig.getWorkerId());
+            Supplier<Boolean> checkIsStillLeader = 
WorkerUtils.getIsStillLeaderSupplier(membershipManager,
+                    workerConfig.getWorkerId());
 
             // Setting references to managers in scheduler
             
schedulerManager.setFunctionMetaDataManager(functionMetaDataManager);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 4f0287f19ff..52065f60242 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -41,6 +41,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
 import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
 import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -368,4 +370,12 @@ public final class WorkerUtils {
     public static class NotLeaderAnymore extends Exception {
 
     }
+
+    public static Supplier<Boolean> getIsStillLeaderSupplier(final 
MembershipManager membershipManager,
+                                                             final String 
workerId) {
+        return () -> {
+            WorkerInfo workerInfo = membershipManager.getLeader();
+            return workerInfo != null && 
workerInfo.getWorkerId().equals(workerId);
+        };
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index b2d0f36787e..d9829e6471d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -704,7 +704,7 @@ public class FunctionsImpl extends ComponentImpl implements 
Functions<PulsarWork
         // Redirect if we are not the leader
         if (!worker().getLeaderService().isLeader()) {
             WorkerInfo workerInfo = 
worker().getMembershipManager().getLeader();
-            if 
(workerInfo.getWorkerId().equals(worker().getWorkerConfig().getWorkerId())) {
+            if (workerInfo == null || 
workerInfo.getWorkerId().equals(worker().getWorkerConfig().getWorkerId())) {
                 throw new RestException(Response.Status.SERVICE_UNAVAILABLE,
                         "Leader not yet ready. Please retry again");
             }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 0d0a9f83621..2f80d1d6782 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -229,7 +229,11 @@ public class WorkerImpl implements 
Workers<PulsarWorkerService> {
             }
         } else {
             WorkerInfo workerInfo = 
worker().getMembershipManager().getLeader();
-            URI redirect = 
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+            if (workerInfo == null) {
+                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Leader 
cannot be determined");
+            }
+            URI redirect =
+                    
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
             throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
         }
     }

Reply via email to