merlimat closed pull request #2234: add stats entry for active consumer name
and endpoint to get leader of functions cluster
URL: https://github.com/apache/incubator-pulsar/pull/2234
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index f97f1807c9..16d82525c0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -213,6 +213,20 @@ public Response getCluster() {
return functions.getCluster();
}
+ @GET
+ @ApiOperation(
+ value = "Fetches info about the leader node of the Pulsar cluster
running Pulsar Functions",
+ response = WorkerInfo.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
+
+ })
+ @Path("/cluster/leader")
+ public WorkerInfo getClusterLeader() {
+ return functions.getClusterLeader();
+ }
+
@GET
@ApiOperation(
value = "Fetches information about which Pulsar Functions are
assigned to which Pulsar clusters",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8a77bfacb5..6092a8663a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -630,6 +630,12 @@ public SubscriptionStats getStats() {
}
subStats.type = getType();
+ if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+ Consumer activeConsumer =
((PersistentDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer();
+ if (activeConsumer != null) {
+ subStats.activeConsumerName = activeConsumer.consumerName();
+ }
+ }
if (SubType.Shared.equals(subStats.type)) {
if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
subStats.unackedMessages =
((PersistentDispatcherMultipleConsumers) dispatcher)
@@ -640,6 +646,7 @@ public SubscriptionStats getStats() {
}
subStats.msgBacklog = getNumberOfEntriesInBacklog();
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
+
return subStats;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 4e34950e37..f141d4a6d2 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -47,9 +47,12 @@
/** Number of unacknowledged messages for the subscription */
public long unackedMessages;
- /** whether this subscription is Exclusive or Shared or Failover */
+ /** Whether this subscription is Exclusive or Shared or Failover */
public SubType type;
+ /** The name of the consumer that is active for single active consumer
subscriptions i.e. failover or exclusive */
+ public String activeConsumerName;
+
/** Total rate of messages expired on this subscription. msg/s */
public double msgRateExpired;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 45e5049cb6..7994ef33d4 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -132,6 +132,31 @@ public boolean isLeader() {
return workerIds;
}
+ public WorkerInfo getLeader() {
+ TopicStats topicStats = null;
+ PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
+ try {
+ topicStats =
pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
+ } catch (PulsarAdminException e) {
+ log.error("Failed to get status of coordinate topic {}",
+ this.workerConfig.getClusterCoordinationTopic(), e);
+ throw new RuntimeException(e);
+ }
+
+ String activeConsumerName =
topicStats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName;
+ WorkerInfo leader = null;
+ for (ConsumerStats consumerStats : topicStats.subscriptions
+ .get(COORDINATION_TOPIC_SUBSCRIPTION).consumers) {
+ if (consumerStats.consumerName.equals(activeConsumerName)) {
+ leader =
WorkerInfo.parseFrom(consumerStats.metadata.get(WORKER_IDENTIFIER));
+ }
+ }
+ if (leader == null) {
+ log.warn("Failed to determine leader in functions cluster");
+ }
+ return leader;
+ }
+
@Override
public void close() throws PulsarClientException {
consumer.close();
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index b935bf5f58..9b0ad98b27 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -508,6 +508,24 @@ public Response getCluster() {
return Response.status(Status.OK).entity(new
Gson().toJson(members)).build();
}
+ public WorkerInfo getClusterLeader() {
+ if (!isWorkerServiceAvailable()) {
+ throw new WebApplicationException(
+
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("Function worker service is
not avaialable")).build());
+ }
+
+ MembershipManager membershipManager = worker().getMembershipManager();
+ WorkerInfo leader = membershipManager.getLeader();
+
+ if (leader == null) {
+ throw new WebApplicationException(
+
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("Leader cannot be
determined")).build());}
+
+ return leader;
+ }
+
public Response getAssignments() {
if (!isWorkerServiceAvailable()) {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 92957b32d4..a44284aa6f 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -135,6 +135,13 @@ public Response getCluster() {
return functions.getCluster();
}
+ @GET
+ @Path("/cluster/leader")
+ @Produces(MediaType.APPLICATION_JSON)
+ public WorkerInfo getClusterLeader() {
+ return functions.getClusterLeader();
+ }
+
@GET
@Path("/assignments")
public Response getAssignments() {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services