This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b864a664397 MINOR: Add logging for ReplicationControlManager topic
deletion (#17617)
b864a664397 is described below
commit b864a664397c21e0a3ec65b635af29e9fd7e10f3
Author: Mahsa Seifikar <[email protected]>
AuthorDate: Fri Nov 1 15:24:22 2024 -0400
MINOR: Add logging for ReplicationControlManager topic deletion (#17617)
Reviewers: Colin P. McCabe <[email protected]>
---
.../controller/ReplicationControlManager.java | 30 +++++++++++++++++++---
1 file changed, 27 insertions(+), 3 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 491f4620304..16cc762ebc5 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -939,17 +939,39 @@ public class ReplicationControlManager {
Map<Uuid, ApiError> results = new HashMap<>(ids.size());
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP,
ids.size());
+ StringBuilder resultsBuilder = new StringBuilder();
+ String resultsPrefix = "";
+
for (Uuid id : ids) {
+ String topicName = "null";
+ ApiError error;
try {
+ log.trace("Starting deletion of topic with ID {}.", id);
deleteTopic(context, id, records);
- results.put(id, ApiError.NONE);
+ error = ApiError.NONE;
} catch (ApiException e) {
- results.put(id, ApiError.fromThrowable(e));
+ error = ApiError.fromThrowable(e);
} catch (Exception e) {
log.error("Unexpected deleteTopics error for {}", id, e);
- results.put(id, ApiError.fromThrowable(e));
+ error = ApiError.fromThrowable(e);
}
+
+ results.put(id, error);
+
+ if (!error.isFailure() || error.error() != UNKNOWN_TOPIC_ID) {
+ topicName = topics.get(id).name;
+ }
+
+ resultsBuilder.append(resultsPrefix)
+ .append("{id: ").append(id)
+ .append(", name: ").append(topicName)
+ .append(", result: ")
+ .append(error.isFailure() ? error.error() : "SUCCESS")
+ .append("}");
+ resultsPrefix = ", ";
}
+
+ log.info("DeleteTopics result(s): {}", resultsBuilder);
return ControllerResult.atomicOf(records, results);
}
@@ -959,8 +981,10 @@ public class ReplicationControlManager {
throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
}
int numPartitions = topic.parts.size();
+ log.trace("Deleting topic {} with ID {} and {} partitions",
topic.name, id, numPartitions);
try {
context.applyPartitionChangeQuota(numPartitions); // check
controller mutation quota
+ log.trace("Checked for a partition change quota on topic {} with
ID {}", topic.name, id);
} catch (ThrottlingQuotaExceededException e) {
// log a message and rethrow the exception
log.debug("Topic deletion of {} partitions not allowed because
quota is violated. Delay time: {}",