This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9b403f1a6b9e083671855a929401e64df6c7b8d9 Author: Rui Fu <[email protected]> AuthorDate: Fri Apr 8 10:07:36 2022 +0800 handle NPE when getLeader returns null (#15058) (cherry picked from commit 2e2cd4480d6ee7c6183826686a93a6e155ac3b7b) --- .../java/org/apache/pulsar/functions/worker/LeaderService.java | 3 ++- .../apache/pulsar/functions/worker/PulsarWorkerService.java | 3 ++- .../java/org/apache/pulsar/functions/worker/WorkerUtils.java | 10 ++++++++++ .../apache/pulsar/functions/worker/rest/api/FunctionsImpl.java | 2 +- .../apache/pulsar/functions/worker/rest/api/WorkerImpl.java | 6 +++++- 5 files changed, 20 insertions(+), 4 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java index fb11fab785b..645cdcb2bae 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java @@ -103,7 +103,8 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener { // attempt to acquire exclusive publishers to both the metadata topic and assignments topic // we should keep trying to acquire exclusive producers as long as we are still the leader - Supplier<Boolean> checkIsStillLeader = () -> membershipManager.getLeader().getWorkerId().equals(workerConfig.getWorkerId()); + Supplier<Boolean> checkIsStillLeader = WorkerUtils.getIsStillLeaderSupplier(membershipManager, + workerConfig.getWorkerId()); Producer<byte[]> scheduleManagerExclusiveProducer = null; Producer<byte[]> functionMetaDataManagerExclusiveProducer = null; try { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java index 93d426391b5..8eccb9d2751 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java @@ -512,7 +512,8 @@ public class PulsarWorkerService implements WorkerService { log.info("/** Initializing Runtime Manager **/"); MessageId lastAssignmentMessageId = functionRuntimeManager.initialize(); - Supplier<Boolean> checkIsStillLeader = () -> membershipManager.getLeader().getWorkerId().equals(workerConfig.getWorkerId()); + Supplier<Boolean> checkIsStillLeader = WorkerUtils.getIsStillLeaderSupplier(membershipManager, + workerConfig.getWorkerId()); // Setting references to managers in scheduler schedulerManager.setFunctionMetaDataManager(functionMetaDataManager); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java index 4f0287f19ff..52065f60242 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java @@ -41,6 +41,8 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.functions.WorkerInfo; +import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl; import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl; import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl; import org.apache.pulsar.functions.proto.InstanceCommunication; @@ -368,4 +370,12 @@ public final class WorkerUtils { public static class NotLeaderAnymore extends Exception { } + + public static Supplier<Boolean> getIsStillLeaderSupplier(final MembershipManager membershipManager, + final String workerId) { + return () -> { + WorkerInfo workerInfo = membershipManager.getLeader(); + return workerInfo != null && workerInfo.getWorkerId().equals(workerId); + }; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index b2d0f36787e..d9829e6471d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -704,7 +704,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork // Redirect if we are not the leader if (!worker().getLeaderService().isLeader()) { WorkerInfo workerInfo = worker().getMembershipManager().getLeader(); - if (workerInfo.getWorkerId().equals(worker().getWorkerConfig().getWorkerId())) { + if (workerInfo == null || workerInfo.getWorkerId().equals(worker().getWorkerConfig().getWorkerId())) { throw new RestException(Response.Status.SERVICE_UNAVAILABLE, "Leader not yet ready. Please retry again"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java index 0d0a9f83621..2f80d1d6782 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java @@ -229,7 +229,11 @@ public class WorkerImpl implements Workers<PulsarWorkerService> { } } else { WorkerInfo workerInfo = worker().getMembershipManager().getLeader(); - URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(); + if (workerInfo == null) { + throw new RestException(Status.INTERNAL_SERVER_ERROR, "Leader cannot be determined"); + } + URI redirect = + UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(); throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); } }
