This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 65706c6ffa7 [improve] [broker] improve read entry error log for 
troubleshooting (#21169)
65706c6ffa7 is described below

commit 65706c6ffa737f946ad9a1bfdb6ff70fa66c0415
Author: fengyubiao <[email protected]>
AuthorDate: Wed Sep 13 17:30:57 2023 +0800

    [improve] [broker] improve read entry error log for troubleshooting (#21169)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java     | 10 ++++++++++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java     |  6 ++++++
 .../mledger/impl/ReadOnlyManagedLedgerImpl.java        |  4 ++--
 .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 18 ++++++++++++++++++
 .../org/apache/pulsar/broker/service/ServerCnx.java    |  6 ++++++
 .../service/persistent/PersistentReplicator.java       |  6 ++++++
 .../service/persistent/PersistentSubscription.java     |  6 ++++++
 .../broker/service/persistent/PersistentTopic.java     |  2 +-
 .../impl/SnapshotSegmentAbortedTxnProcessorImpl.java   |  6 ++++++
 9 files changed, 61 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index ff8e0655d03..376bde3dc2c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -816,6 +816,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                 result.entry = entry;
                 counter.countDown();
             }
+
+            @Override
+            public String toString() {
+                return String.format("Cursor [{}] get Nth entry", 
ManagedCursorImpl.this);
+            }
         }, null);
 
         
counter.await(ledger.getConfig().getMetadataOperationsTimeoutSeconds(), 
TimeUnit.SECONDS);
@@ -1527,6 +1532,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                     callback.readEntriesFailed(exception.get(), ctx);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("Cursor [{}] async replay entries", 
ManagedCursorImpl.this);
+            }
         };
 
         positions.stream().filter(position -> 
!alreadyAcknowledgedPositions.contains(position))
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 89c18f4b834..4991080d38c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1263,6 +1263,12 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 log.error("Error read entry for position {}", nextPos, 
exception);
                 future.completeExceptionally(exception);
             }
+
+            @Override
+            public String toString() {
+                return String.format("ML [{}] get earliest message publish 
time of pos",
+                        ManagedLedgerImpl.this.name);
+            }
         }, null);
 
         return future;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 944674f6862..1fdf6939506 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -143,8 +143,8 @@ public class ReadOnlyManagedLedgerImpl extends 
ManagedLedgerImpl {
             this.getLedgerHandle(position.getLedgerId())
                     .thenAccept((ledger) -> asyncReadEntry(ledger, position, 
callback, ctx))
                     .exceptionally((ex) -> {
-                        log.error("[{}] Error opening ledger for reading at 
position {} - {}", this.name, position,
-                                ex.getMessage());
+                        log.error("[{}] Error opening ledger for reading at 
position {} - {}. Op: {}", this.name,
+                                position, ex.getMessage(), callback);
                         
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
 ctx);
                         return null;
                     });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 56598f5cc45..790902d70e8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2744,6 +2744,12 @@ public class PersistentTopicsBase extends AdminResource {
                             }
                         }
                     }
+
+                    @Override
+                    public String toString() {
+                        return String.format("Topic [{}] get entry batch size",
+                                PersistentTopicsBase.this.topicName);
+                    }
                 }, null);
             } catch (NullPointerException npe) {
                 batchSizeFuture.completeExceptionally(new 
RestException(Status.NOT_FOUND, "Message not found"));
@@ -2842,6 +2848,12 @@ public class PersistentTopicsBase extends AdminResource {
                                 }
                             }
                         }
+
+                        @Override
+                        public String toString() {
+                            return String.format("Topic [{}] internal get 
message by id",
+                                    PersistentTopicsBase.this.topicName);
+                        }
                     }, null);
             return results;
         });
@@ -3008,6 +3020,12 @@ public class PersistentTopicsBase extends AdminResource {
                             public void readEntryFailed(ManagedLedgerException 
exception, Object ctx) {
                                 future.completeExceptionally(exception);
                             }
+
+                            @Override
+                            public String toString() {
+                                return String.format("Topic [{}] internal 
examine message async",
+                                        PersistentTopicsBase.this.topicName);
+                            }
                         }, null);
                         return future;
                     } catch (ManagedLedgerException exception) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6b1af573a08..cae16f3c761 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2124,6 +2124,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
                     entryFuture.completeExceptionally(exception);
                 }
+
+                @Override
+                public String toString() {
+                    return String.format("ServerCnx [{}] get largest batch 
index when possible",
+                            ServerCnx.this.ctx.channel());
+                }
             }, null);
 
             CompletableFuture<Integer> batchSizeFuture = 
entryFuture.thenApply(entry -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 0d387da1dd9..a96036b7cfe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -530,6 +530,12 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
             public void readEntryComplete(Entry entry, Object ctx) {
                 future.complete(entry);
             }
+
+            @Override
+            public String toString() {
+                return String.format("Replication [{}] peek Nth message",
+                        PersistentReplicator.this.producer.getProducerName());
+            }
         }, null);
 
         return future;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2f5485afbaa..1f6f688d86f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -849,6 +849,12 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
             public void readEntryComplete(Entry entry, Object ctx) {
                 future.complete(entry);
             }
+
+            @Override
+            public String toString() {
+                return String.format("Subscription [{}-{}] async replay 
entries", PersistentSubscription.this.topicName,
+                        PersistentSubscription.this.subName);
+            }
         }, null);
 
         return future;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 557421da96f..dfeb03a2546 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3089,7 +3089,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 // if AutoSkipNonRecoverableData is set to true, just return 
true here.
                 return true;
             } else {
-                log.warn("[{}] Error while getting the oldest message", topic, 
e);
+                log.warn("[{}] [{}] Error while getting the oldest message", 
topic, cursor.toString(), e);
             }
         } finally {
             if (entry != null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index be1271a155c..04b2ef66d25 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -327,6 +327,12 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
                                                     hasInvalidIndex.set(true);
                                                 }
                                             }
+
+                                            @Override
+                                            public String toString() {
+                                                return 
String.format("Transaction buffer [{}] recover from snapshot",
+                                                        
SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
+                                            }
                                         }, null);
                             });
                             
openManagedLedgerAndHandleSegmentsFuture.complete(null);

Reply via email to