This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new c243d18 Issue #6054 - Adding more permits debug statements to better
see changes to permits (#10217)
c243d18 is described below
commit c243d185261e964010a122b65f9c730bffbb2f0c
Author: Devin Bost <[email protected]>
AuthorDate: Mon Apr 19 03:28:47 2021 -0600
Issue #6054 - Adding more permits debug statements to better see changes to
permits (#10217)
---
docker/build.sh | 2 +-
.../org/apache/pulsar/broker/service/Consumer.java | 26 ++++++++++++++++++++++
.../broker/service/PulsarCommandSenderImpl.java | 4 ++--
.../PersistentDispatcherMultipleConsumers.java | 23 ++++++++++++++-----
4 files changed, 47 insertions(+), 8 deletions(-)
diff --git a/docker/build.sh b/docker/build.sh
index 944c35b..1379790 100755
--- a/docker/build.sh
+++ b/docker/build.sh
@@ -22,4 +22,4 @@ ROOT_DIR=$(git rev-parse --show-toplevel)
cd $ROOT_DIR/docker
mvn -f ../dashboard/pom.xml package -Pdocker
-mvn package -Pdocker
+mvn package -Pdocker
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 263d722..475fdc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -237,6 +237,11 @@ public class Consumer {
if (entry != null) {
int batchSize = batchSizes.getBatchSize(i);
pendingAcks.put(entry.getLedgerId(), entry.getEntryId(),
batchSize, 0);
+ if (log.isDebugEnabled()){
+ log.debug("[{}-{}] Added {}:{} ledger entry with
batchSize of {} to pendingAcks in"
+ + " broker.service.Consumer for
consumerId: {}",
+ topicName, subscription, entry.getLedgerId(),
entry.getEntryId(), batchSize, consumerId);
+ }
}
}
}
@@ -250,6 +255,11 @@ public class Consumer {
// reduce permit and increment unackedMsg count with total number of
messages in batch-msgs
int ackedCount = batchIndexesAcks == null ? 0 :
batchIndexesAcks.getTotalAckedIndexCount();
MESSAGE_PERMITS_UPDATER.addAndGet(this, ackedCount - totalMessages);
+ if (log.isDebugEnabled()){
+ log.debug("[{}-{}] Added {} minus {} messages to
MESSAGE_PERMITS_UPDATER in broker.service.Consumer"
+ + " for consumerId: {}; avgMessagesPerEntry is {}",
+ topicName, subscription, ackedCount, totalMessages,
consumerId, tmpAvgMessagesPerEntry);
+ }
incrementUnackedMessages(totalMessages);
msgOut.recordMultipleEvents(totalMessages, totalBytes);
msgOutCounter.add(totalMessages);
@@ -506,6 +516,10 @@ public class Consumer {
int oldPermits;
if (!blockedConsumerOnUnackedMsgs) {
oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this,
additionalNumberOfMessages);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Added {} message permits in
broker.service.Consumer before updating dispatcher "
+ + "for consumer", topicName, subscription,
additionalNumberOfMessages, consumerId);
+ }
subscription.consumerFlow(this, additionalNumberOfMessages);
} else {
oldPermits =
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this,
additionalNumberOfMessages);
@@ -529,6 +543,10 @@ public class Consumer {
int additionalNumberOfPermits =
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
// add newly flow permits to actual consumer.messagePermits
MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, additionalNumberOfPermits);
+ if (log.isDebugEnabled()){
+ log.debug("[{}-{}] Added {} blocked permits to
broker.service.Consumer for consumer", topicName,
+ subscription, additionalNumberOfPermits, consumerId);
+ }
// dispatch pending permits to flow more messages: it will add more
permits to dispatcher and consumer
subscription.consumerFlow(consumer, additionalNumberOfPermits);
}
@@ -577,6 +595,10 @@ public class Consumer {
lastAckedTimestamp = consumerStats.lastAckedTimestamp;
lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
+ if (log.isDebugEnabled()){
+ log.debug("[{}-{}] Setting broker.service.Consumer's
messagePermits to {} for consumer", topicName,
+ subscription, consumerStats.availablePermits, consumerId);
+ }
unackedMessages = consumerStats.unackedMessages;
blockedConsumerOnUnackedMsgs =
consumerStats.blockedConsumerOnUnackedMsgs;
AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry);
@@ -754,6 +776,10 @@ public class Consumer {
// if permitsReceivedWhileConsumerBlocked has been accumulated then
pass it to Dispatcher to flow messages
if (numberOfBlockedPermits > 0) {
MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Added {} blockedPermits to
broker.service.Consumer's messagePermits for consumer {}",
+ topicName, subscription, numberOfBlockedPermits,
consumerId);
+ }
subscription.consumerFlow(this, numberOfBlockedPermits);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 41705e9..dcc8458 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -284,8 +284,8 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
}
if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Sending message to consumerId {}, msg
id {}-{}", topicName, subscription,
- consumerId, entry.getLedgerId(),
entry.getEntryId());
+ log.debug("[{}-{}] Sending message to consumerId {}, msg
id {}-{} with batchSize {}",
+ topicName, subscription, consumerId,
entry.getLedgerId(), entry.getEntryId(), batchSize);
}
int redeliveryCount = 0;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index af22fa5..c2dc7d7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -213,6 +213,11 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId,
entryId));
});
totalAvailablePermits -= consumer.getAvailablePermits();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Decreased totalAvailablePermits by {} in
PersistentDispatcherMultipleConsumers. "
+ + "New dispatcher permit count is {}",
name, consumer.getAvailablePermits(),
+ totalAvailablePermits);
+ }
readMoreEntries();
}
} else {
@@ -232,8 +237,9 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
totalAvailablePermits += additionalNumberOfMessages;
if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Trigger new read after receiving flow control
message with permits {}", name, consumer,
- totalAvailablePermits);
+ log.debug("[{}-{}] Trigger new read after receiving flow control
message with permits {} "
+ + "after adding {} permits", name, consumer,
+ totalAvailablePermits, additionalNumberOfMessages);
}
readMoreEntries();
}
@@ -494,8 +500,9 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
// round-robin dispatch batch size for this consumer
int availablePermits = c.isWritable() ? c.getAvailablePermits() :
1;
if (log.isDebugEnabled() && !c.isWritable()) {
- log.debug("[{}-{}] consumer is not writable. dispatching only
1 message to {} ", topic.getName(), name,
- c);
+ log.debug("[{}-{}] consumer is not writable. dispatching only
1 message to {}; "
+ + "availablePermits are {}", topic.getName(),
name,
+ c, availablePermits);
}
int messagesForC = Math.min(
Math.min(entriesToDispatch, availablePermits),
@@ -524,7 +531,13 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
int msgSent = sendMessageInfo.getTotalMessages();
start += messagesForC;
entriesToDispatch -= messagesForC;
- TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(msgSent -
batchIndexesAcks.getTotalAckedIndexCount()));
+ TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
+ -(msgSent -
batchIndexesAcks.getTotalAckedIndexCount()));
+ if (log.isDebugEnabled()){
+ log.debug("[{}] Added -({} minus {}) permits to
TOTAL_AVAILABLE_PERMITS_UPDATER in "
+ + "PersistentDispatcherMultipleConsumers",
+ name, msgSent,
batchIndexesAcks.getTotalAckedIndexCount());
+ }
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
}