This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new c9dc301f Issue #850: Added request context across client and bookies.
c9dc301f is described below
commit c9dc301feb48ca170c3d6205a36fca63a4950c5a
Author: Andrey Yegorov <[email protected]>
AuthorDate: Mon Sep 17 01:06:45 2018 -0700
Issue #850: Added request context across client and bookies.
Descriptions of the changes in this PR:
- MDC context is passed to runnables, callables etc.
- The protocol is extended: the context is sent to bookie servers and
restored there, and restored again on the client when the response is handled.
Hopefully no nuance was missed on the server side; this largely relies on
the changes in the ordered executors to do all the magic.
- Microbenchmarked the protocol changes (strings added to
protobuf, MDC context preserved/restored); the overhead looks OK.
- added unit tests.
(bug W-5291641)
(bug W-5291648)
### Motivation
Troubleshooting request-level failures/errors is simpler if a request id
is passed through all stages of the request: from the thread pools
on the client, to the bookies, and back to the client with the response.
Log4j/Slf4j support logging of MDC (Mapped Diagnostic Context) data, so it
makes sense to use this functionality to carry the request id into the logs.
### Changes
- MDC context is passed to runnables, callables etc.
- The protocol is extended: the context is sent to bookie servers and
restored there, and restored again on the client when the response is handled.
Hopefully no nuance was missed on the server side; this largely relies on
the changes in the ordered executors to do all the magic.
- Microbenchmarked the protocol changes (strings added to
protobuf, MDC context preserved/restored); the overhead looks OK.
- added unit tests.
Master Issue: #850
Author: Andrey Yegorov <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>,
Venkateswararao Jujjuri (JV) <None>, Sijie Guo <[email protected]>
This closes #1672 from dlg99/feature/correlation_id, closes #850
---
.../apache/bookkeeper/common/util/MdcUtils.java | 40 ++++
.../bookkeeper/common/util/OrderedExecutor.java | 137 ++++++++++--
.../bookkeeper/common/util/OrderedScheduler.java | 21 +-
.../src/main/proto/BookkeeperProtocol.proto | 7 +
.../org/apache/bookkeeper/client/BookKeeper.java | 1 +
.../bookkeeper/conf/AbstractConfiguration.java | 23 ++
.../bookkeeper/proto/BookieRequestProcessor.java | 112 +++++----
.../bookkeeper/proto/PerChannelBookieClient.java | 82 ++++++-
.../bookkeeper/util/OrderedGenericCallback.java | 55 +++--
.../apache/bookkeeper/client/MdcContextTest.java | 249 +++++++++++++++++++++
.../apache/bookkeeper/proto/ProtocolBenchmark.java | 79 ++++++-
11 files changed, 703 insertions(+), 103 deletions(-)
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MdcUtils.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MdcUtils.java
new file mode 100644
index 0000000..f10f254
--- /dev/null
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/MdcUtils.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import java.util.Map;
+
+import org.slf4j.MDC;
+
+/**
+ * Utilities for working with the Slf4j MDC.
+ */
+public class MdcUtils {
+
+ public static void restoreContext(Map<String, String> mdcContextMap) {
+ if (mdcContextMap == null || mdcContextMap.isEmpty()) {
+ MDC.clear();
+ } else {
+ MDC.setContextMap(mdcContextMap);
+ }
+ }
+}
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
index 6a86141..520787b 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
@@ -28,6 +28,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -38,6 +39,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -46,6 +48,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang.StringUtils;
+import org.slf4j.MDC;
/**
* This class provides 2 things over the java {@link ExecutorService}.
@@ -73,6 +76,7 @@ public class OrderedExecutor implements ExecutorService {
final OpStatsLogger taskExecutionStats;
final OpStatsLogger taskPendingStats;
final boolean traceTaskExecution;
+ final boolean preserveMdcForTaskExecution;
final long warnTimeMicroSec;
final int maxTasksInQueue;
@@ -92,7 +96,8 @@ public class OrderedExecutor implements ExecutorService {
threadFactory = new
DefaultThreadFactory("bookkeeper-ordered-safe-executor");
}
return new OrderedExecutor(name, numThreads, threadFactory,
statsLogger,
- traceTaskExecution,
warnTimeMicroSec, maxTasksInQueue);
+ traceTaskExecution,
preserveMdcForTaskExecution,
+ warnTimeMicroSec, maxTasksInQueue);
}
}
@@ -105,6 +110,7 @@ public class OrderedExecutor implements ExecutorService {
protected ThreadFactory threadFactory = null;
protected StatsLogger statsLogger = NullStatsLogger.INSTANCE;
protected boolean traceTaskExecution = false;
+ protected boolean preserveMdcForTaskExecution = false;
protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
protected int maxTasksInQueue = NO_TASK_LIMIT;
@@ -138,6 +144,11 @@ public class OrderedExecutor implements ExecutorService {
return this;
}
+ public AbstractBuilder<T> preserveMdcForTaskExecution(boolean enabled)
{
+ this.preserveMdcForTaskExecution = enabled;
+ return this;
+ }
+
public AbstractBuilder<T> traceTaskWarnTimeMicroSec(long
warnTimeMicroSec) {
this.warnTimeMicroSec = warnTimeMicroSec;
return this;
@@ -154,6 +165,7 @@ public class OrderedExecutor implements ExecutorService {
threadFactory,
statsLogger,
traceTaskExecution,
+ preserveMdcForTaskExecution,
warnTimeMicroSec,
maxTasksInQueue);
}
@@ -185,6 +197,81 @@ public class OrderedExecutor implements ExecutorService {
}
}
+ /**
+ * Decorator class for a callable that measures the execution time.
+ */
+ protected class TimedCallable<T> implements Callable<T> {
+ final Callable<T> callable;
+ final long initNanos;
+
+ TimedCallable(Callable<T> callable) {
+ this.callable = callable;
+ this.initNanos = MathUtils.nowInNano();
+ }
+
+ @Override
+ public T call() throws Exception {
+
taskPendingStats.registerSuccessfulEvent(MathUtils.elapsedNanos(initNanos),
TimeUnit.NANOSECONDS);
+ long startNanos = MathUtils.nowInNano();
+ try {
+ return this.callable.call();
+ } finally {
+ long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
+ taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec,
TimeUnit.MICROSECONDS);
+ if (elapsedMicroSec >= warnTimeMicroSec) {
+ log.warn("Callable {}:{} took too long {} micros to
execute.", callable, callable.getClass(),
+ elapsedMicroSec);
+ }
+ }
+ }
+ }
+
+ /**
+ * Decorator class for a runnable that preserves MDC context.
+ */
+ static class ContextPreservingRunnable implements Runnable {
+ private final Runnable runnable;
+ private final Map<String, String> mdcContextMap;
+
+ ContextPreservingRunnable(Runnable runnable) {
+ this.runnable = runnable;
+ this.mdcContextMap = MDC.getCopyOfContextMap();
+ }
+
+ @Override
+ public void run() {
+ MdcUtils.restoreContext(mdcContextMap);
+ try {
+ runnable.run();
+ } finally {
+ MDC.clear();
+ }
+ }
+ }
+
+ /**
+ * Decorator class for a callable that preserves MDC context.
+ */
+ static class ContextPreservingCallable<T> implements Callable<T> {
+ private final Callable<T> callable;
+ private final Map<String, String> mdcContextMap;
+
+ ContextPreservingCallable(Callable<T> callable) {
+ this.callable = callable;
+ this.mdcContextMap = MDC.getCopyOfContextMap();
+ }
+
+ @Override
+ public T call() throws Exception {
+ MdcUtils.restoreContext(mdcContextMap);
+ try {
+ return callable.call();
+ } finally {
+ MDC.clear();
+ }
+ }
+ }
+
protected ThreadPoolExecutor createSingleThreadExecutor(ThreadFactory
factory) {
return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<>(), factory);
}
@@ -206,6 +293,8 @@ public class OrderedExecutor implements ExecutorService {
* - for reporting executor stats
* @param traceTaskExecution
* - should we stat task execution
+ * @param preserveMdcForTaskExecution
+ * - should we preserve MDC for task execution
* @param warnTimeMicroSec
* - log long task exec warning after this interval
* @param maxTasksInQueue
@@ -213,7 +302,7 @@ public class OrderedExecutor implements ExecutorService {
*/
protected OrderedExecutor(String baseName, int numThreads, ThreadFactory
threadFactory,
StatsLogger statsLogger, boolean
traceTaskExecution,
- long warnTimeMicroSec, int maxTasksInQueue) {
+ boolean preserveMdcForTaskExecution, long
warnTimeMicroSec, int maxTasksInQueue) {
checkArgument(numThreads > 0);
checkArgument(!StringUtils.isBlank(baseName));
@@ -280,6 +369,17 @@ public class OrderedExecutor implements ExecutorService {
this.taskExecutionStats =
statsLogger.scope(name).getOpStatsLogger("task_execution");
this.taskPendingStats =
statsLogger.scope(name).getOpStatsLogger("task_queued");
this.traceTaskExecution = traceTaskExecution;
+ this.preserveMdcForTaskExecution = preserveMdcForTaskExecution;
+ }
+
+ /**
+ * Flag describing the executor's expectation with regard to MDC.
+ * All tasks submitted through executor's submit/execute methods will
automatically respect this.
+ *
+ * @return true if runnable/callable is expected to preserve MDC, false
otherwise.
+ */
+ public boolean preserveMdc() {
+ return preserveMdcForTaskExecution;
}
/**
@@ -369,12 +469,23 @@ public class OrderedExecutor implements ExecutorService {
return threads[MathUtils.signSafeMod(orderingKey, threads.length)];
}
- private Runnable timedRunnable(Runnable r) {
- if (traceTaskExecution) {
- return new TimedRunnable(r);
- } else {
- return r;
+ protected Runnable timedRunnable(Runnable r) {
+ final Runnable runMe = traceTaskExecution ? new TimedRunnable(r) : r;
+ return preserveMdcForTaskExecution ? new
ContextPreservingRunnable(runMe) : runMe;
+ }
+
+ protected <T> Callable<T> timedCallable(Callable<T> c) {
+ final Callable<T> callMe = traceTaskExecution ? new TimedCallable<>(c)
: c;
+ return preserveMdcForTaskExecution ? new
ContextPreservingCallable<>(callMe) : callMe;
+ }
+
+ protected <T> Collection<? extends Callable<T>>
timedCallables(Collection<? extends Callable<T>> tasks) {
+ if (traceTaskExecution || preserveMdcForTaskExecution) {
+ return tasks.stream()
+ .map(this::timedCallable)
+ .collect(Collectors.toList());
}
+ return tasks;
}
/**
@@ -382,7 +493,7 @@ public class OrderedExecutor implements ExecutorService {
*/
@Override
public <T> Future<T> submit(Callable<T> task) {
- return chooseThread().submit(task);
+ return chooseThread().submit(timedCallable(task));
}
/**
@@ -407,7 +518,7 @@ public class OrderedExecutor implements ExecutorService {
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>
tasks)
throws InterruptedException {
- return chooseThread().invokeAll(tasks);
+ return chooseThread().invokeAll(timedCallables(tasks));
}
/**
@@ -418,7 +529,7 @@ public class OrderedExecutor implements ExecutorService {
long timeout,
TimeUnit unit)
throws InterruptedException {
- return chooseThread().invokeAll(tasks, timeout, unit);
+ return chooseThread().invokeAll(timedCallables(tasks), timeout, unit);
}
/**
@@ -427,7 +538,7 @@ public class OrderedExecutor implements ExecutorService {
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
- return chooseThread().invokeAny(tasks);
+ return chooseThread().invokeAny(timedCallables(tasks));
}
/**
@@ -436,7 +547,7 @@ public class OrderedExecutor implements ExecutorService {
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long
timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- return chooseThread().invokeAny(tasks, timeout, unit);
+ return chooseThread().invokeAny(timedCallables(tasks), timeout, unit);
}
/**
@@ -444,7 +555,7 @@ public class OrderedExecutor implements ExecutorService {
*/
@Override
public void execute(Runnable command) {
- chooseThread().execute(command);
+ chooseThread().execute(timedRunnable(command));
}
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index 6f05832..6f366fd 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -72,6 +72,7 @@ public class OrderedScheduler extends OrderedExecutor
implements ScheduledExecut
threadFactory,
statsLogger,
traceTaskExecution,
+ preserveMdcForTaskExecution,
warnTimeMicroSec,
maxTasksInQueue);
}
@@ -90,6 +91,8 @@ public class OrderedScheduler extends OrderedExecutor
implements ScheduledExecut
* - for reporting executor stats
* @param traceTaskExecution
* - should we stat task execution
+ * @param preserveMdcForTaskExecution
+ * - should we preserve MDC for task execution
* @param warnTimeMicroSec
* - log long task exec warning after this interval
*/
@@ -98,9 +101,11 @@ public class OrderedScheduler extends OrderedExecutor
implements ScheduledExecut
ThreadFactory threadFactory,
StatsLogger statsLogger,
boolean traceTaskExecution,
+ boolean preserveMdcForTaskExecution,
long warnTimeMicroSec,
int maxTasksInQueue) {
- super(baseName, numThreads, threadFactory, statsLogger,
traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
+ super(baseName, numThreads, threadFactory, statsLogger,
traceTaskExecution,
+ preserveMdcForTaskExecution, warnTimeMicroSec,
maxTasksInQueue);
}
@@ -150,7 +155,7 @@ public class OrderedScheduler extends OrderedExecutor
implements ScheduledExecut
* will return null upon completion
*/
public ScheduledFuture<?> schedule(SafeRunnable command, long delay,
TimeUnit unit) {
- return chooseThread().schedule(command, delay, unit);
+ return chooseThread().schedule(timedRunnable(command), delay, unit);
}
/**
@@ -181,7 +186,7 @@ public class OrderedScheduler extends OrderedExecutor
implements ScheduledExecut
* method will throw an exception upon cancellation
*/
public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long
initialDelay, long period, TimeUnit unit) {
- return chooseThread().scheduleAtFixedRate(command, initialDelay,
period, unit);
+ return chooseThread().scheduleAtFixedRate(timedRunnable(command),
initialDelay, period, unit);
}
/**
@@ -219,7 +224,7 @@ public class OrderedScheduler extends OrderedExecutor
implements ScheduledExecut
*/
public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command,
long initialDelay, long delay,
TimeUnit unit) {
- return chooseThread().scheduleWithFixedDelay(command, initialDelay,
delay, unit);
+ return chooseThread().scheduleWithFixedDelay(timedRunnable(command),
initialDelay, delay, unit);
}
/**
@@ -252,7 +257,7 @@ public class OrderedScheduler extends OrderedExecutor
implements ScheduledExecut
*/
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit
unit) {
- return chooseThread().schedule(command, delay, unit);
+ return chooseThread().schedule(timedRunnable(command), delay, unit);
}
/**
@@ -260,7 +265,7 @@ public class OrderedScheduler extends OrderedExecutor
implements ScheduledExecut
*/
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit unit) {
- return chooseThread().schedule(callable, delay, unit);
+ return chooseThread().schedule(timedCallable(callable), delay, unit);
}
/**
@@ -269,7 +274,7 @@ public class OrderedScheduler extends OrderedExecutor
implements ScheduledExecut
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay, long
period, TimeUnit unit) {
- return chooseThread().scheduleAtFixedRate(command, initialDelay,
period, unit);
+ return chooseThread().scheduleAtFixedRate(timedRunnable(command),
initialDelay, period, unit);
}
/**
@@ -278,7 +283,7 @@ public class OrderedScheduler extends OrderedExecutor
implements ScheduledExecut
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay, long
delay, TimeUnit unit) {
- return chooseThread().scheduleWithFixedDelay(command, initialDelay,
delay, unit);
+ return chooseThread().scheduleWithFixedDelay(timedRunnable(command),
initialDelay, delay, unit);
}
}
diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index bac9411..f34d56e 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -76,6 +76,11 @@ message BKPacketHeader {
optional uint32 priority = 4 [default = 0];
}
+message ContextPair {
+ required string key = 1;
+ required string value = 2;
+}
+
message Request {
required BKPacketHeader header = 1;
// Requests
@@ -87,6 +92,8 @@ message Request {
optional GetBookieInfoRequest getBookieInfoRequest = 105;
optional StartTLSRequest startTLSRequest = 106;
optional ForceLedgerRequest forceLedgerRequest = 107;
+ // to pass MDC context
+ repeated ContextPair requestContext = 200;
}
message ReadRequest {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 1396ee7..d434ffa 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -402,6 +402,7 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
.numThreads(conf.getNumWorkerThreads())
.statsLogger(rootStatsLogger)
.traceTaskExecution(conf.getEnableTaskExecutionStats())
+
.preserveMdcForTaskExecution(conf.getPreserveMdcForTaskExecution())
.traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
.build();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index b72b0ba..666b377 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -102,6 +102,9 @@ public abstract class AbstractConfiguration<T extends
AbstractConfiguration>
// Enable authentication of the other connection end point (mutual
authentication)
protected static final String TLS_CLIENT_AUTHENTICATION =
"tlsClientAuthentication";
+ // Preserve MDC or not for tasks in executor
+ protected static final String PRESERVE_MDC_FOR_TASK_EXECUTION =
"preserveMdcForTaskExecution";
+
// Default formatter classes
protected static final Class<? extends EntryFormatter>
DEFAULT_ENTRY_FORMATTER = StringEntryFormatter.class;
protected static final Class<? extends LedgerIdFormatter>
DEFAULT_LEDGERID_FORMATTER =
@@ -852,6 +855,26 @@ public abstract class AbstractConfiguration<T extends
AbstractConfiguration>
}
/**
+ * Whether to preserve MDC for tasks in Executor.
+ *
+ * @return flag to enable/disable MDC preservation in Executor.
+ */
+ public boolean getPreserveMdcForTaskExecution() {
+ return getBoolean(PRESERVE_MDC_FOR_TASK_EXECUTION, false);
+ }
+
+ /**
+ * Whether to preserve MDC for tasks in Executor.
+ *
+ * @param enabled
+ * flag to enable/disable MDC preservation in Executor.
+ * @return configuration.
+ */
+ public T setPreserveMdcForTaskExecution(boolean enabled) {
+ setProperty(PRESERVE_MDC_FOR_TASK_EXECUTION, enabled);
+ return getThis();
+ }
+ /**
* Trickery to allow inheritance with fluent style.
*/
protected abstract T getThis();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 8ee363f..5a50bf6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -89,6 +89,7 @@ import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
/**
* An implementation of the RequestProcessor interface.
@@ -104,6 +105,7 @@ public class BookieRequestProcessor implements
RequestProcessor {
*/
private final ServerConfiguration serverCfg;
private final long waitTimeoutOnBackpressureMillis;
+ private final boolean preserveMdcForTaskExecution;
/**
* This is the Bookie instance that is used to handle all read and write
requests.
@@ -187,6 +189,7 @@ public class BookieRequestProcessor implements
RequestProcessor {
StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws
SecurityException {
this.serverCfg = serverCfg;
this.waitTimeoutOnBackpressureMillis =
serverCfg.getWaitTimeoutOnResponseBackpressureMillis();
+ this.preserveMdcForTaskExecution =
serverCfg.getPreserveMdcForTaskExecution();
this.bookie = bookie;
this.readThreadPool = createExecutor(
this.serverCfg.getNumReadWorkerThreads(),
@@ -412,6 +415,7 @@ public class BookieRequestProcessor implements
RequestProcessor {
.numThreads(numThreads)
.name(nameFormat)
.traceTaskExecution(serverCfg.getEnableTaskExecutionStats())
+
.preserveMdcForTaskExecution(serverCfg.getPreserveMdcForTaskExecution())
.statsLogger(statsLogger)
.maxTasksInQueue(maxTasksInQueue)
.build();
@@ -430,52 +434,57 @@ public class BookieRequestProcessor implements
RequestProcessor {
// it as a version 3 packet. Else, just use the old protocol.
if (msg instanceof BookkeeperProtocol.Request) {
BookkeeperProtocol.Request r = (BookkeeperProtocol.Request) msg;
- BookkeeperProtocol.BKPacketHeader header = r.getHeader();
- switch (header.getOperation()) {
- case ADD_ENTRY:
- processAddRequestV3(r, c);
- break;
- case READ_ENTRY:
- processReadRequestV3(r, c);
- break;
- case FORCE_LEDGER:
- processForceLedgerRequestV3(r, c);
- break;
- case AUTH:
- LOG.info("Ignoring auth operation from client {}",
c.remoteAddress());
- BookkeeperProtocol.AuthMessage message =
BookkeeperProtocol.AuthMessage
- .newBuilder()
-
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
-
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
- .build();
- BookkeeperProtocol.Response.Builder authResponse =
BookkeeperProtocol.Response
- .newBuilder().setHeader(r.getHeader())
- .setStatus(BookkeeperProtocol.StatusCode.EOK)
- .setAuthResponse(message);
- c.writeAndFlush(authResponse.build());
- break;
- case WRITE_LAC:
- processWriteLacRequestV3(r, c);
- break;
- case READ_LAC:
- processReadLacRequestV3(r, c);
- break;
- case GET_BOOKIE_INFO:
- processGetBookieInfoRequestV3(r, c);
- break;
- case START_TLS:
- processStartTLSRequestV3(r, c);
- break;
- default:
- LOG.info("Unknown operation type {}",
header.getOperation());
- BookkeeperProtocol.Response.Builder response =
-
BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
- .setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
- c.writeAndFlush(response.build());
- if (statsEnabled) {
-
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
- }
- break;
+ restoreMdcContextFromRequest(r);
+ try {
+ BookkeeperProtocol.BKPacketHeader header = r.getHeader();
+ switch (header.getOperation()) {
+ case ADD_ENTRY:
+ processAddRequestV3(r, c);
+ break;
+ case READ_ENTRY:
+ processReadRequestV3(r, c);
+ break;
+ case FORCE_LEDGER:
+ processForceLedgerRequestV3(r, c);
+ break;
+ case AUTH:
+ LOG.info("Ignoring auth operation from client {}",
c.remoteAddress());
+ BookkeeperProtocol.AuthMessage message =
BookkeeperProtocol.AuthMessage
+ .newBuilder()
+
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
+
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
+ .build();
+ BookkeeperProtocol.Response.Builder authResponse =
BookkeeperProtocol.Response
+ .newBuilder().setHeader(r.getHeader())
+ .setStatus(BookkeeperProtocol.StatusCode.EOK)
+ .setAuthResponse(message);
+ c.writeAndFlush(authResponse.build());
+ break;
+ case WRITE_LAC:
+ processWriteLacRequestV3(r, c);
+ break;
+ case READ_LAC:
+ processReadLacRequestV3(r, c);
+ break;
+ case GET_BOOKIE_INFO:
+ processGetBookieInfoRequestV3(r, c);
+ break;
+ case START_TLS:
+ processStartTLSRequestV3(r, c);
+ break;
+ default:
+ LOG.info("Unknown operation type {}",
header.getOperation());
+ BookkeeperProtocol.Response.Builder response =
+
BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
+
.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
+ c.writeAndFlush(response.build());
+ if (statsEnabled) {
+
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
+ }
+ break;
+ }
+ } finally {
+ MDC.clear();
}
} else {
BookieProtocol.Request r = (BookieProtocol.Request) msg;
@@ -500,7 +509,16 @@ public class BookieRequestProcessor implements
RequestProcessor {
}
}
- private void processWriteLacRequestV3(final BookkeeperProtocol.Request r,
final Channel c) {
+ private void restoreMdcContextFromRequest(BookkeeperProtocol.Request req) {
+ if (preserveMdcForTaskExecution) {
+ MDC.clear();
+ for (BookkeeperProtocol.ContextPair pair:
req.getRequestContextList()) {
+ MDC.put(pair.getKey(), pair.getValue());
+ }
+ }
+ }
+
+ private void processWriteLacRequestV3(final BookkeeperProtocol.Request r,
final Channel c) {
WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, c, this);
if (null == writeThreadPool) {
writeLac.run();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 8600797..79e4de9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -70,6 +70,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
@@ -87,6 +88,7 @@ import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.MdcUtils;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -132,6 +134,7 @@ import
org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
import org.apache.bookkeeper.util.collections.SynchronizedHashMultiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
/**
* This class manages all details of connection to a particular bookie. It also
@@ -198,6 +201,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
private final Counter failedTlsHandshakeCounter;
private final boolean useV2WireProtocol;
+ private final boolean preserveMdcForTaskExecution;
/**
* The following member variables do not need to be concurrent, or volatile
@@ -267,6 +271,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
this.getBookieInfoTimeout = conf.getBookieInfoTimeout();
this.startTLSTimeout = conf.getStartTLSTimeout();
this.useV2WireProtocol = conf.getUseV2WireProtocol();
+ this.preserveMdcForTaskExecution =
conf.getPreserveMdcForTaskExecution();
this.authProviderFactory = authProviderFactory;
this.extRegistry = extRegistry;
@@ -466,7 +471,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
ChannelFuture future = bootstrap.connect(bookieAddr);
- future.addListener(new ConnectionFutureListener(startTime));
+ future.addListener(contextPreservingListener(new
ConnectionFutureListener(startTime)));
future.addListener(x -> makeWritable());
return future;
}
@@ -565,7 +570,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
.setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
.setBody(body);
- final Request writeLacRequest = Request.newBuilder()
+ final Request writeLacRequest =
withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setWriteLacRequest(writeLacBuilder)
.build();
@@ -596,7 +601,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
ForceLedgerRequest.Builder writeLacBuilder =
ForceLedgerRequest.newBuilder()
.setLedgerId(ledgerId);
- final Request forceLedgerRequest = Request.newBuilder()
+ final Request forceLedgerRequest =
withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setForceLedgerRequest(writeLacBuilder)
.build();
@@ -676,7 +681,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
addBuilder.setWriteFlags(WriteFlag.getWriteFlagsValue(writeFlags));
}
- request = Request.newBuilder()
+ request = withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setAddRequest(addBuilder)
.build();
@@ -716,7 +721,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
.setTxnId(txnId);
ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
.setLedgerId(ledgerId);
- request = Request.newBuilder()
+ request = withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setReadLacRequest(readLacBuilder)
.build();
@@ -823,7 +828,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
readBuilder.setMasterKey(ByteString.copyFrom(masterKey));
}
- request = Request.newBuilder()
+ request = withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setReadRequest(readBuilder)
.build();
@@ -851,7 +856,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
GetBookieInfoRequest.Builder getBookieInfoBuilder =
GetBookieInfoRequest.newBuilder()
.setRequested(requested);
- final Request getBookieInfoRequest = Request.newBuilder()
+ final Request getBookieInfoRequest =
withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setGetBookieInfoRequest(getBookieInfoBuilder)
.build();
@@ -1290,8 +1295,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
private void readV3Response(final Response response) {
final BKPacketHeader header = response.getHeader();
- final CompletionValue completionValue =
completionObjects.get(newCompletionKey(header.getTxnId(),
- header.getOperation()));
+ final CompletionKey key = newCompletionKey(header.getTxnId(),
header.getOperation());
+ final CompletionValue completionValue = completionObjects.get(key);
if (null == completionValue) {
// Unexpected response, so log it. The txnId should have been present.
@@ -1304,6 +1309,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
executor.executeOrdered(orderingKey, new SafeRunnable() {
@Override
public void safeRun() {
+ completionValue.restoreMdcContext();
completionValue.handleV3Response(response);
}
@@ -1316,7 +1322,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
});
}
- completionObjects.remove(newCompletionKey(header.getTxnId(),
header.getOperation()));
+ completionObjects.remove(key);
}
void initTLSHandshake() {
@@ -1399,6 +1405,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
private final OpStatsLogger opLogger;
private final OpStatsLogger timeoutOpLogger;
private final String operationName;
+ private final Map<String, String> mdcContextMap;
protected Object ctx;
protected long ledgerId;
protected long entryId;
@@ -1416,6 +1423,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
this.startTime = MathUtils.nowInNano();
this.opLogger = opLogger;
this.timeoutOpLogger = timeoutOpLogger;
+ this.mdcContextMap = preserveMdcForTaskExecution ?
MDC.getCopyOfContextMap() : null;
}
private long latency() {
@@ -1469,6 +1477,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
}
+ public void restoreMdcContext() {
+ MdcUtils.restoreContext(mdcContextMap);
+ }
public abstract void errorOut();
public abstract void errorOut(int rc);
@@ -2128,6 +2139,55 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
}
}
+ Request.Builder withRequestContext(Request.Builder builder) {
+ if (preserveMdcForTaskExecution) {
+ return appendRequestContext(builder);
+ }
+ return builder;
+ }
+
+ static Request.Builder appendRequestContext(Request.Builder builder) {
+ final Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();
+ if (mdcContextMap == null || mdcContextMap.isEmpty()) {
+ return builder;
+ }
+ for (Map.Entry<String, String> kv : mdcContextMap.entrySet()) {
+ final BookkeeperProtocol.ContextPair context =
BookkeeperProtocol.ContextPair.newBuilder()
+ .setKey(kv.getKey())
+ .setValue(kv.getValue())
+ .build();
+ builder.addRequestContext(context);
+ }
+ return builder;
+ }
+
+ ChannelFutureListener contextPreservingListener(ChannelFutureListener
listener) {
+ return preserveMdcForTaskExecution ? new
ContextPreservingFutureListener(listener) : listener;
+ }
+
+ /**
+ * Decorator to preserve MDC for connection listener.
+ */
+ static class ContextPreservingFutureListener implements
ChannelFutureListener {
+ private final ChannelFutureListener listener;
+ private final Map<String, String> mdcContextMap;
+
+ ContextPreservingFutureListener(ChannelFutureListener listener) {
+ this.listener = listener;
+ this.mdcContextMap = MDC.getCopyOfContextMap();
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ MdcUtils.restoreContext(mdcContextMap);
+ try {
+ listener.operationComplete(future);
+ } finally {
+ MDC.clear();
+ }
+ }
+ }
+
/**
* Connection listener.
*/
@@ -2228,7 +2288,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
final CompletionKey completionKey = new V3CompletionKey(txnId,
OperationType.START_TLS);
completionObjects.put(completionKey,
new StartTLSCompletion(completionKey));
- BookkeeperProtocol.Request.Builder h =
BookkeeperProtocol.Request.newBuilder();
+ BookkeeperProtocol.Request.Builder h =
withRequestContext(BookkeeperProtocol.Request.newBuilder());
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.START_TLS)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java
index 73150ad..5c16de8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java
@@ -17,13 +17,16 @@
*/
package org.apache.bookkeeper.util;
+import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
+import org.apache.bookkeeper.common.util.MdcUtils;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
/**
* Generic callback implementation which will run the
@@ -34,6 +37,7 @@ public abstract class OrderedGenericCallback<T> implements GenericCallback<T> {
private final OrderedExecutor executor;
private final long orderingKey;
+ private final Map<String, String> mdcContextMap;
/**
* @param executor The executor on which to run the callback
@@ -43,33 +47,40 @@ public abstract class OrderedGenericCallback<T> implements GenericCallback<T> {
public OrderedGenericCallback(OrderedExecutor executor, long orderingKey) {
this.executor = executor;
this.orderingKey = orderingKey;
+ this.mdcContextMap = executor.preserveMdc() ?
MDC.getCopyOfContextMap() : null;
}
@Override
public final void operationComplete(final int rc, final T result) {
- // during closing, callbacks that are error out might try to submit to
- // the scheduler again. if the submission will go to same thread, we
- // don't need to submit to executor again. this is also an optimization for
- // callback submission
- if (Thread.currentThread().getId() ==
executor.getThreadID(orderingKey)) {
- safeOperationComplete(rc, result);
- } else {
- try {
- executor.executeOrdered(orderingKey, new SafeRunnable() {
- @Override
- public void safeRun() {
- safeOperationComplete(rc, result);
- }
- @Override
- public String toString() {
- return String.format("Callback(key=%s, name=%s)",
- orderingKey,
- OrderedGenericCallback.this);
- }
- });
- } catch (RejectedExecutionException re) {
- LOG.warn("Failed to submit callback for {} : ", orderingKey,
re);
+ MdcUtils.restoreContext(mdcContextMap);
+ try {
+ // during closing, callbacks that are error out might try to submit to
+ // the scheduler again. if the submission will go to same thread, we
+ // don't need to submit to executor again. this is also an optimization for
+ // callback submission
+ if (Thread.currentThread().getId() ==
executor.getThreadID(orderingKey)) {
+ safeOperationComplete(rc, result);
+ } else {
+ try {
+ executor.executeOrdered(orderingKey, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ safeOperationComplete(rc, result);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Callback(key=%s, name=%s)",
+ orderingKey,
+ OrderedGenericCallback.this);
+ }
+ });
+ } catch (RejectedExecutionException re) {
+ LOG.warn("Failed to submit callback for {} : ",
orderingKey, re);
+ }
}
+ } finally {
+ MDC.clear();
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MdcContextTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MdcContextTest.java
new file mode 100644
index 0000000..f10427b
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MdcContextTest.java
@@ -0,0 +1,249 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.client;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.AdditionalAnswers.answerVoid;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.MDC;
+import org.apache.log4j.spi.LoggingEvent;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Test passing of MDC context.
+ */
+@SuppressWarnings("deprecation")
+@Slf4j
+public class MdcContextTest extends BookKeeperClusterTestCase {
+ public static final String MDC_REQUEST_ID = "request_id";
+
+ final byte[] entry = "Test Entry".getBytes();
+
+ BookKeeper bkc;
+ LedgerHandle lh;
+
+ private Appender mockAppender;
+ private List<String> capturedEvents;
+ private Logger rootLogger = LogManager.getRootLogger();
+
+ public MdcContextTest() {
+ super(3);
+ baseConf.setNumAddWorkerThreads(0);
+ baseConf.setNumReadWorkerThreads(0);
+ baseConf.setPreserveMdcForTaskExecution(true);
+ baseConf.setReadOnlyModeEnabled(true);
+
+ // for read-only bookie
+ baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+ baseConf.setEntryLogFilePreAllocationEnabled(false);
+ baseConf.setMinUsableSizeForEntryLogCreation(Long.MAX_VALUE);
+ }
+
+
+ public static String mdcFormat(Object mdc, String message) {
+ return mdc == null
+ ? "[" + MDC_REQUEST_ID + ":] - " + message
+ : "[" + MDC_REQUEST_ID + ":" + mdc.toString()
+ + "] - " + message;
+ }
+
+ public void assertLogWithMdc(String mdc, String msgSubstring) {
+ assertThat(capturedEvents,
+ hasItem(CoreMatchers.allOf(
+ containsString("[" + MDC_REQUEST_ID + ":" + mdc + "] - "),
+ containsString(msgSubstring)
+ )));
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setReadTimeout(360)
+ .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
+ .setPreserveMdcForTaskExecution(true);
+
+ MDC.clear();
+ bkc = new BookKeeper(conf);
+
+ MDC.put(MDC_REQUEST_ID, "ledger_create");
+ log.info("creating ledger");
+ lh = bkc.createLedgerAdv(3, 3, 3, BookKeeper.DigestType.CRC32, new
byte[] {});
+ MDC.clear();
+
+ mockAppender = mock(Appender.class);
+ when(mockAppender.getName()).thenReturn("MockAppender");
+
+ rootLogger.addAppender(mockAppender);
+ rootLogger.setLevel(Level.INFO);
+ capturedEvents = new LinkedList<>();
+
+ doAnswer(answerVoid((LoggingEvent event) -> capturedEvents.add(
+ mdcFormat(event.getMDC(MDC_REQUEST_ID),
event.getRenderedMessage())
+ ))).when(mockAppender).doAppend(any());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ lh.close();
+ bkc.close();
+ rootLogger.removeAppender(mockAppender);
+ capturedEvents = null;
+ MDC.clear();
+ super.tearDown();
+ }
+
+ @Test
+ public void testLedgerCreateFails() throws Exception {
+ MDC.put(MDC_REQUEST_ID, "ledger_create_fail");
+ try {
+ bkc.createLedgerAdv(99, 3, 2, BookKeeper.DigestType.CRC32, new
byte[]{});
+ Assert.fail("should not get here");
+ } catch (BKException bke) {
+ // expected
+ }
+ assertLogWithMdc("ledger_create_fail", "Not enough bookies to create ledger");
+ }
+
+ @Test
+ public void testSimpleAdd() throws Exception {
+ MDC.put(MDC_REQUEST_ID, "ledger_add_entry");
+ lh.addEntry(0, entry);
+
+ // client msg
+ assertLogWithMdc("ledger_add_entry", "Successfully connected to bookie");
+ // bookie msg
+ assertLogWithMdc("ledger_add_entry", "Created new entry log file");
+ }
+
+ @Test
+ public void testAddWithEnsembleChange() throws Exception {
+ lh.addEntry(0, entry);
+ startNewBookie();
+ killBookie(0);
+
+ MDC.put(MDC_REQUEST_ID, "ledger_add_entry");
+ lh.addEntry(1, entry);
+ assertLogWithMdc("ledger_add_entry", "Could not connect to bookie");
+ assertLogWithMdc("ledger_add_entry", "Failed to write entry");
+ assertLogWithMdc("ledger_add_entry", "New Ensemble");
+ }
+
+ @Test
+ public void testAddFailsWithReadOnlyBookie() throws Exception {
+ for (int i = 0; i < 3; ++i) {
+ Bookie bookie = bs.get(i).getBookie();
+ File[] ledgerDirs = bsConfs.get(i).getLedgerDirs();
+ LedgerDirsManager ledgerDirsManager =
bookie.getLedgerDirsManager();
+ ledgerDirsManager.addToFilledDirs(new File(ledgerDirs[0],
"current"));
+ }
+
+ MDC.put(MDC_REQUEST_ID, "ledger_add_entry");
+ try {
+ lh.addEntry(0, entry);
+ Assert.fail("should not get here");
+ } catch (BKException bke) {
+ // expected, pass
+ }
+
+ assertLogWithMdc("ledger_add_entry", "No writable ledger dirs below diskUsageThreshold");
+ assertLogWithMdc("ledger_add_entry", "All ledger directories are non writable and no reserved space");
+ assertLogWithMdc("ledger_add_entry", "Error writing entry:0 to ledger:0");
+ assertLogWithMdc("ledger_add_entry", "Add for failed on bookie");
+ assertLogWithMdc("ledger_add_entry", "Failed to find 1 bookies");
+ assertLogWithMdc("ledger_add_entry", "Could not get additional bookie to remake ensemble, closing ledger: 0");
+ }
+
+ @Test
+ public void testAddFailsDuplicateEntry() throws Exception {
+ lh.addEntry(0, entry);
+
+ MDC.put(MDC_REQUEST_ID, "ledger_add_duplicate_entry");
+ try {
+ lh.addEntry(0, entry);
+ Assert.fail("should not get here");
+ } catch (BKException bke) {
+ // expected, pass
+ }
+
+ assertLogWithMdc("ledger_add_duplicate_entry", "Trying to re-add duplicate entryid:0");
+ assertLogWithMdc("ledger_add_duplicate_entry", "Write of ledger entry to quorum failed");
+ }
+
+ @Test
+ public void testReadEntryBeyondLac() throws Exception {
+ MDC.put(MDC_REQUEST_ID, "ledger_read_entry");
+
+ try {
+ lh.readEntries(100, 100);
+ fail("should not get here");
+ } catch (BKException.BKReadException e) {
+ // pass
+ }
+ assertLogWithMdc("ledger_read_entry", "ReadException on ledgerId:0 firstEntry:100 lastEntry:100");
+ }
+
+ @Test
+ public void testReadFromDeletedLedger() throws Exception {
+ lh.addEntry(0, entry);
+ lh.close();
+ bkc.deleteLedger(lh.ledgerId);
+
+ MDC.put(MDC_REQUEST_ID, "ledger_read_entry");
+
+ try {
+ lh.readEntries(100, 100);
+ fail("should not get here");
+ } catch (BKException.BKReadException e) {
+ // pass
+ }
+ assertLogWithMdc("ledger_read_entry", "ReadException on ledgerId:0 firstEntry:100 lastEntry:100");
+ }
+
+}
diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java
index ce1f02b..6ababbe 100644
--- a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java
+++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java
@@ -24,6 +24,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
+
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookieProtoEncoding.EnDecoder;
@@ -43,9 +45,10 @@ import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
+import org.slf4j.MDC;
/**
- * Benchmarking serialization and deserilization.
+ * Benchmarking serialization and deserialization.
*/
@BenchmarkMode({ Mode.Throughput })
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -82,7 +85,7 @@ public class ProtocolBenchmark {
@Benchmark
public void testAddEntryV2() throws Exception {
- ByteBufList list = ByteBufList.get(entry.slice());
+ ByteBufList list = ByteBufList.get(entry.retainedSlice());
BookieProtocol.AddRequest req = BookieProtocol.AddRequest.create(
BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId,
@@ -122,4 +125,76 @@ public class ProtocolBenchmark {
ReferenceCountUtil.release(res);
}
+ @Benchmark
+ public void testAddEntryV3WithMdc() throws Exception {
+ MDC.put("parent_id", "LetsPutSomeLongParentRequestIdHere");
+ MDC.put("request_id", "LetsPutSomeLongRequestIdHere");
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.ADD_ENTRY)
+ .setTxnId(0L);
+
+ ByteBuf toSend = entry.slice();
+ byte[] toSendArray = new byte[toSend.readableBytes()];
+ toSend.getBytes(toSend.readerIndex(), toSendArray);
+ AddRequest.Builder addBuilder = AddRequest.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId)
+ .setMasterKey(ByteString.copyFrom(masterKey))
+ .setBody(ByteString.copyFrom(toSendArray))
+ .setFlag(AddRequest.Flag.RECOVERY_ADD);
+
+ Request request =
PerChannelBookieClient.appendRequestContext(Request.newBuilder())
+ .setHeader(headerBuilder)
+ .setAddRequest(addBuilder)
+ .build();
+
+ Object res = this.reqEnDeV3.encode(request, ByteBufAllocator.DEFAULT);
+ ReferenceCountUtil.release(res);
+ MDC.clear();
+ }
+
+ static Request.Builder appendRequestContextNoMdc(Request.Builder builder) {
+ final BookkeeperProtocol.ContextPair context1 =
BookkeeperProtocol.ContextPair.newBuilder()
+ .setKey("parent_id")
+ .setValue("LetsPutSomeLongParentRequestIdHere")
+ .build();
+ builder.addRequestContext(context1);
+
+ final BookkeeperProtocol.ContextPair context2 =
BookkeeperProtocol.ContextPair.newBuilder()
+ .setKey("request_id")
+ .setValue("LetsPutSomeLongRequestIdHere")
+ .build();
+ builder.addRequestContext(context2);
+
+ return builder;
+ }
+
+ @Benchmark
+ public void testAddEntryV3WithExtraContextDataNoMdc() throws Exception {
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.ADD_ENTRY)
+ .setTxnId(0L);
+
+ ByteBuf toSend = entry.slice();
+ byte[] toSendArray = new byte[toSend.readableBytes()];
+ toSend.getBytes(toSend.readerIndex(), toSendArray);
+ AddRequest.Builder addBuilder = AddRequest.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId)
+ .setMasterKey(ByteString.copyFrom(masterKey))
+ .setBody(ByteString.copyFrom(toSendArray))
+ .setFlag(AddRequest.Flag.RECOVERY_ADD);
+
+ Request request = appendRequestContextNoMdc(Request.newBuilder())
+ .setHeader(headerBuilder)
+ .setAddRequest(addBuilder)
+ .build();
+
+ Object res = this.reqEnDeV3.encode(request, ByteBufAllocator.DEFAULT);
+ ReferenceCountUtil.release(res);
+ }
}