This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 565b59bd278 [improve][ml] Propagate slog logger context to BookKeeper 
ledger create/open (#25995)
565b59bd278 is described below

commit 565b59bd278aefe3a8a87fd3f477103ec07b7ab1
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 11 07:29:10 2026 -0700

    [improve][ml] Propagate slog logger context to BookKeeper ledger 
create/open (#25995)
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |  23 +++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   4 +-
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  32 ++++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  35 +++++--
 .../mledger/impl/ReadOnlyManagedLedgerImpl.java    |   3 +-
 .../impl/ManagedLedgerFactoryShutdownTest.java     |  12 +--
 .../impl/ManagedLedgerLoggerContextTest.java       | 111 +++++++++++++++++++++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  12 ++-
 .../bucket/BookkeeperBucketSnapshotStorage.java    |  26 +++--
 .../pulsar/broker/service/BrokerService.java       |  11 ++
 .../service/schema/BookkeeperSchemaStorage.java    |  26 +++--
 .../pendingack/impl/MLPendingAckStoreProvider.java |   4 +
 .../compaction/AbstractTwoPhaseCompactor.java      |  28 +++---
 .../compaction/StrategicTwoPhaseCompactor.java     |   2 +-
 .../coordinator/impl/MLTransactionLogImpl.java     |   1 +
 .../bookkeeper/client/PulsarMockBookKeeper.java    |  79 +++++++++++++++
 16 files changed, 340 insertions(+), 69 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 8fbf1cfda0d..20da70bece8 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import io.github.merlimat.slog.Logger;
 import java.nio.charset.StandardCharsets;
 import java.time.Clock;
 import java.util.Arrays;
@@ -81,6 +82,7 @@ public class ManagedLedgerConfig {
     private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
     private int newEntriesCheckDelayInMillis = 10;
     private Clock clock = Clock.systemUTC();
+    private Logger loggerContext;
     private ManagedLedgerInterceptor managedLedgerInterceptor;
     private Map<String, String> properties;
     private int inactiveLedgerRollOverTimeMs = 0;
@@ -597,6 +599,27 @@ public class ManagedLedgerConfig {
         return this;
     }
 
+    /**
+     * Get the parent logger whose context attributes are inherited by the 
managed ledger logger.
+     *
+     * @return the parent logger, or null if none was set
+     */
+    public Logger getLoggerContext() {
+        return loggerContext;
+    }
+
+    /**
+     * Set a parent slog {@link Logger} whose context attributes (e.g. {@code 
topic}, {@code subscription}) are
+     * inherited by the managed ledger logger and propagated to the BookKeeper 
client when ledgers are created or
+     * opened, so that log statements emitted by the BookKeeper client carry 
the application context.
+     *
+     * @param loggerContext logger whose context attributes to inherit; null 
means no extra context
+     */
+    public ManagedLedgerConfig setLoggerContext(Logger loggerContext) {
+        this.loggerContext = loggerContext;
+        return this;
+    }
+
     /**
      * Get clock to use to time operations.
      *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 67d392f78f5..b043a41c990 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -374,7 +374,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         this.cursorProperties = Collections.emptyMap();
         this.ledger = ledger;
         this.name = cursorName;
-        this.log = slog.with().attr("managedLedger", 
ledger.getName()).attr("cursor", name).build();
+        this.log = slog.with().ctx(ledger.getLogger()).attr("cursor", 
name).build();
         this.individualDeletedMessages = new 
RangeSetWrapper<>(positionRangeConverter,
                 positionRangeReverseConverter, this);
         if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
@@ -3322,7 +3322,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.debug().attr("ledgerId", lh.getId()).log("Created cursor 
ledger");
                 future.complete(lh);
             });
-        }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
+        }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name), log);
 
         return future;
     }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 269cbafc207..e855a21c17d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -47,6 +47,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.CustomLog;
@@ -57,6 +58,7 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -1251,7 +1253,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                         // find no of entries in last ledger
                         if (!ledgers.isEmpty()) {
                             final long id = ledgers.lastKey();
-                            AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) 
-> {
+                            BiConsumer<Integer, ReadHandle> opencb = (rc, lh) 
-> {
                                 log.debug().attr("managedLedger", 
managedLedgerName)
                                         .attr("ledgerId", id)
                                         .attr("result", 
BKException.getMessage(rc))
@@ -1282,16 +1284,24 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                             log.debug().attr("managedLedger", 
managedLedgerName)
                                     .attr("ledgerId", id).log("Opening 
ledger");
                             getBookKeeper()
-                                    .thenAccept(bk -> {
-                                        bk.asyncOpenLedgerNoRecovery(id, 
digestType, password, opencb, null);
-                                    }).exceptionally(ex -> {
-                                        log.warn().attr("managedLedger", 
managedLedgerName)
-                                                .attr("ledgerId", id)
-                                                .exception(ex)
-                                                .log("Failed to open ledger");
-                                        opencb.openComplete(-1, null, null);
-                                        mlMetaCounter.countDown();
-                                        return null;
+                                    .thenCompose(bk -> bk.newOpenLedgerOp()
+                                            .withRecovery(false)
+                                            .withLedgerId(id)
+                                            
.withDigestType(digestType.toApiDigestType())
+                                            .withPassword(password)
+                                            .withLoggerContext(
+                                                    
log.with().attr("managedLedger", managedLedgerName).build())
+                                            .execute())
+                                    .whenComplete((rh, ex) -> {
+                                        if (ex != null) {
+                                            log.warn().attr("managedLedger", 
managedLedgerName)
+                                                    .attr("ledgerId", id)
+                                                    .exception(ex)
+                                                    .log("Failed to open 
ledger");
+                                            
opencb.accept(BKException.getExceptionCode(ex), null);
+                                        } else {
+                                            opencb.accept(BKException.Code.OK, 
rh);
+                                        }
                                     });
                         } else {
                             log.warn().attr("managedLedger", 
managedLedgerName).log("Ledger list empty");
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 22d1655b718..5726701bbca 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -398,7 +398,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         this.config = config;
         this.store = store;
         this.name = name;
-        this.log = slog.with().attr("managedLedger", name).build();
+        this.log = 
slog.with().ctx(config.getLoggerContext()).attr("managedLedger", name).build();
         this.ledgerMetadata = 
LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name);
         this.digestType = 
BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
         this.scheduledExecutor = scheduledExecutor;
@@ -657,7 +657,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 // Save it back to ensure all nodes exist
                 store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), 
ledgersStat, storeLedgersCb);
             });
-        }, ledgerMetadata);
+        }, ledgerMetadata, log);
     }
 
     protected void initializeCursors(final 
ManagedLedgerInitializeLedgerCallback callback) {
@@ -899,7 +899,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 log.info("Creating a new ledger");
                 this.lastLedgerCreationInitiationTimestamp = 
System.currentTimeMillis();
                 mbean.startDataLedgerCreateOp();
-                asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap());
+                asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap(), log);
             }
         } else {
             checkArgument(state == State.LedgerOpened, "ledger=%s is not 
opened", state);
@@ -2002,7 +2002,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             // Use the executor here is to avoid use the Zookeeper thread to 
create the ledger which will lead
             // to deadlock at the zookeeper client, details to see 
https://github.com/apache/pulsar/issues/13736
             this.executor.execute(() ->
-                    asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap()));
+                    asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap(), log));
         }
     }
 
@@ -2245,7 +2245,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     && info != null && info.hasOffloadContext()
                     && !info.getOffloadContext().isBookkeeperDeleted()) {
                 openFuture = 
bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
-                        
.withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();
+                        
.withDigestType(config.getDigestType()).withPassword(config.getPassword())
+                        .withLoggerContext(log).execute();
 
             } else if (info != null && info.hasOffloadContext() && 
info.getOffloadContext().isComplete()) {
 
@@ -2260,7 +2261,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                         offloadDriverMetadata);
             } else {
                 openFuture = 
bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
-                        
.withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();
+                        
.withDigestType(config.getDigestType()).withPassword(config.getPassword())
+                        .withLoggerContext(log).execute();
             }
             openFuture.whenCompleteAsync((res, ex) -> {
                 mbean.endDataLedgerOpenOp();
@@ -4653,9 +4655,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
      * @param digestType
      * @param cb
      * @param metadata
+     * @param ctxLogger logger whose context attributes are propagated to the 
ledger handle logger
      */
     protected void asyncCreateLedger(BookKeeper bookKeeper, 
ManagedLedgerConfig config, DigestType digestType,
-            CreateCallback cb, Map<String, byte[]> metadata) {
+            CreateCallback cb, Map<String, byte[]> metadata, Logger ctxLogger) 
{
         CompletableFuture<LedgerHandle> ledgerFutureHook = new 
CompletableFuture<>();
         Map<String, byte[]> finalMetadata = new HashMap<>();
         finalMetadata.putAll(ledgerMetadata);
@@ -4675,8 +4678,22 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
         createdLedgerCustomMetadata = finalMetadata;
         try {
-            bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, 
config.getPassword(), cb, ledgerFutureHook, finalMetadata);
+            bookKeeper.newCreateLedgerOp()
+                    .withEnsembleSize(config.getEnsembleSize())
+                    .withWriteQuorumSize(config.getWriteQuorumSize())
+                    .withAckQuorumSize(config.getAckQuorumSize())
+                    .withDigestType(digestType.toApiDigestType())
+                    .withPassword(config.getPassword())
+                    .withCustomMetadata(finalMetadata)
+                    .withLoggerContext(ctxLogger)
+                    .execute()
+                    .whenComplete((writeHandle, ex) -> {
+                        if (ex != null) {
+                            
cb.createComplete(BKException.getExceptionCode(ex), null, ledgerFutureHook);
+                        } else {
+                            cb.createComplete(Code.OK, (LedgerHandle) 
writeHandle, ledgerFutureHook);
+                        }
+                    });
         } catch (Throwable cause) {
             log.error().exception(cause).log("Encountered unexpected error 
when creating ledger");
             ledgerFutureHook.completeExceptionally(cause);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 98143e7cda5..6df03c348fa 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -75,7 +75,8 @@ public class ReadOnlyManagedLedgerImpl extends 
ManagedLedgerImpl {
 
                     // Fetch last add confirmed for last ledger
                     
bookKeeper.newOpenLedgerOp().withRecovery(false).withLedgerId(lastLedgerId)
-                            
.withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute()
+                            
.withDigestType(config.getDigestType()).withPassword(config.getPassword())
+                            .withLoggerContext(log).execute()
                             .thenAccept(readHandle -> {
                                 
readHandle.readLastAddConfirmedAsync().thenAccept(lastAddConfirmed -> {
                                     LedgerInfo info = new 
LedgerInfo().setLedgerId(lastLedgerId)
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
index 9385f0983eb..b408c1e9136 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.BDDMockito.given;
@@ -37,6 +36,7 @@ import lombok.CustomLog;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -142,12 +143,9 @@ public class ManagedLedgerFactoryShutdownTest {
             cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class));
             return null;
         }).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(), 
any(), anyBoolean());
-        doAnswer(inv -> {
-            AsyncCallback.CreateCallback cb = inv.getArgument(5, 
AsyncCallback.CreateCallback.class);
-            cb.createComplete(0, newLedgerHandle, inv.getArgument(6, 
Object.class));
-            return null;
-        }).when(bookKeeper)
-                .asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), 
any()/*callback*/, any(), any());
+        CreateBuilder createBuilder = mock(CreateBuilder.class, 
Mockito.RETURNS_SELF);
+        doAnswer(inv -> 
CompletableFuture.completedFuture(newLedgerHandle)).when(createBuilder).execute();
+        given(bookKeeper.newCreateLedgerOp()).willReturn(createBuilder);
 
 
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerLoggerContextTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerLoggerContextTest.java
new file mode 100644
index 00000000000..3a6ec2609bc
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerLoggerContextTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import io.github.merlimat.slog.Logger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Property;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that the logger context attributes set via {@link 
ManagedLedgerConfig#setLoggerContext(Logger)} are
+ * inherited by the managed ledger and cursor loggers.
+ */
+public class ManagedLedgerLoggerContextTest extends MockedBookKeeperTestCase {
+
+    private static class CapturingAppender extends AbstractAppender {
+        private final List<Map<String, String>> contextData = new 
ArrayList<>();
+        private final List<String> messages = new ArrayList<>();
+
+        CapturingAppender() {
+            super("ml-logger-context-capture", null, null, false, 
Property.EMPTY_ARRAY);
+        }
+
+        @Override
+        public synchronized void append(LogEvent event) {
+            // The event is a reusable mutable instance: snapshot the data
+            Map<String, String> attrs = new HashMap<>();
+            event.getContextData().forEach((k, v) -> attrs.put(k, 
String.valueOf(v)));
+            contextData.add(attrs);
+            messages.add(event.getMessage().getFormattedMessage());
+        }
+
+        synchronized Map<String, String> attrsForMessage(String message) {
+            for (int i = 0; i < messages.size(); i++) {
+                if (messages.get(i).equals(message)) {
+                    return contextData.get(i);
+                }
+            }
+            return null;
+        }
+    }
+
+    @Test
+    public void testLoggerContextInheritance() throws Exception {
+        LoggerContext loggerContext = (LoggerContext) 
LogManager.getContext(false);
+        CapturingAppender appender = new CapturingAppender();
+        appender.start();
+        loggerContext.getConfiguration().getRootLogger().addAppender(appender, 
Level.INFO, null);
+        loggerContext.updateLoggers();
+
+        try {
+            ManagedLedgerConfig config = new ManagedLedgerConfig();
+            config.setLoggerContext(Logger.get(getClass())
+                    .with()
+                    .attr("topic", "persistent://public/default/my-topic")
+                    .attr("segment", "0000-7fff-1")
+                    .build());
+
+            ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("logger_context_ledger", config);
+            ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor("sub-1");
+
+            ledger.getLogger().info("ml-logger-probe");
+            cursor.log.info("cursor-logger-probe");
+
+            Map<String, String> mlAttrs = 
appender.attrsForMessage("ml-logger-probe");
+            assertTrue(mlAttrs != null, "Expected the managed ledger probe 
event to be captured");
+            assertEquals(mlAttrs.get("topic"), 
"persistent://public/default/my-topic");
+            assertEquals(mlAttrs.get("segment"), "0000-7fff-1");
+            assertEquals(mlAttrs.get("managedLedger"), 
"logger_context_ledger");
+
+            Map<String, String> cursorAttrs = 
appender.attrsForMessage("cursor-logger-probe");
+            assertTrue(cursorAttrs != null, "Expected the cursor probe event 
to be captured");
+            assertEquals(cursorAttrs.get("topic"), 
"persistent://public/default/my-topic");
+            assertEquals(cursorAttrs.get("segment"), "0000-7fff-1");
+            assertEquals(cursorAttrs.get("managedLedger"), 
"logger_context_ledger");
+            assertEquals(cursorAttrs.get("cursor"), "sub-1");
+        } finally {
+            
loggerContext.getConfiguration().getRootLogger().removeAppender(appender.getName());
+            loggerContext.updateLoggers();
+            appender.stop();
+        }
+    }
+}
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 5dbdcaa71e8..67d2ab17e4f 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -103,9 +103,11 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.bookkeeper.client.PulsarMockReadHandleInterceptor;
+import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.common.util.BoundedScheduledExecutorService;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -822,7 +824,7 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
             }, null);
             log.info("after add, ledger state: " + ledger.getState());
             return invocationOnMock.callRealMethod();
-        }).when(ledger).asyncCreateLedger(any(), any(), any(), any(), any());
+        }).when(ledger).asyncCreateLedger(any(), any(), any(), any(), any(), 
any());
         doAnswer(invocationOnMock -> {
             Object o = invocationOnMock.callRealMethod();
             log.info("createComplete finished, state: " + ledger.getState());
@@ -3333,15 +3335,17 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("timeout_ledger_test", config);
 
         BookKeeper bk = mock(BookKeeper.class);
-        doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), 
any(), any(), any(), any(), any());
+        CreateBuilder createBuilder = mock(CreateBuilder.class, 
Mockito.RETURNS_SELF);
+        doReturn(new 
CompletableFuture<WriteHandle>()).when(createBuilder).execute();
+        doReturn(createBuilder).when(bk).newCreateLedgerOp();
         AtomicInteger response = new AtomicInteger(0);
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<Object> ctxHolder = new AtomicReference<>();
-        ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {
+        ledger.asyncCreateLedger(bk, config, BookKeeper.DigestType.CRC32C, 
(rc, lh, ctx) -> {
             response.set(rc);
             latch.countDown();
             ctxHolder.set(ctx);
-        }, Collections.emptyMap());
+        }, Collections.emptyMap(), ledger.getLogger());
 
         latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, 
TimeUnit.SECONDS);
         assertEquals(response.get(), BKException.Code.TimeoutException);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 57f41b41e76..09bbb23d8c0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -169,19 +169,23 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
         CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         Map<String, byte[]> metadata = 
LedgerMetadataUtils.buildMetadataForDelayedIndexBucket(bucketKey,
                 topicName, cursorName);
-        bookKeeper.asyncCreateLedger(
-                config.getManagedLedgerDefaultEnsembleSize(),
-                config.getManagedLedgerDefaultWriteQuorum(),
-                config.getManagedLedgerDefaultAckQuorum(),
-                
BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
-                LedgerPassword,
-                (rc, handle, ctx) -> {
-                    if (rc != BKException.Code.OK) {
-                        future.completeExceptionally(bkException("Create 
ledger", rc, -1));
+        bookKeeper.newCreateLedgerOp()
+                .withEnsembleSize(config.getManagedLedgerDefaultEnsembleSize())
+                
.withWriteQuorumSize(config.getManagedLedgerDefaultWriteQuorum())
+                .withAckQuorumSize(config.getManagedLedgerDefaultAckQuorum())
+                .withDigestType(config.getManagedLedgerDigestType())
+                .withPassword(LedgerPassword)
+                .withCustomMetadata(metadata)
+                .withLoggerContext(log.with().attr("topic", 
topicName).attr("cursor", cursorName).build())
+                .execute()
+                .whenComplete((writeHandle, ex) -> {
+                    if (ex != null) {
+                        future.completeExceptionally(bkException("Create 
ledger",
+                                BKException.getExceptionCode(ex), -1));
                     } else {
-                        future.complete(handle);
+                        future.complete((LedgerHandle) writeHandle);
                     }
-                }, null, metadata);
+                });
         return future;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d180048b1b3..f702ffc1afe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -29,6 +29,7 @@ import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInte
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.RateLimiter;
+import io.github.merlimat.slog.LoggerBuilder;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.AdaptiveRecvByteBufAllocator;
@@ -1949,6 +1950,16 @@ public class BrokerService implements Closeable {
             // Once we have the configuration, we can proceed with the async 
open operation
             ManagedLedgerFactory managedLedgerFactory =
                     getManagedLedgerFactoryForTopic(topicName, 
managedLedgerConfig.getStorageClassName());
+            LoggerBuilder loggerContextBuilder = log.with();
+            if (topicName.isSegment()) {
+                loggerContextBuilder
+                        .attr("topic", 
TopicName.get(TopicDomain.topic.value(), topicName.getTenant(),
+                                topicName.getNamespacePortion(), 
topicName.getLocalName()).toString())
+                        .attr("segment", topicName.getSegmentDescriptor());
+            } else {
+                loggerContextBuilder.attr("topic", topicName.toString());
+            }
+            managedLedgerConfig.setLoggerContext(loggerContextBuilder.build());
             
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), 
managedLedgerConfig,
                     new OpenLedgerCallback() {
                         @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index a7921b8b8d0..246da337f50 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -645,20 +645,24 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         Map<String, byte[]> metadata = 
LedgerMetadataUtils.buildMetadataForSchema(schemaId);
         final CompletableFuture<LedgerHandle> future = new 
CompletableFuture<>();
         try {
-            bookKeeper.asyncCreateLedger(
-                    config.getManagedLedgerDefaultEnsembleSize(),
-                    config.getManagedLedgerDefaultWriteQuorum(),
-                    config.getManagedLedgerDefaultAckQuorum(),
-                    
BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
-                    LedgerPassword,
-                    (rc, handle, ctx) -> {
-                        if (rc != BKException.Code.OK) {
-                            future.completeExceptionally(bkException("Failed 
to create ledger", rc, -1, -1,
+            bookKeeper.newCreateLedgerOp()
+                    
.withEnsembleSize(config.getManagedLedgerDefaultEnsembleSize())
+                    
.withWriteQuorumSize(config.getManagedLedgerDefaultWriteQuorum())
+                    
.withAckQuorumSize(config.getManagedLedgerDefaultAckQuorum())
+                    .withDigestType(config.getManagedLedgerDigestType())
+                    .withPassword(LedgerPassword)
+                    .withCustomMetadata(metadata)
+                    .withLoggerContext(log.with().attr("schemaId", 
schemaId).build())
+                    .execute()
+                    .whenComplete((writeHandle, ex) -> {
+                        if (ex != null) {
+                            future.completeExceptionally(bkException("Failed 
to create ledger",
+                                    BKException.getExceptionCode(ex), -1, -1,
                                     config.isSchemaLedgerForceRecovery()));
                         } else {
-                            future.complete(handle);
+                            future.complete((LedgerHandle) writeHandle);
                         }
-                    }, null, metadata);
+                    });
         } catch (Throwable t) {
             log.error()
                     .attr("schemaId", schemaId)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index cdc6323f0ee..29c77374e2f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -164,6 +164,10 @@ public class MLPendingAckStoreProvider implements 
TransactionPendingAckStoreProv
                                                    Timer 
brokerClientSharedTimer,
                                                    PersistentTopic 
originPersistentTopic) {
         config.setCreateIfMissing(true);
+        config.setLoggerContext(log.with()
+                .attr("topic", topicName.toString())
+                .attr("subscription", subscription.getName())
+                .build());
         brokerService
                 .getManagedLedgerFactoryForTopic(topicName, 
config.getStorageClassName())
                 
.asyncOpen(pendingAckTopicNameObject.getPersistenceNamingEncoding(),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
index af989d181fe..7255421b84e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -185,7 +185,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends 
Compactor {
       Map<String, MessageId> latestForKey, BookKeeper bk) {
     Map<String, byte[]> metadata =
         LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(), 
to.toByteArray());
-    return createLedger(bk, metadata).thenCompose((ledger) -> {
+    return createLedger(bk, metadata, reader.getTopic()).thenCompose((ledger) 
-> {
       log.info()
               .attr("topic", reader.getTopic())
               .attr("from", from)
@@ -384,22 +384,26 @@ public abstract class AbstractTwoPhaseCompactor<T> 
extends Compactor {
   }
 
   protected CompletableFuture<LedgerHandle> createLedger(BookKeeper bk,
-      Map<String, byte[]> metadata) {
+      Map<String, byte[]> metadata, String topic) {
     CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
 
     try {
-      bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
-          conf.getManagedLedgerDefaultWriteQuorum(),
-          conf.getManagedLedgerDefaultAckQuorum(),
-          Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
-          Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
-          (rc, ledger, ctx) -> {
-            if (rc != BKException.Code.OK) {
-              bkf.completeExceptionally(BKException.create(rc));
+      bk.newCreateLedgerOp()
+          .withEnsembleSize(conf.getManagedLedgerDefaultEnsembleSize())
+          .withWriteQuorumSize(conf.getManagedLedgerDefaultWriteQuorum())
+          .withAckQuorumSize(conf.getManagedLedgerDefaultAckQuorum())
+          
.withDigestType(Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE.toApiDigestType())
+          .withPassword(Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD)
+          .withCustomMetadata(metadata)
+          .withLoggerContext(log.with().attr("topic", topic).build())
+          .execute()
+          .whenComplete((writeHandle, ex) -> {
+            if (ex != null) {
+              
bkf.completeExceptionally(BKException.create(BKException.getExceptionCode(ex)));
             } else {
-              bkf.complete(ledger);
+              bkf.complete((LedgerHandle) writeHandle);
             }
-          }, null, metadata);
+          });
     } catch (Throwable t) {
       log.error().exception(t).log("Encountered unexpected error when creating 
compaction ledger");
       return FutureUtil.failedFuture(t);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index a4688830ff3..0569c0f6850 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -334,7 +334,7 @@ public class StrategicTwoPhaseCompactor extends 
PublishingOrderCompactor {
         Map<String, byte[]> metadata =
                 LedgerMetadataUtils.buildMetadataForCompactedLedger(
                         phaseOneResult.topic, 
phaseOneResult.lastId.toByteArray());
-        return createLedger(bk, metadata)
+        return createLedger(bk, metadata, phaseOneResult.topic)
                 .thenCompose((ledger) -> {
                     log.info()
                             .attr("topic", phaseOneResult.topic)
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 692673ccdee..d085b40a1c8 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -91,6 +91,7 @@ public class MLTransactionLogImpl implements TransactionLog {
         this.tcId = tcID.getId();
         this.managedLedgerFactory = managedLedgerFactory;
         this.managedLedgerConfig = managedLedgerConfig;
+        this.managedLedgerConfig.setLoggerContext(log.with().attr("tcId", 
tcId).build());
         this.timer = timer;
         this.txnLogBufferedWriterConfig = txnLogBufferedWriterConfig;
         if (txnLogBufferedWriterConfig.isBatchEnabled()) {
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 0e1e75248e4..617edaa1f0c 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -47,10 +48,14 @@ import lombok.Setter;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.api.CreateAdvBuilder;
+import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.client.impl.OpenBuilderBase;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -239,6 +244,80 @@ public class PulsarMockBookKeeper extends BookKeeper {
 
 
 
+    @Override
+    public CreateBuilder newCreateLedgerOp() {
+        return new CreateBuilder() {
+            private int ensembleSize = 3;
+            private int writeQuorumSize = 2;
+            private int ackQuorumSize = 2;
+            private byte[] password = new byte[0];
+            private org.apache.bookkeeper.client.api.DigestType digestType =
+                    org.apache.bookkeeper.client.api.DigestType.CRC32;
+            private Map<String, byte[]> customMetadata = 
Collections.emptyMap();
+
+            @Override
+            public CreateBuilder withEnsembleSize(int ensembleSize) {
+                this.ensembleSize = ensembleSize;
+                return this;
+            }
+
+            @Override
+            public CreateBuilder withWriteQuorumSize(int writeQuorumSize) {
+                this.writeQuorumSize = writeQuorumSize;
+                return this;
+            }
+
+            @Override
+            public CreateBuilder withAckQuorumSize(int ackQuorumSize) {
+                this.ackQuorumSize = ackQuorumSize;
+                return this;
+            }
+
+            @Override
+            public CreateBuilder withPassword(byte[] password) {
+                this.password = password;
+                return this;
+            }
+
+            @Override
+            public CreateBuilder withWriteFlags(EnumSet<WriteFlag> writeFlags) 
{
+                return this;
+            }
+
+            @Override
+            public CreateBuilder withCustomMetadata(Map<String, byte[]> 
customMetadata) {
+                this.customMetadata = customMetadata;
+                return this;
+            }
+
+            @Override
+            public CreateBuilder 
withDigestType(org.apache.bookkeeper.client.api.DigestType digestType) {
+                this.digestType = digestType;
+                return this;
+            }
+
+            @Override
+            public CreateAdvBuilder makeAdv() {
+                throw new UnsupportedOperationException("Adv ledger creation 
is not supported by the mock");
+            }
+
+            @Override
+            public CompletableFuture<WriteHandle> execute() {
+                CompletableFuture<WriteHandle> future = new 
CompletableFuture<>();
+                asyncCreateLedger(ensembleSize, writeQuorumSize, ackQuorumSize,
+                        DigestType.fromApiDigestType(digestType), password,
+                        (rc, lh, ctx) -> {
+                            if (rc != BKException.Code.OK) {
+                                
future.completeExceptionally(BKException.create(rc));
+                            } else {
+                                future.complete(lh);
+                            }
+                        }, null, customMetadata);
+                return future;
+            }
+        };
+    }
+
     @Override
     public OpenBuilder newOpenLedgerOp() {
         return new OpenBuilderBase() {


Reply via email to