This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 10ec02e add stats entry for active consumer name and endpoint to get
leader of functions cluster (#2234)
10ec02e is described below
commit 10ec02eabe0670aab7d60d49e6d242469a8a11f0
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Thu Jul 26 15:51:53 2018 -0700
add stats entry for active consumer name and endpoint to get leader of
functions cluster (#2234)
* add stats entry for active consumer name and endpoint to get leader of
functions cluster
* fixing bug in stats
* removing line
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 14 ++++++++++++
.../service/persistent/PersistentSubscription.java | 7 ++++++
.../common/policies/data/SubscriptionStats.java | 5 ++++-
.../pulsar/functions/worker/MembershipManager.java | 25 ++++++++++++++++++++++
.../functions/worker/rest/api/FunctionsImpl.java | 18 ++++++++++++++++
.../worker/rest/api/v2/FunctionApiV2Resource.java | 7 ++++++
6 files changed, 75 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index f97f180..16d8252 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -215,6 +215,20 @@ public class FunctionsBase extends AdminResource
implements Supplier<WorkerServi
@GET
@ApiOperation(
+ value = "Fetches info about the leader node of the Pulsar cluster
running Pulsar Functions",
+ response = WorkerInfo.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
+
+ })
+ @Path("/cluster/leader")
+ public WorkerInfo getClusterLeader() {
+ return functions.getClusterLeader();
+ }
+
+ @GET
+ @ApiOperation(
value = "Fetches information about which Pulsar Functions are
assigned to which Pulsar clusters",
response = Assignment.class,
responseContainer = "Map"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8a77bfa..6092a86 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -630,6 +630,12 @@ public class PersistentSubscription implements
Subscription {
}
subStats.type = getType();
+ if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+ Consumer activeConsumer =
((PersistentDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer();
+ if (activeConsumer != null) {
+ subStats.activeConsumerName = activeConsumer.consumerName();
+ }
+ }
if (SubType.Shared.equals(subStats.type)) {
if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
subStats.unackedMessages =
((PersistentDispatcherMultipleConsumers) dispatcher)
@@ -640,6 +646,7 @@ public class PersistentSubscription implements Subscription
{
}
subStats.msgBacklog = getNumberOfEntriesInBacklog();
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
+
return subStats;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 4e34950..f141d4a 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -47,9 +47,12 @@ public class SubscriptionStats {
/** Number of unacknowledged messages for the subscription */
public long unackedMessages;
- /** whether this subscription is Exclusive or Shared or Failover */
+ /** Whether this subscription is Exclusive or Shared or Failover */
public SubType type;
+ /** The name of the consumer that is active for single active consumer
subscriptions i.e. failover or exclusive */
+ public String activeConsumerName;
+
/** Total rate of messages expired on this subscription. msg/s */
public double msgRateExpired;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 45e5049..7994ef3 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -132,6 +132,31 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
return workerIds;
}
+ public WorkerInfo getLeader() {
+ TopicStats topicStats = null;
+ PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
+ try {
+ topicStats =
pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
+ } catch (PulsarAdminException e) {
+ log.error("Failed to get status of coordinate topic {}",
+ this.workerConfig.getClusterCoordinationTopic(), e);
+ throw new RuntimeException(e);
+ }
+
+ String activeConsumerName =
topicStats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName;
+ WorkerInfo leader = null;
+ for (ConsumerStats consumerStats : topicStats.subscriptions
+ .get(COORDINATION_TOPIC_SUBSCRIPTION).consumers) {
+ if (consumerStats.consumerName.equals(activeConsumerName)) {
+ leader =
WorkerInfo.parseFrom(consumerStats.metadata.get(WORKER_IDENTIFIER));
+ }
+ }
+ if (leader == null) {
+ log.warn("Failed to determine leader in functions cluster");
+ }
+ return leader;
+ }
+
@Override
public void close() throws PulsarClientException {
consumer.close();
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index b935bf5..9b0ad98 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -508,6 +508,24 @@ public class FunctionsImpl {
return Response.status(Status.OK).entity(new
Gson().toJson(members)).build();
}
+ public WorkerInfo getClusterLeader() {
+ if (!isWorkerServiceAvailable()) {
+ throw new WebApplicationException(
+
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("Function worker service is
not avaialable")).build());
+ }
+
+ MembershipManager membershipManager = worker().getMembershipManager();
+ WorkerInfo leader = membershipManager.getLeader();
+
+ if (leader == null) {
+ throw new WebApplicationException(
+
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("Leader cannot be
determined")).build());}
+
+ return leader;
+ }
+
public Response getAssignments() {
if (!isWorkerServiceAvailable()) {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 92957b3..a44284a 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -136,6 +136,13 @@ public class FunctionApiV2Resource extends
FunctionApiResource {
}
@GET
+ @Path("/cluster/leader")
+ @Produces(MediaType.APPLICATION_JSON)
+ public WorkerInfo getClusterLeader() {
+ return functions.getClusterLeader();
+ }
+
+ @GET
@Path("/assignments")
public Response getAssignments() {
return functions.getAssignments();