This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9efac61711f9cf3d1b31f0907aaeae4c3b3c3a56
Author: fengyubiao <[email protected]>
AuthorDate: Mon Dec 30 10:40:37 2024 +0800

    [improve][log] Print ZK path if write to ZK fails due to data being too 
large to persist (#23652)
    
    (cherry picked from commit 5a3a1f169a7f90181bd5c213c8e9f479bc74f0f2)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java     | 6 +++---
 .../java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java     | 8 +++++++-
 .../java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java | 2 ++
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 022cecf8d57..0cd9fc0d54c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2247,6 +2247,8 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         if (State.NoLedger.equals(STATE_UPDATER.get(this))) {
             if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
+                log.error("[{}][{}] Metadata ledger creation failed, try to 
persist the position in the metadata"
+                        + " store.", ledger.getName(), name);
                 persistPositionToMetaStore(mdEntry, cb);
             } else {
                 cb.operationFailed(new ManagedLedgerException("Switch new 
cursor ledger failed"));
@@ -2969,9 +2971,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             @Override
             public void operationFailed(ManagedLedgerException exception) {
-                log.error("[{}][{}] Metadata ledger creation failed {}, try to 
persist the position in the metadata"
-                        + " store.", ledger.getName(), name, exception);
-
+                log.error("[{}][{}] Metadata ledger creation failed {}", 
ledger.getName(), name, exception);
                 synchronized (pendingMarkDeleteOps) {
                     // At this point we don't have a ledger ready
                     STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 603a4503dc8..4c24aa5938b 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -201,10 +202,15 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                                         
Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
                                 .entrySet().stream().map(e -> e.getValue() + " 
" + e.getKey().name() + " entries")
                                 .collect(Collectors.joining(", "));
+                        List<Pair> opsForLog = ops.stream()
+                                .filter(item -> item.size() > 256 * 1024)
+                                .map(op -> Pair.of(op.getPath(), op.size()))
+                                .collect(Collectors.toList());
                         Long totalSize = 
ops.stream().collect(Collectors.summingLong(MetadataOp::size));
                         log.warn("Connection loss while executing batch 
operation of {} "
                                 + "of total data size of {}. "
-                                + "Retrying individual operations 
one-by-one.", countsByType, totalSize);
+                                + "Retrying individual operations one-by-one. 
ops whose size > 256KB: {}",
+                                countsByType, totalSize, opsForLog);
 
                         // Retry with the individual operations
                         executor.schedule(() -> {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java
index abf60f7b724..06ff425372b 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java
@@ -51,4 +51,6 @@ public interface MetadataOp {
     default OpPut asPut() {
         return (OpPut) this;
     }
+
+    String getPath();
 }

Reply via email to