This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dfc7bac2efc [improve] [broker] Improve logs for troubleshooting (#21141)
dfc7bac2efc is described below
commit dfc7bac2efc614bb80c7543d07e0c67040274e7f
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 7 03:15:22 2023 +0800
[improve] [broker] Improve logs for troubleshooting (#21141)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 26 +++++++++++++---------
.../mledger/impl/NonDurableCursorImpl.java | 8 +++++--
.../mledger/impl/NonDurableCursorTest.java | 3 ++-
.../broker/service/persistent/PersistentTopic.java | 8 +++----
4 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index a2420c1c29e..ff8e0655d03 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -509,7 +509,7 @@ public class ManagedCursorImpl implements ManagedCursor {
callback.operationComplete();
} else {
// Need to proceed and read the last entry in the
specified ledger to find out the last position
- log.info("[{}] Consumer {} meta-data recover from ledger
{}", ledger.getName(), name,
+ log.info("[{}] Cursor {} meta-data recover from ledger
{}", ledger.getName(), name,
info.getCursorsLedgerId());
recoverFromLedger(info, callback);
}
@@ -529,16 +529,16 @@ public class ManagedCursorImpl implements ManagedCursor {
long ledgerId = info.getCursorsLedgerId();
OpenCallback openCallback = (rc, lh, ctx) -> {
if (log.isInfoEnabled()) {
- log.info("[{}] Opened ledger {} for consumer {}. rc={}",
ledger.getName(), ledgerId, name, rc);
+ log.info("[{}] Opened ledger {} for cursor {}. rc={}",
ledger.getName(), ledgerId, name, rc);
}
if (isBkErrorNotRecoverable(rc)) {
- log.error("[{}] Error opening metadata ledger {} for consumer
{}: {}", ledger.getName(), ledgerId, name,
+ log.error("[{}] Error opening metadata ledger {} for cursor
{}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(),
Collections.emptyMap(), callback);
return;
} else if (rc != BKException.Code.OK) {
- log.warn("[{}] Error opening metadata ledger {} for consumer
{}: {}", ledger.getName(), ledgerId, name,
+ log.warn("[{}] Error opening metadata ledger {} for cursor {}:
{}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
callback.operationFailed(new
ManagedLedgerException(BKException.getMessage(rc)));
return;
@@ -548,7 +548,7 @@ public class ManagedCursorImpl implements ManagedCursor {
long lastEntryInLedger = lh.getLastAddConfirmed();
if (lastEntryInLedger < 0) {
- log.warn("[{}] Error reading from metadata ledger {} for
consumer {}: No entries in ledger",
+ log.warn("[{}] Error reading from metadata ledger {} for
cursor {}: No entries in ledger",
ledger.getName(), ledgerId, name);
// Rewind to last cursor snapshot available
initialize(getRollbackPosition(info), Collections.emptyMap(),
cursorProperties, callback);
@@ -560,13 +560,13 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}} readComplete rc={} entryId={}",
ledger.getName(), rc1, lh1.getLastAddConfirmed());
}
if (isBkErrorNotRecoverable(rc1)) {
- log.error("[{}] Error reading from metadata ledger {} for
consumer {}: {}", ledger.getName(),
+ log.error("[{}] Error reading from metadata ledger {} for
cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
initialize(getRollbackPosition(info),
Collections.emptyMap(), cursorProperties, callback);
return;
} else if (rc1 != BKException.Code.OK) {
- log.warn("[{}] Error reading from metadata ledger {} for
consumer {}: {}", ledger.getName(),
+ log.warn("[{}] Error reading from metadata ledger {} for
cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
callback.operationFailed(createManagedLedgerException(rc1));
@@ -2453,8 +2453,12 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public synchronized String toString() {
- return MoreObjects.toStringHelper(this).add("ledger",
ledger.getName()).add("name", name)
- .add("ackPos", markDeletePosition).add("readPos",
readPosition).toString();
+ return MoreObjects.toStringHelper(this)
+ .add("ledger", ledger.getName())
+ .add("name", name)
+ .add("ackPos", markDeletePosition)
+ .add("readPos", readPosition)
+ .toString();
}
@Override
@@ -3068,7 +3072,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (shouldCloseLedger(lh1)) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Need to create new metadata ledger for
consumer {}", ledger.getName(), name);
+ log.debug("[{}] Need to create new metadata ledger for
cursor {}", ledger.getName(), name);
}
startCreatingNewMetadataLedger();
}
@@ -3153,7 +3157,7 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void operationFailed(MetaStoreException e) {
- log.warn("[{}] Failed to update consumer {}",
ledger.getName(), name, e);
+ log.warn("[{}] Failed to update cursor metadata {}",
ledger.getName(), name, e);
// it means it failed to switch the newly created ledger so,
it should be
// deleted to prevent leak
deleteLedgerAsync(lh).thenRun(() ->
callback.operationFailed(e));
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 9d2829b1707..51e56158cad 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -138,8 +138,12 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
@Override
public synchronized String toString() {
- return MoreObjects.toStringHelper(this).add("ledger",
ledger.getName()).add("ackPos", markDeletePosition)
- .add("readPos", readPosition).toString();
+ return MoreObjects.toStringHelper(this)
+ .add("ledger", ledger.getName())
+ .add("cursor", getName())
+ .add("ackPos", markDeletePosition)
+ .add("readPos", readPosition)
+ .toString();
}
private static final Logger log =
LoggerFactory.getLogger(NonDurableCursorImpl.class);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 1ad3f5f8de6..1e1f7df0a46 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -80,7 +80,8 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
entries.forEach(Entry::release);
// Test string representation
- assertEquals(c1.toString(),
"NonDurableCursorImpl{ledger=my_test_ledger, ackPos=3:-1, readPos=3:1}");
+ assertEquals(c1.toString(),
"NonDurableCursorImpl{ledger=my_test_ledger, cursor="
+ + c1.getName() + ", ackPos=3:-1, readPos=3:1}");
}
@Test(timeOut = 20000)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c737a73660b..557421da96f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -910,8 +910,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
consumer.close();
} catch (BrokerServiceException e) {
if (e instanceof ConsumerBusyException) {
- log.warn("[{}][{}] Consumer {} {} already
connected",
- topic, subscriptionName, consumerId,
consumerName);
+ log.warn("[{}][{}] Consumer {} {} already
connected: {}",
+ topic, subscriptionName, consumerId,
consumerName, e.getMessage());
} else if (e instanceof SubscriptionBusyException)
{
log.warn("[{}][{}] {}", topic,
subscriptionName, e.getMessage());
}
@@ -941,8 +941,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
decrementUsageCount();
if (ex.getCause() instanceof ConsumerBusyException) {
- log.warn("[{}][{}] Consumer {} {} already connected",
topic, subscriptionName, consumerId,
- consumerName);
+ log.warn("[{}][{}] Consumer {} {} already connected: {}",
topic, subscriptionName, consumerId,
+ consumerName, ex.getCause().getMessage());
Consumer consumer = null;
try {
consumer = subscriptionFuture.isDone() ?
getActiveConsumer(subscriptionFuture.get()) : null;