This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 701d4cfe21b [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566) 701d4cfe21b is described below commit 701d4cfe21b8bc291d8530f7510ccdaa6c6280f0 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Fri Apr 26 15:16:47 2024 +0300 [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566) (cherry picked from commit 69839c72f1375d141b56734bc5e041c13e366c57) --- .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 079ae3e2ae5..2e88cb33324 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -192,7 +192,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore Code code = Code.get(rc); if (code == Code.CONNECTIONLOSS) { // There is the chance that we caused a connection reset by sending or requesting a batch - // that passed the max ZK limit. Retry with the individual operations + // that passed the max ZK limit. + + // Build the log warning message + // summarize the operations by type + String countsByType = ops.stream().collect( + Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1))) + .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries") + .collect(Collectors.joining(", ")); + Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size)); + log.warn("Connection loss while executing batch operation of {} " + + "of total data size of {}. 
" + + "Retrying individual operations one-by-one.", countsByType, totalSize); + + // Retry with the individual operations executor.schedule(() -> { ops.forEach(o -> batchOperation(Collections.singletonList(o))); }, 100, TimeUnit.MILLISECONDS);