This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 65706c6ffa7 [improve] [broker] improve read entry error log for
troubleshooting (#21169)
65706c6ffa7 is described below
commit 65706c6ffa737f946ad9a1bfdb6ff70fa66c0415
Author: fengyubiao <[email protected]>
AuthorDate: Wed Sep 13 17:30:57 2023 +0800
[improve] [broker] improve read entry error log for troubleshooting (#21169)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 ++++++++++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 6 ++++++
.../mledger/impl/ReadOnlyManagedLedgerImpl.java | 4 ++--
.../pulsar/broker/admin/impl/PersistentTopicsBase.java | 18 ++++++++++++++++++
.../org/apache/pulsar/broker/service/ServerCnx.java | 6 ++++++
.../service/persistent/PersistentReplicator.java | 6 ++++++
.../service/persistent/PersistentSubscription.java | 6 ++++++
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../impl/SnapshotSegmentAbortedTxnProcessorImpl.java | 6 ++++++
9 files changed, 61 insertions(+), 3 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index ff8e0655d03..376bde3dc2c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -816,6 +816,11 @@ public class ManagedCursorImpl implements ManagedCursor {
result.entry = entry;
counter.countDown();
}
+
+ @Override
+ public String toString() { // Describes this callback in logs; String.format expands %s, not SLF4J-style {}.
+     return String.format("Cursor [%s] get Nth entry", ManagedCursorImpl.this);
+ }
}, null);
counter.await(ledger.getConfig().getMetadataOperationsTimeoutSeconds(),
TimeUnit.SECONDS);
@@ -1527,6 +1532,11 @@ public class ManagedCursorImpl implements ManagedCursor {
callback.readEntriesFailed(exception.get(), ctx);
}
}
+
+ @Override
+ public String toString() { // Describes this callback in logs; String.format expands %s, not SLF4J-style {}.
+     return String.format("Cursor [%s] async replay entries", ManagedCursorImpl.this);
+ }
};
positions.stream().filter(position ->
!alreadyAcknowledgedPositions.contains(position))
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 89c18f4b834..4991080d38c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1263,6 +1263,12 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.error("Error read entry for position {}", nextPos,
exception);
future.completeExceptionally(exception);
}
+
+ @Override
+ public String toString() { // Describes this callback in logs; String.format expands %s, not SLF4J-style {}.
+     return String.format("ML [%s] get earliest message publish time of pos",
+             ManagedLedgerImpl.this.name);
+ }
}, null);
return future;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 944674f6862..1fdf6939506 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -143,8 +143,8 @@ public class ReadOnlyManagedLedgerImpl extends
ManagedLedgerImpl {
this.getLedgerHandle(position.getLedgerId())
.thenAccept((ledger) -> asyncReadEntry(ledger, position,
callback, ctx))
.exceptionally((ex) -> {
- log.error("[{}] Error opening ledger for reading at
position {} - {}", this.name, position,
- ex.getMessage());
+ log.error("[{}] Error opening ledger for reading at
position {} - {}. Op: {}", this.name,
+ position, ex.getMessage(), callback);
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
ctx);
return null;
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 56598f5cc45..790902d70e8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2744,6 +2744,12 @@ public class PersistentTopicsBase extends AdminResource {
}
}
}
+
+ @Override
+ public String toString() { // Describes this callback in logs; String.format expands %s, not SLF4J-style {}.
+     return String.format("Topic [%s] get entry batch size",
+             PersistentTopicsBase.this.topicName);
+ }
}, null);
} catch (NullPointerException npe) {
batchSizeFuture.completeExceptionally(new
RestException(Status.NOT_FOUND, "Message not found"));
@@ -2842,6 +2848,12 @@ public class PersistentTopicsBase extends AdminResource {
}
}
}
+
+ @Override
+ public String toString() { // Describes this callback in logs; String.format expands %s, not SLF4J-style {}.
+     return String.format("Topic [%s] internal get message by id",
+             PersistentTopicsBase.this.topicName);
+ }
}, null);
return results;
});
@@ -3008,6 +3020,12 @@ public class PersistentTopicsBase extends AdminResource {
public void readEntryFailed(ManagedLedgerException
exception, Object ctx) {
future.completeExceptionally(exception);
}
+
+ @Override
+ public String toString() { // Describes this callback in logs; String.format expands %s, not SLF4J-style {}.
+     return String.format("Topic [%s] internal examine message async",
+             PersistentTopicsBase.this.topicName);
+ }
}, null);
return future;
} catch (ManagedLedgerException exception) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6b1af573a08..cae16f3c761 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2124,6 +2124,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
entryFuture.completeExceptionally(exception);
}
+
+ @Override
+ public String toString() { // Describes this callback in logs; String.format expands %s, not SLF4J-style {}.
+     return String.format("ServerCnx [%s] get largest batch index when possible",
+             ServerCnx.this.ctx.channel());
+ }
}, null);
CompletableFuture<Integer> batchSizeFuture =
entryFuture.thenApply(entry -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 0d387da1dd9..a96036b7cfe 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -530,6 +530,12 @@ public abstract class PersistentReplicator extends
AbstractReplicator
public void readEntryComplete(Entry entry, Object ctx) {
future.complete(entry);
}
+
+ @Override
+ public String toString() { // Describes this callback in logs; String.format expands %s, not SLF4J-style {}.
+     return String.format("Replication [%s] peek Nth message",
+             PersistentReplicator.this.producer.getProducerName());
+ }
}, null);
return future;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2f5485afbaa..1f6f688d86f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -849,6 +849,12 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
public void readEntryComplete(Entry entry, Object ctx) {
future.complete(entry);
}
+
+ @Override
+ public String toString() { // Describes this callback in logs; String.format expands %s, not SLF4J-style {}.
+     return String.format("Subscription [%s-%s] async replay entries", PersistentSubscription.this.topicName,
+             PersistentSubscription.this.subName);
+ }
}, null);
return future;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 557421da96f..dfeb03a2546 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3089,7 +3089,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
// if AutoSkipNonRecoverableData is set to true, just return
true here.
return true;
} else {
- log.warn("[{}] Error while getting the oldest message", topic,
e);
+ log.warn("[{}] [{}] Error while getting the oldest message",
topic, cursor.toString(), e);
}
} finally {
if (entry != null) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index be1271a155c..04b2ef66d25 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -327,6 +327,12 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
hasInvalidIndex.set(true);
}
}
+
+ @Override
+ public String toString() { // Describes this callback in logs; String.format expands %s, not SLF4J-style {}.
+     return String.format("Transaction buffer [%s] recover from snapshot",
+             SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
+ }
}, null);
});
openManagedLedgerAndHandleSegmentsFuture.complete(null);