This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 10ec02e  add stats entry for active consumer name and endpoint to get 
leader of functions cluster (#2234)
10ec02e is described below

commit 10ec02eabe0670aab7d60d49e6d242469a8a11f0
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Thu Jul 26 15:51:53 2018 -0700

    add stats entry for active consumer name and endpoint to get leader of 
functions cluster (#2234)
    
    * add stats entry for active consumer name and endpoint to get leader of 
functions cluster
    
    * fixing bug in stats
    
    * removing line
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    | 14 ++++++++++++
 .../service/persistent/PersistentSubscription.java |  7 ++++++
 .../common/policies/data/SubscriptionStats.java    |  5 ++++-
 .../pulsar/functions/worker/MembershipManager.java | 25 ++++++++++++++++++++++
 .../functions/worker/rest/api/FunctionsImpl.java   | 18 ++++++++++++++++
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  7 ++++++
 6 files changed, 75 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index f97f180..16d8252 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -215,6 +215,20 @@ public class FunctionsBase extends AdminResource 
implements Supplier<WorkerServi
 
     @GET
     @ApiOperation(
+            value = "Fetches info about the leader node of the Pulsar cluster 
running Pulsar Functions",
+            response = WorkerInfo.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
+
+    })
+    @Path("/cluster/leader")
+    public WorkerInfo getClusterLeader() {
+        return functions.getClusterLeader();
+    }
+
+    @GET
+    @ApiOperation(
             value = "Fetches information about which Pulsar Functions are 
assigned to which Pulsar clusters",
             response = Assignment.class,
             responseContainer = "Map"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8a77bfa..6092a86 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -630,6 +630,12 @@ public class PersistentSubscription implements 
Subscription {
         }
 
         subStats.type = getType();
+        if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            Consumer activeConsumer = 
((PersistentDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer();
+            if (activeConsumer != null) {
+                subStats.activeConsumerName = activeConsumer.consumerName();
+            }
+        }
         if (SubType.Shared.equals(subStats.type)) {
             if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
                 subStats.unackedMessages = 
((PersistentDispatcherMultipleConsumers) dispatcher)
@@ -640,6 +646,7 @@ public class PersistentSubscription implements Subscription 
{
         }
         subStats.msgBacklog = getNumberOfEntriesInBacklog();
         subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
+
         return subStats;
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 4e34950..f141d4a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -47,9 +47,12 @@ public class SubscriptionStats {
     /** Number of unacknowledged messages for the subscription */
     public long unackedMessages;
 
-    /** whether this subscription is Exclusive or Shared or Failover */
+    /** Whether this subscription is Exclusive or Shared or Failover */
     public SubType type;
 
+    /** The name of the consumer that is active for single active consumer 
subscriptions i.e. failover or exclusive */
+    public String activeConsumerName;
+
     /** Total rate of messages expired on this subscription. msg/s */
     public double msgRateExpired;
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 45e5049..7994ef3 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -132,6 +132,31 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
         return workerIds;
     }
 
+    public WorkerInfo getLeader() {
+        TopicStats topicStats = null;
+        PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
+        try {
+            topicStats = 
pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
+        } catch (PulsarAdminException e) {
+            log.error("Failed to get status of coordinate topic {}",
+                    this.workerConfig.getClusterCoordinationTopic(), e);
+            throw new RuntimeException(e);
+        }
+
+        String activeConsumerName = 
topicStats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName;
+        WorkerInfo leader = null;
+        for (ConsumerStats consumerStats : topicStats.subscriptions
+                .get(COORDINATION_TOPIC_SUBSCRIPTION).consumers) {
+            if (consumerStats.consumerName.equals(activeConsumerName)) {
+                leader = 
WorkerInfo.parseFrom(consumerStats.metadata.get(WORKER_IDENTIFIER));
+            }
+        }
+        if (leader == null) {
+            log.warn("Failed to determine leader in functions cluster");
+        }
+        return leader;
+    }
+
     @Override
     public void close() throws PulsarClientException {
         consumer.close();
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index b935bf5..9b0ad98 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -508,6 +508,24 @@ public class FunctionsImpl {
         return Response.status(Status.OK).entity(new 
Gson().toJson(members)).build();
     }
 
+    public WorkerInfo getClusterLeader() {
+        if (!isWorkerServiceAvailable()) {
+            throw new WebApplicationException(
+                    
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
+                            .entity(new ErrorData("Function worker service is 
not avaialable")).build());
+        }
+
+        MembershipManager membershipManager = worker().getMembershipManager();
+        WorkerInfo leader = membershipManager.getLeader();
+
+        if (leader == null) {
+            throw new WebApplicationException(
+                    
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                            .entity(new ErrorData("Leader cannot be 
determined")).build());}
+
+        return leader;
+    }
+
     public Response getAssignments() {
 
         if (!isWorkerServiceAvailable()) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 92957b3..a44284a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -136,6 +136,13 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
     }
 
     @GET
+    @Path("/cluster/leader")
+    @Produces(MediaType.APPLICATION_JSON)
+    public WorkerInfo getClusterLeader() {
+        return functions.getClusterLeader();
+    }
+
+    @GET
     @Path("/assignments")
     public Response getAssignments() {
         return functions.getAssignments();

Reply via email to