This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 565b59bd278 [improve][ml] Propagate slog logger context to BookKeeper ledger create/open (#25995)
565b59bd278 is described below
commit 565b59bd278aefe3a8a87fd3f477103ec07b7ab1
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 11 07:29:10 2026 -0700
[improve][ml] Propagate slog logger context to BookKeeper ledger create/open (#25995)
---
.../bookkeeper/mledger/ManagedLedgerConfig.java | 23 +++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 +-
.../mledger/impl/ManagedLedgerFactoryImpl.java | 32 ++++--
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 35 +++++--
.../mledger/impl/ReadOnlyManagedLedgerImpl.java | 3 +-
.../impl/ManagedLedgerFactoryShutdownTest.java | 12 +--
.../impl/ManagedLedgerLoggerContextTest.java | 111 +++++++++++++++++++++
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 12 ++-
.../bucket/BookkeeperBucketSnapshotStorage.java | 26 +++--
.../pulsar/broker/service/BrokerService.java | 11 ++
.../service/schema/BookkeeperSchemaStorage.java | 26 +++--
.../pendingack/impl/MLPendingAckStoreProvider.java | 4 +
.../compaction/AbstractTwoPhaseCompactor.java | 28 +++---
.../compaction/StrategicTwoPhaseCompactor.java | 2 +-
.../coordinator/impl/MLTransactionLogImpl.java | 1 +
.../bookkeeper/client/PulsarMockBookKeeper.java | 79 +++++++++++++++
16 files changed, 340 insertions(+), 69 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 8fbf1cfda0d..20da70bece8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger;
import static com.google.common.base.Preconditions.checkArgument;
+import io.github.merlimat.slog.Logger;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.Arrays;
@@ -81,6 +82,7 @@ public class ManagedLedgerConfig {
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
+ private Logger loggerContext;
private ManagedLedgerInterceptor managedLedgerInterceptor;
private Map<String, String> properties;
private int inactiveLedgerRollOverTimeMs = 0;
@@ -597,6 +599,27 @@ public class ManagedLedgerConfig {
return this;
}
+ /**
+ * Get the parent logger whose context attributes are inherited by the managed ledger logger.
+ *
+ * @return the parent logger, or null if none was set
+ */
+ public Logger getLoggerContext() {
+ return loggerContext;
+ }
+
+ /**
+ * Set a parent slog {@link Logger} whose context attributes (e.g. {@code topic}, {@code subscription}) are
+ * inherited by the managed ledger logger and propagated to the BookKeeper client when ledgers are created or
+ * opened, so that log statements emitted by the BookKeeper client carry the application context.
+ *
+ * @param loggerContext logger whose context attributes to inherit; null means no extra context
+ */
+ public ManagedLedgerConfig setLoggerContext(Logger loggerContext) {
+ this.loggerContext = loggerContext;
+ return this;
+ }
+
/**
* Get clock to use to time operations.
*
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 67d392f78f5..b043a41c990 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -374,7 +374,7 @@ public class ManagedCursorImpl implements ManagedCursor {
this.cursorProperties = Collections.emptyMap();
this.ledger = ledger;
this.name = cursorName;
- this.log = slog.with().attr("managedLedger",
ledger.getName()).attr("cursor", name).build();
+ this.log = slog.with().ctx(ledger.getLogger()).attr("cursor",
name).build();
this.individualDeletedMessages = new
RangeSetWrapper<>(positionRangeConverter,
positionRangeReverseConverter, this);
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
@@ -3322,7 +3322,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug().attr("ledgerId", lh.getId()).log("Created cursor
ledger");
future.complete(lh);
});
- }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
+ }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name), log);
return future;
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 269cbafc207..e855a21c17d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -47,6 +47,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.CustomLog;
@@ -57,6 +58,7 @@ import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -1251,7 +1253,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
// find no of entries in last ledger
if (!ledgers.isEmpty()) {
final long id = ledgers.lastKey();
- AsyncCallback.OpenCallback opencb = (rc, lh, ctx1)
-> {
+ BiConsumer<Integer, ReadHandle> opencb = (rc, lh)
-> {
log.debug().attr("managedLedger",
managedLedgerName)
.attr("ledgerId", id)
.attr("result",
BKException.getMessage(rc))
@@ -1282,16 +1284,24 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
log.debug().attr("managedLedger",
managedLedgerName)
.attr("ledgerId", id).log("Opening
ledger");
getBookKeeper()
- .thenAccept(bk -> {
- bk.asyncOpenLedgerNoRecovery(id,
digestType, password, opencb, null);
- }).exceptionally(ex -> {
- log.warn().attr("managedLedger",
managedLedgerName)
- .attr("ledgerId", id)
- .exception(ex)
- .log("Failed to open ledger");
- opencb.openComplete(-1, null, null);
- mlMetaCounter.countDown();
- return null;
+ .thenCompose(bk -> bk.newOpenLedgerOp()
+ .withRecovery(false)
+ .withLedgerId(id)
+
.withDigestType(digestType.toApiDigestType())
+ .withPassword(password)
+ .withLoggerContext(
+
log.with().attr("managedLedger", managedLedgerName).build())
+ .execute())
+ .whenComplete((rh, ex) -> {
+ if (ex != null) {
+ log.warn().attr("managedLedger",
managedLedgerName)
+ .attr("ledgerId", id)
+ .exception(ex)
+ .log("Failed to open
ledger");
+
opencb.accept(BKException.getExceptionCode(ex), null);
+ } else {
+ opencb.accept(BKException.Code.OK,
rh);
+ }
});
} else {
log.warn().attr("managedLedger",
managedLedgerName).log("Ledger list empty");
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 22d1655b718..5726701bbca 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -398,7 +398,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
this.config = config;
this.store = store;
this.name = name;
- this.log = slog.with().attr("managedLedger", name).build();
+ this.log =
slog.with().ctx(config.getLoggerContext()).attr("managedLedger", name).build();
this.ledgerMetadata =
LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name);
this.digestType =
BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
this.scheduledExecutor = scheduledExecutor;
@@ -657,7 +657,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// Save it back to ensure all nodes exist
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(),
ledgersStat, storeLedgersCb);
});
- }, ledgerMetadata);
+ }, ledgerMetadata, log);
}
protected void initializeCursors(final
ManagedLedgerInitializeLedgerCallback callback) {
@@ -899,7 +899,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.info("Creating a new ledger");
this.lastLedgerCreationInitiationTimestamp =
System.currentTimeMillis();
mbean.startDataLedgerCreateOp();
- asyncCreateLedger(bookKeeper, config, digestType, this,
Collections.emptyMap());
+ asyncCreateLedger(bookKeeper, config, digestType, this,
Collections.emptyMap(), log);
}
} else {
checkArgument(state == State.LedgerOpened, "ledger=%s is not
opened", state);
@@ -2002,7 +2002,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// Use the executor here is to avoid use the Zookeeper thread to
create the ledger which will lead
// to deadlock at the zookeeper client, details to see
https://github.com/apache/pulsar/issues/13736
this.executor.execute(() ->
- asyncCreateLedger(bookKeeper, config, digestType, this,
Collections.emptyMap()));
+ asyncCreateLedger(bookKeeper, config, digestType, this,
Collections.emptyMap(), log));
}
}
@@ -2245,7 +2245,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
&& info != null && info.hasOffloadContext()
&& !info.getOffloadContext().isBookkeeperDeleted()) {
openFuture =
bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
-
.withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();
+
.withDigestType(config.getDigestType()).withPassword(config.getPassword())
+ .withLoggerContext(log).execute();
} else if (info != null && info.hasOffloadContext() &&
info.getOffloadContext().isComplete()) {
@@ -2260,7 +2261,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
offloadDriverMetadata);
} else {
openFuture =
bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
-
.withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();
+
.withDigestType(config.getDigestType()).withPassword(config.getPassword())
+ .withLoggerContext(log).execute();
}
openFuture.whenCompleteAsync((res, ex) -> {
mbean.endDataLedgerOpenOp();
@@ -4653,9 +4655,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
* @param digestType
* @param cb
* @param metadata
+ * @param ctxLogger logger whose context attributes are propagated to the ledger handle logger
*/
protected void asyncCreateLedger(BookKeeper bookKeeper,
ManagedLedgerConfig config, DigestType digestType,
- CreateCallback cb, Map<String, byte[]> metadata) {
+ CreateCallback cb, Map<String, byte[]> metadata, Logger ctxLogger)
{
CompletableFuture<LedgerHandle> ledgerFutureHook = new
CompletableFuture<>();
Map<String, byte[]> finalMetadata = new HashMap<>();
finalMetadata.putAll(ledgerMetadata);
@@ -4675,8 +4678,22 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
createdLedgerCustomMetadata = finalMetadata;
try {
- bookKeeper.asyncCreateLedger(config.getEnsembleSize(),
config.getWriteQuorumSize(),
- config.getAckQuorumSize(), digestType,
config.getPassword(), cb, ledgerFutureHook, finalMetadata);
+ bookKeeper.newCreateLedgerOp()
+ .withEnsembleSize(config.getEnsembleSize())
+ .withWriteQuorumSize(config.getWriteQuorumSize())
+ .withAckQuorumSize(config.getAckQuorumSize())
+ .withDigestType(digestType.toApiDigestType())
+ .withPassword(config.getPassword())
+ .withCustomMetadata(finalMetadata)
+ .withLoggerContext(ctxLogger)
+ .execute()
+ .whenComplete((writeHandle, ex) -> {
+ if (ex != null) {
+
cb.createComplete(BKException.getExceptionCode(ex), null, ledgerFutureHook);
+ } else {
+ cb.createComplete(Code.OK, (LedgerHandle)
writeHandle, ledgerFutureHook);
+ }
+ });
} catch (Throwable cause) {
log.error().exception(cause).log("Encountered unexpected error
when creating ledger");
ledgerFutureHook.completeExceptionally(cause);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 98143e7cda5..6df03c348fa 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -75,7 +75,8 @@ public class ReadOnlyManagedLedgerImpl extends
ManagedLedgerImpl {
// Fetch last add confirmed for last ledger
bookKeeper.newOpenLedgerOp().withRecovery(false).withLedgerId(lastLedgerId)
-
.withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute()
+
.withDigestType(config.getDigestType()).withPassword(config.getPassword())
+ .withLoggerContext(log).execute()
.thenAccept(readHandle -> {
readHandle.readLastAddConfirmedAsync().thenAccept(lastAddConfirmed -> {
LedgerInfo info = new
LedgerInfo().setLedgerId(lastLedgerId)
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
index 9385f0983eb..b408c1e9136 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.mledger.impl;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
@@ -37,6 +36,7 @@ import lombok.CustomLog;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -142,12 +143,9 @@ public class ManagedLedgerFactoryShutdownTest {
cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class));
return null;
}).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(),
any(), anyBoolean());
- doAnswer(inv -> {
- AsyncCallback.CreateCallback cb = inv.getArgument(5,
AsyncCallback.CreateCallback.class);
- cb.createComplete(0, newLedgerHandle, inv.getArgument(6,
Object.class));
- return null;
- }).when(bookKeeper)
- .asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(),
any()/*callback*/, any(), any());
+ CreateBuilder createBuilder = mock(CreateBuilder.class,
Mockito.RETURNS_SELF);
+ doAnswer(inv ->
CompletableFuture.completedFuture(newLedgerHandle)).when(createBuilder).execute();
+ given(bookKeeper.newCreateLedgerOp()).willReturn(createBuilder);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerLoggerContextTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerLoggerContextTest.java
new file mode 100644
index 00000000000..3a6ec2609bc
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerLoggerContextTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import io.github.merlimat.slog.Logger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Property;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that the logger context attributes set via {@link ManagedLedgerConfig#setLoggerContext(Logger)} are
+ * inherited by the managed ledger and cursor loggers.
+ */
+public class ManagedLedgerLoggerContextTest extends MockedBookKeeperTestCase {
+
+ private static class CapturingAppender extends AbstractAppender {
+ private final List<Map<String, String>> contextData = new
ArrayList<>();
+ private final List<String> messages = new ArrayList<>();
+
+ CapturingAppender() {
+ super("ml-logger-context-capture", null, null, false,
Property.EMPTY_ARRAY);
+ }
+
+ @Override
+ public synchronized void append(LogEvent event) {
+ // The event is a reusable mutable instance: snapshot the data
+ Map<String, String> attrs = new HashMap<>();
+ event.getContextData().forEach((k, v) -> attrs.put(k,
String.valueOf(v)));
+ contextData.add(attrs);
+ messages.add(event.getMessage().getFormattedMessage());
+ }
+
+ synchronized Map<String, String> attrsForMessage(String message) {
+ for (int i = 0; i < messages.size(); i++) {
+ if (messages.get(i).equals(message)) {
+ return contextData.get(i);
+ }
+ }
+ return null;
+ }
+ }
+
+ @Test
+ public void testLoggerContextInheritance() throws Exception {
+ LoggerContext loggerContext = (LoggerContext)
LogManager.getContext(false);
+ CapturingAppender appender = new CapturingAppender();
+ appender.start();
+ loggerContext.getConfiguration().getRootLogger().addAppender(appender,
Level.INFO, null);
+ loggerContext.updateLoggers();
+
+ try {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setLoggerContext(Logger.get(getClass())
+ .with()
+ .attr("topic", "persistent://public/default/my-topic")
+ .attr("segment", "0000-7fff-1")
+ .build());
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("logger_context_ledger", config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor("sub-1");
+
+ ledger.getLogger().info("ml-logger-probe");
+ cursor.log.info("cursor-logger-probe");
+
+ Map<String, String> mlAttrs =
appender.attrsForMessage("ml-logger-probe");
+ assertTrue(mlAttrs != null, "Expected the managed ledger probe
event to be captured");
+ assertEquals(mlAttrs.get("topic"),
"persistent://public/default/my-topic");
+ assertEquals(mlAttrs.get("segment"), "0000-7fff-1");
+ assertEquals(mlAttrs.get("managedLedger"),
"logger_context_ledger");
+
+ Map<String, String> cursorAttrs =
appender.attrsForMessage("cursor-logger-probe");
+ assertTrue(cursorAttrs != null, "Expected the cursor probe event
to be captured");
+ assertEquals(cursorAttrs.get("topic"),
"persistent://public/default/my-topic");
+ assertEquals(cursorAttrs.get("segment"), "0000-7fff-1");
+ assertEquals(cursorAttrs.get("managedLedger"),
"logger_context_ledger");
+ assertEquals(cursorAttrs.get("cursor"), "sub-1");
+ } finally {
+
loggerContext.getConfiguration().getRootLogger().removeAppender(appender.getName());
+ loggerContext.updateLoggers();
+ appender.stop();
+ }
+ }
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 5dbdcaa71e8..67d2ab17e4f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -103,9 +103,11 @@ import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.client.PulsarMockReadHandleInterceptor;
+import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.common.util.BoundedScheduledExecutorService;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -822,7 +824,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
}, null);
log.info("after add, ledger state: " + ledger.getState());
return invocationOnMock.callRealMethod();
- }).when(ledger).asyncCreateLedger(any(), any(), any(), any(), any());
+ }).when(ledger).asyncCreateLedger(any(), any(), any(), any(), any(),
any());
doAnswer(invocationOnMock -> {
Object o = invocationOnMock.callRealMethod();
log.info("createComplete finished, state: " + ledger.getState());
@@ -3333,15 +3335,17 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("timeout_ledger_test", config);
BookKeeper bk = mock(BookKeeper.class);
- doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(),
any(), any(), any(), any(), any());
+ CreateBuilder createBuilder = mock(CreateBuilder.class,
Mockito.RETURNS_SELF);
+ doReturn(new
CompletableFuture<WriteHandle>()).when(createBuilder).execute();
+ doReturn(createBuilder).when(bk).newCreateLedgerOp();
AtomicInteger response = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Object> ctxHolder = new AtomicReference<>();
- ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {
+ ledger.asyncCreateLedger(bk, config, BookKeeper.DigestType.CRC32C,
(rc, lh, ctx) -> {
response.set(rc);
latch.countDown();
ctxHolder.set(ctx);
- }, Collections.emptyMap());
+ }, Collections.emptyMap(), ledger.getLogger());
latch.await(config.getMetadataOperationsTimeoutSeconds() + 2,
TimeUnit.SECONDS);
assertEquals(response.get(), BKException.Code.TimeoutException);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 57f41b41e76..09bbb23d8c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -169,19 +169,23 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
Map<String, byte[]> metadata =
LedgerMetadataUtils.buildMetadataForDelayedIndexBucket(bucketKey,
topicName, cursorName);
- bookKeeper.asyncCreateLedger(
- config.getManagedLedgerDefaultEnsembleSize(),
- config.getManagedLedgerDefaultWriteQuorum(),
- config.getManagedLedgerDefaultAckQuorum(),
-
BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
- LedgerPassword,
- (rc, handle, ctx) -> {
- if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Create
ledger", rc, -1));
+ bookKeeper.newCreateLedgerOp()
+ .withEnsembleSize(config.getManagedLedgerDefaultEnsembleSize())
+
.withWriteQuorumSize(config.getManagedLedgerDefaultWriteQuorum())
+ .withAckQuorumSize(config.getManagedLedgerDefaultAckQuorum())
+ .withDigestType(config.getManagedLedgerDigestType())
+ .withPassword(LedgerPassword)
+ .withCustomMetadata(metadata)
+ .withLoggerContext(log.with().attr("topic",
topicName).attr("cursor", cursorName).build())
+ .execute()
+ .whenComplete((writeHandle, ex) -> {
+ if (ex != null) {
+ future.completeExceptionally(bkException("Create
ledger",
+ BKException.getExceptionCode(ex), -1));
} else {
- future.complete(handle);
+ future.complete((LedgerHandle) writeHandle);
}
- }, null, metadata);
+ });
return future;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d180048b1b3..f702ffc1afe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -29,6 +29,7 @@ import static
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInte
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.RateLimiter;
+import io.github.merlimat.slog.LoggerBuilder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
@@ -1949,6 +1950,16 @@ public class BrokerService implements Closeable {
// Once we have the configuration, we can proceed with the async
open operation
ManagedLedgerFactory managedLedgerFactory =
getManagedLedgerFactoryForTopic(topicName,
managedLedgerConfig.getStorageClassName());
+ LoggerBuilder loggerContextBuilder = log.with();
+ if (topicName.isSegment()) {
+ loggerContextBuilder
+ .attr("topic",
TopicName.get(TopicDomain.topic.value(), topicName.getTenant(),
+ topicName.getNamespacePortion(),
topicName.getLocalName()).toString())
+ .attr("segment", topicName.getSegmentDescriptor());
+ } else {
+ loggerContextBuilder.attr("topic", topicName.toString());
+ }
+ managedLedgerConfig.setLoggerContext(loggerContextBuilder.build());
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(),
managedLedgerConfig,
new OpenLedgerCallback() {
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index a7921b8b8d0..246da337f50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -645,20 +645,24 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
Map<String, byte[]> metadata =
LedgerMetadataUtils.buildMetadataForSchema(schemaId);
final CompletableFuture<LedgerHandle> future = new
CompletableFuture<>();
try {
- bookKeeper.asyncCreateLedger(
- config.getManagedLedgerDefaultEnsembleSize(),
- config.getManagedLedgerDefaultWriteQuorum(),
- config.getManagedLedgerDefaultAckQuorum(),
-
BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
- LedgerPassword,
- (rc, handle, ctx) -> {
- if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed
to create ledger", rc, -1, -1,
+ bookKeeper.newCreateLedgerOp()
+
.withEnsembleSize(config.getManagedLedgerDefaultEnsembleSize())
+
.withWriteQuorumSize(config.getManagedLedgerDefaultWriteQuorum())
+
.withAckQuorumSize(config.getManagedLedgerDefaultAckQuorum())
+ .withDigestType(config.getManagedLedgerDigestType())
+ .withPassword(LedgerPassword)
+ .withCustomMetadata(metadata)
+ .withLoggerContext(log.with().attr("schemaId",
schemaId).build())
+ .execute()
+ .whenComplete((writeHandle, ex) -> {
+ if (ex != null) {
+ future.completeExceptionally(bkException("Failed
to create ledger",
+ BKException.getExceptionCode(ex), -1, -1,
config.isSchemaLedgerForceRecovery()));
} else {
- future.complete(handle);
+ future.complete((LedgerHandle) writeHandle);
}
- }, null, metadata);
+ });
} catch (Throwable t) {
log.error()
.attr("schemaId", schemaId)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index cdc6323f0ee..29c77374e2f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -164,6 +164,10 @@ public class MLPendingAckStoreProvider implements
TransactionPendingAckStoreProv
Timer
brokerClientSharedTimer,
PersistentTopic
originPersistentTopic) {
config.setCreateIfMissing(true);
+ config.setLoggerContext(log.with()
+ .attr("topic", topicName.toString())
+ .attr("subscription", subscription.getName())
+ .build());
brokerService
.getManagedLedgerFactoryForTopic(topicName,
config.getStorageClassName())
.asyncOpen(pendingAckTopicNameObject.getPersistenceNamingEncoding(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
index af989d181fe..7255421b84e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -185,7 +185,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends
Compactor {
Map<String, MessageId> latestForKey, BookKeeper bk) {
Map<String, byte[]> metadata =
LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(),
to.toByteArray());
- return createLedger(bk, metadata).thenCompose((ledger) -> {
+ return createLedger(bk, metadata, reader.getTopic()).thenCompose((ledger)
-> {
log.info()
.attr("topic", reader.getTopic())
.attr("from", from)
@@ -384,22 +384,26 @@ public abstract class AbstractTwoPhaseCompactor<T>
extends Compactor {
}
protected CompletableFuture<LedgerHandle> createLedger(BookKeeper bk,
- Map<String, byte[]> metadata) {
+ Map<String, byte[]> metadata, String topic) {
CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
try {
- bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
- conf.getManagedLedgerDefaultWriteQuorum(),
- conf.getManagedLedgerDefaultAckQuorum(),
- Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
- Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
- (rc, ledger, ctx) -> {
- if (rc != BKException.Code.OK) {
- bkf.completeExceptionally(BKException.create(rc));
+ bk.newCreateLedgerOp()
+ .withEnsembleSize(conf.getManagedLedgerDefaultEnsembleSize())
+ .withWriteQuorumSize(conf.getManagedLedgerDefaultWriteQuorum())
+ .withAckQuorumSize(conf.getManagedLedgerDefaultAckQuorum())
+
.withDigestType(Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE.toApiDigestType())
+ .withPassword(Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD)
+ .withCustomMetadata(metadata)
+ .withLoggerContext(log.with().attr("topic", topic).build())
+ .execute()
+ .whenComplete((writeHandle, ex) -> {
+ if (ex != null) {
+
bkf.completeExceptionally(BKException.create(BKException.getExceptionCode(ex)));
} else {
- bkf.complete(ledger);
+ bkf.complete((LedgerHandle) writeHandle);
}
- }, null, metadata);
+ });
} catch (Throwable t) {
log.error().exception(t).log("Encountered unexpected error when creating
compaction ledger");
return FutureUtil.failedFuture(t);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index a4688830ff3..0569c0f6850 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -334,7 +334,7 @@ public class StrategicTwoPhaseCompactor extends
PublishingOrderCompactor {
Map<String, byte[]> metadata =
LedgerMetadataUtils.buildMetadataForCompactedLedger(
phaseOneResult.topic,
phaseOneResult.lastId.toByteArray());
- return createLedger(bk, metadata)
+ return createLedger(bk, metadata, phaseOneResult.topic)
.thenCompose((ledger) -> {
log.info()
.attr("topic", phaseOneResult.topic)
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 692673ccdee..d085b40a1c8 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -91,6 +91,7 @@ public class MLTransactionLogImpl implements TransactionLog {
this.tcId = tcID.getId();
this.managedLedgerFactory = managedLedgerFactory;
this.managedLedgerConfig = managedLedgerConfig;
+ this.managedLedgerConfig.setLoggerContext(log.with().attr("tcId",
tcId).build());
this.timer = timer;
this.txnLogBufferedWriterConfig = txnLogBufferedWriterConfig;
if (txnLogBufferedWriterConfig.isBatchEnabled()) {
diff --git
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 0e1e75248e4..617edaa1f0c 100644
---
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -47,10 +48,14 @@ import lombok.Setter;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.api.CreateAdvBuilder;
+import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.client.impl.OpenBuilderBase;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -239,6 +244,80 @@ public class PulsarMockBookKeeper extends BookKeeper {
+ @Override
+ public CreateBuilder newCreateLedgerOp() { // mock of the builder-style ledger creation API
+ return new CreateBuilder() { // NOTE(review): withLoggerContext is not overridden here; presumably inherited from the interface - confirm
+ private int ensembleSize = 3; // used when withEnsembleSize is never called
+ private int writeQuorumSize = 2; // used when withWriteQuorumSize is never called
+ private int ackQuorumSize = 2; // used when withAckQuorumSize is never called
+ private byte[] password = new byte[0]; // empty password unless withPassword is called
+ private org.apache.bookkeeper.client.api.DigestType digestType =
+ org.apache.bookkeeper.client.api.DigestType.CRC32; // used unless withDigestType is called
+ private Map<String, byte[]> customMetadata =
Collections.emptyMap(); // no custom metadata unless withCustomMetadata is called
+
+ @Override
+ public CreateBuilder withEnsembleSize(int ensembleSize) {
+ this.ensembleSize = ensembleSize; // recorded for execute()
+ return this;
+ }
+
+ @Override
+ public CreateBuilder withWriteQuorumSize(int writeQuorumSize) {
+ this.writeQuorumSize = writeQuorumSize; // recorded for execute()
+ return this;
+ }
+
+ @Override
+ public CreateBuilder withAckQuorumSize(int ackQuorumSize) {
+ this.ackQuorumSize = ackQuorumSize; // recorded for execute()
+ return this;
+ }
+
+ @Override
+ public CreateBuilder withPassword(byte[] password) {
+ this.password = password; // keeps the caller array reference as-is, no copy is made
+ return this;
+ }
+
+ @Override
+ public CreateBuilder withWriteFlags(EnumSet<WriteFlag> writeFlags)
{
+ return this; // write flags are accepted but not recorded by this mock
+ }
+
+ @Override
+ public CreateBuilder withCustomMetadata(Map<String, byte[]>
customMetadata) {
+ this.customMetadata = customMetadata; // keeps the caller map reference as-is, no copy is made
+ return this;
+ }
+
+ @Override
+ public CreateBuilder
withDigestType(org.apache.bookkeeper.client.api.DigestType digestType) {
+ this.digestType = digestType; // api-level digest type, converted in execute()
+ return this;
+ }
+
+ @Override
+ public CreateAdvBuilder makeAdv() { // always throws: the mock has no adv-ledger builder
+ throw new UnsupportedOperationException("Adv ledger creation
is not supported by the mock");
+ }
+
+ @Override
+ public CompletableFuture<WriteHandle> execute() { // bridges the callback-based asyncCreateLedger to a future
+ CompletableFuture<WriteHandle> future = new
CompletableFuture<>();
+ asyncCreateLedger(ensembleSize, writeQuorumSize, ackQuorumSize, // uses whatever the with* calls recorded, else the field defaults
+ DigestType.fromApiDigestType(digestType), password, // api DigestType converted to the client DigestType
+ (rc, lh, ctx) -> {
+ if (rc != BKException.Code.OK) { // any non-OK return code fails the future
+
future.completeExceptionally(BKException.create(rc));
+ } else {
+ future.complete(lh); // lh is the handle delivered to the create callback
+ }
+ }, null, customMetadata); // null presumably fills the callback ctx argument - confirm against asyncCreateLedger
+ return future; // completed only from the create callback above
+ }
+ };
+ }
+
@Override
public OpenBuilder newOpenLedgerOp() {
return new OpenBuilderBase() {