Copilot commented on code in PR #24991:
URL: https://github.com/apache/pulsar/pull/24991#discussion_r2657790731
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5466,4 +5466,86 @@ private static Long getIndexFromEntry(Entry entry) {
}
});
}
+
+ protected CompletableFuture<Void>
internalSetCustomMetricLabels(Map<String, String> labels, boolean isGlobal) {
+ // Feature flag check
+ if
(!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Custom topic metric labels feature is disabled"));
+ }
+
+ // Validate labels against allowed keys and value length
+ Set<String> allowedKeys = getAllowedCustomMetricLabelKeys();
+ int maxValueLength =
pulsar().getConfiguration().getMaxCustomMetricLabelValueLength();
+
+ if (labels != null && !labels.isEmpty()) {
+ for (Map.Entry<String, String> entry : labels.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ // Check if key is allowed
+ if (allowedKeys.isEmpty() || !allowedKeys.contains(key)) {
Review Comment:
The validation logic has a flaw: when allowedKeys is empty AND the feature
is enabled, all keys will be rejected. The condition should be 'if
(!allowedKeys.isEmpty() && !allowedKeys.contains(key))' to only validate when
there are configured allowed keys, otherwise the feature cannot be used at all.
```suggestion
// Check if key is allowed (only when there are configured
allowed keys)
if (!allowedKeys.isEmpty() && !allowedKeys.contains(key)) {
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5466,4 +5466,86 @@ private static Long getIndexFromEntry(Entry entry) {
}
});
}
+
+ protected CompletableFuture<Void>
internalSetCustomMetricLabels(Map<String, String> labels, boolean isGlobal) {
+ // Feature flag check
+ if
(!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Custom topic metric labels feature is disabled"));
+ }
+
+ // Validate labels against allowed keys and value length
+ Set<String> allowedKeys = getAllowedCustomMetricLabelKeys();
+ int maxValueLength =
pulsar().getConfiguration().getMaxCustomMetricLabelValueLength();
+
+ if (labels != null && !labels.isEmpty()) {
+ for (Map.Entry<String, String> entry : labels.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ // Check if key is allowed
+ if (allowedKeys.isEmpty() || !allowedKeys.contains(key)) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Label key '" + key + "' is not in the list of
allowed custom metric label keys"));
+ }
+
+ // Check value length
+ if (value != null && value.length() > maxValueLength) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Label value for key '" + key + "' exceeds maximum
length of " + maxValueLength));
+ }
+ }
+ }
+
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal,
+ labels == null || labels.isEmpty(), policies -> {
+ if (labels == null || labels.isEmpty()) {
+ policies.setCustomMetricLabels(new HashMap<>());
+ } else {
+ policies.setCustomMetricLabels(new HashMap<>(labels));
+ }
+ });
+ }
+
+ protected CompletableFuture<Map<String, String>>
internalGetCustomMetricLabels(boolean isGlobal) {
+ if
(!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Custom topic metric labels feature is disabled"));
+ }
+
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenApply(op -> op.map(TopicPolicies::getCustomMetricLabels)
+ .orElse(null));
+ }
+
+ protected CompletableFuture<Void> internalRemoveCustomMetricLabels(boolean
removeAll, List<String> keys,
+ boolean
isGlobal) {
+ if
(!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Custom topic metric labels feature is disabled"));
+ }
+
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
isGlobal, true, policies -> {
+ Map<String, String> currentLabels =
policies.getCustomMetricLabels();
+ if (currentLabels == null || currentLabels.isEmpty()) {
+ return; // Nothing to remove
+ }
+ if (removeAll) {
+ policies.setCustomMetricLabels(new HashMap<>());
+ } else {
+ for (String key : keys) {
+ currentLabels.remove(key);
+ }
+ policies.setCustomMetricLabels(currentLabels);
+ }
Review Comment:
When removeAll is false, the code iterates over keys without first checking
whether it is null or empty. If the caller passes removeAll=false with
keys=null, the loop can throw a NullPointerException; with an empty list, the
request is silently a no-op and the user gets no feedback.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -5131,5 +5131,95 @@ public void getMessageIDByIndex(@Suspended final
AsyncResponse asyncResponse,
});
}
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/customMetricLabels")
+ @ApiOperation(value = "Set custom metric labels for a topic")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Operation successful"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic doesn't exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is
disabled"),
+ @ApiResponse(code = 412, message = "Feature is disabled or invalid
label keys/values"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void setCustomMetricLabels(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
+ @ApiParam(value = "Custom metric labels") Map<String, String>
labels) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
+ .thenCompose(__ -> internalSetCustomMetricLabels(labels,
isGlobal))
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ handleTopicPolicyException("setCustomMetricLabels", ex,
asyncResponse);
+ return null;
+ });
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/customMetricLabels")
+ @ApiOperation(value = "Get custom metric labels for a topic", response =
Map.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "OK"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is
disabled"),
+ @ApiResponse(code = 412, message = "Feature is disabled"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getCustomMetricLabels(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
+ .thenCompose(__ -> internalGetCustomMetricLabels(isGlobal))
+ .thenApply(asyncResponse::resume).exceptionally(ex -> {
+ handleTopicPolicyException("getCustomMetricLabels", ex,
asyncResponse);
+ return null;
+ });
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/customMetricLabels")
+ @ApiOperation(value = "Remove custom metric labels from a topic")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Operation successful"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is
disabled"),
+ @ApiResponse(code = 412, message = "Feature is disabled"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeCustomMetricLabels(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
+ @QueryParam("all") @DefaultValue("false") boolean removeAll,
+ @QueryParam(value = "List of keys to remove, or null to remove
all") List<String> keys) {
Review Comment:
The @QueryParam annotation has an incorrect 'value' attribute. The value
should be the query parameter name (e.g., "keys"), not a description; the
description belongs in @ApiParam or a similar documentation annotation. As
written, the parameter is bound to a query parameter literally named after the
description text, so a "keys" query parameter sent by clients is never bound.
```suggestion
@ApiParam(value = "List of keys to remove, or null to remove
all") @QueryParam("keys") List<String> keys) {
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5466,4 +5466,86 @@ private static Long getIndexFromEntry(Entry entry) {
}
});
}
+
+ protected CompletableFuture<Void>
internalSetCustomMetricLabels(Map<String, String> labels, boolean isGlobal) {
+ // Feature flag check
+ if
(!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Custom topic metric labels feature is disabled"));
Review Comment:
The error message refers to "not enabled" but this is inconsistent with
other error messages in the same file which use "disabled". For consistency,
consider using "Custom topic metric labels feature is disabled" in all three
methods.
##########
conf/broker.conf:
##########
@@ -1846,6 +1846,21 @@ metricsServletTimeoutMs=30000
# Enable or disable broker bundles metrics. The default value is false.
exposeBundlesMetricsInPrometheus=false
+# Enable or disable custom topic metric labels feature.
+# If enabled, custom metric labels can be set on topics and will be exposed in
Prometheus metrics.
+# Default is false.
+exposeCustomTopicMetricLabelsEnabled=false
+
+# A comma-separated list of allowed custom metric label keys.
+# Only these keys can be set as custom metric labels on topics.
+# Example: sla_tier,data_sensitivity,cost_center,app_owner
+# If empty and the feature is enabled, no custom metric labels can be set.
Review Comment:
The documentation states "If empty and the feature is enabled, no custom
metric labels can be set". This matches the validation logic at line 5487,
where an empty allowedKeys causes ALL keys to be rejected. However, this may
be unexpected behavior - it might be more intuitive to allow any keys when the
list is empty (no restrictions), rather than blocking all keys. The
documentation should state this behavior more explicitly, or the validation
logic should be reconsidered.
```suggestion
# If empty and the feature is enabled, all custom metric labels are
rejected; specify one or more keys to allow labels.
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5466,4 +5466,86 @@ private static Long getIndexFromEntry(Entry entry) {
}
});
}
+
+ protected CompletableFuture<Void>
internalSetCustomMetricLabels(Map<String, String> labels, boolean isGlobal) {
+ // Feature flag check
+ if
(!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Custom topic metric labels feature is disabled"));
+ }
+
+ // Validate labels against allowed keys and value length
+ Set<String> allowedKeys = getAllowedCustomMetricLabelKeys();
+ int maxValueLength =
pulsar().getConfiguration().getMaxCustomMetricLabelValueLength();
+
+ if (labels != null && !labels.isEmpty()) {
+ for (Map.Entry<String, String> entry : labels.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ // Check if key is allowed
+ if (allowedKeys.isEmpty() || !allowedKeys.contains(key)) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Label key '" + key + "' is not in the list of
allowed custom metric label keys"));
+ }
+
+ // Check value length
+ if (value != null && value.length() > maxValueLength) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Label value for key '" + key + "' exceeds maximum
length of " + maxValueLength));
+ }
+ }
+ }
+
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal,
+ labels == null || labels.isEmpty(), policies -> {
+ if (labels == null || labels.isEmpty()) {
+ policies.setCustomMetricLabels(new HashMap<>());
+ } else {
+ policies.setCustomMetricLabels(new HashMap<>(labels));
+ }
+ });
+ }
+
+ protected CompletableFuture<Map<String, String>>
internalGetCustomMetricLabels(boolean isGlobal) {
+ if
(!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Custom topic metric labels feature is disabled"));
+ }
+
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenApply(op -> op.map(TopicPolicies::getCustomMetricLabels)
+ .orElse(null));
+ }
+
+ protected CompletableFuture<Void> internalRemoveCustomMetricLabels(boolean
removeAll, List<String> keys,
+ boolean
isGlobal) {
+ if
(!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Custom topic metric labels feature is disabled"));
+ }
+
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
isGlobal, true, policies -> {
+ Map<String, String> currentLabels =
policies.getCustomMetricLabels();
+ if (currentLabels == null || currentLabels.isEmpty()) {
+ return; // Nothing to remove
+ }
+ if (removeAll) {
+ policies.setCustomMetricLabels(new HashMap<>());
+ } else {
+ for (String key : keys) {
+ currentLabels.remove(key);
+ }
+ policies.setCustomMetricLabels(currentLabels);
+ }
+ });
+ }
+
+ private Set<String> getAllowedCustomMetricLabelKeys() {
+ String allowedKeysStr =
pulsar().getConfiguration().getAllowedCustomMetricLabelKeys();
+ if (allowedKeysStr == null || allowedKeysStr.trim().isEmpty()) {
+ return Set.of();
+ }
+ return Set.of(allowedKeysStr.split(","));
Review Comment:
The getAllowedCustomMetricLabelKeys method does not trim individual keys
after splitting by comma. If the configuration contains spaces like "key1,
key2, key3", the keys will include leading/trailing spaces and won't match user
input. Each key should be trimmed after splitting.
##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -3073,4 +3077,77 @@ void run() throws Exception {
System.out.println(getAdmin().topics().getMessageIdByIndex(topic,
index));
}
}
+
+ @Command(description = "Get custom metric labels for a topic")
+ private class GetCustomMetricLabels extends CliCommand {
+ @Parameters(description = "persistent://tenant/namespace/topic", arity
= "1")
+ private String topicName;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String topic = validateTopicName(topicName);
+ print(getAdmin().topicPolicies().getCustomMetricLabels(topic));
+ }
+ }
+
+ @Command(description = "Set custom metric labels for a topic")
+ private class SetCustomMetricLabels extends CliCommand {
+ @Parameters(description = "persistent://tenant/namespace/topic", arity
= "1")
+ private String topicName;
+
+ @Option(names = {"--labels",
+ "-l"}, description = "Custom metric labels (key=value pairs, comma
separated, e.g. sla_tier=gold,"
+ + "app_owner=team-a)", required = true)
+ private String labelsStr;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String topic = validateTopicName(topicName);
+ Map<String, String> labels = new HashMap<>();
+
+ if (labelsStr != null && !labelsStr.trim().isEmpty()) {
+ String[] pairs = labelsStr.split(",");
+ for (String pair : pairs) {
+ String[] kv = pair.split("=", 2);
+ if (kv.length != 2) {
+ throw new ParameterException("Invalid label format: "
+ pair + ". Expected format: key=value");
+ }
+ labels.put(kv[0].trim(), kv[1].trim());
+ }
+ }
+
+ getAdmin().topicPolicies().setCustomMetricLabels(topic, labels);
+ }
+ }
+
+ @Command(description = "Remove custom metric labels from a topic")
+ private class RemoveCustomMetricLabels extends CliCommand {
+ @Parameters(description = "persistent://tenant/namespace/topic", arity
= "1")
+ private String topicName;
+
+ @Option(names = {"--keys", "-k"}, description = "Label keys to remove"
+ + " (comma separated, e.g. sla_tier,app_owner). If not specified, "
+ + "all labels will be removed.", required = false)
+ private String keysStr;
+
+ @Option(names = {"--all", "-a"}, description = "Remove all labels",
required = false)
+ private boolean removeAll;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String topic = validateTopicName(topicName);
+
+ if (!removeAll) {
+ List<String> keys = Arrays.asList(keysStr.split(","));
+ keys =
keys.stream().map(String::trim).collect(Collectors.toList());
+ if (keys.isEmpty()) {
+ throw new ParameterException("No label keys specified for
removal.");
+ }
+ getAdmin().topicPolicies().removeCustomMetricLabels(topic,
false, keys);
Review Comment:
When neither --keys nor --all is specified, keysStr will be null, causing a
NullPointerException on line 3141 when split(",") is called. The code should
check if keysStr is null or empty before attempting to split it, or require at
least one of the options to be provided.
##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java:
##########
@@ -1323,6 +1323,47 @@ public CompletableFuture<Void>
deleteTopicPoliciesAsync(String topic) {
return asyncDeleteRequest(path);
}
+ @Override
+ public void setCustomMetricLabels(String topic, Map<String, String>
labels) throws PulsarAdminException {
+ sync(() -> setCustomMetricLabelsAsync(topic, labels));
+ }
+
+ @Override
+ public CompletableFuture<Void> setCustomMetricLabelsAsync(String topic,
Map<String, String> labels) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "customMetricLabels");
+ return asyncPostRequest(path, Entity.entity(labels,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public Map<String, String> getCustomMetricLabels(String topic) throws
PulsarAdminException {
+ return sync(() -> getCustomMetricLabelsAsync(topic));
+ }
+
+ @Override
+ public CompletableFuture<Map<String, String>>
getCustomMetricLabelsAsync(String topic) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "customMetricLabels");
+ return asyncGetRequest(path, new FutureCallback<Map<String,
String>>(){});
+ }
+
+ @Override
+ public void removeCustomMetricLabels(String topic, boolean removeAll,
List<String> keys) throws PulsarAdminException {
+ sync(() -> removeCustomMetricLabelsAsync(topic, removeAll, keys));
+ }
+
+ @Override
+ public CompletableFuture<Void> removeCustomMetricLabelsAsync(String topic,
boolean removeAll, List<String> keys) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "customMetricLabels");
+ if (removeAll) {
+ path.queryParam("all", true);
+ } else if (keys != null && !keys.isEmpty()) {
+ path.queryParam("keys", keys);
Review Comment:
The return value of path.queryParam() is discarded. WebTarget.queryParam()
does not modify the target it is called on; it returns a new instance. Because
path is never reassigned, the subsequent asyncDeleteRequest uses the original
path, and the query parameters are not sent to the server.
```suggestion
path = path.queryParam("all", true);
} else if (keys != null && !keys.isEmpty()) {
path = path.queryParam("keys", keys);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]