This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new c682ecb Use netty DefaultThreadFactory everywhere to take advantage
of fast thread local access
c682ecb is described below
commit c682ecb076c9cf8b845917244acfc2bc575f4b9d
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Dec 7 16:46:02 2017 -0800
Use netty DefaultThreadFactory everywhere to take advantage of fast thread
local access
Merging
https://github.com/yahoo/bookkeeper/commit/38ff21ad8bbd8ac8ec8a4898ef9ecf473fea94b7
from Yahoo branch.
Netty has a special `FastThreadLocal` class that is used in many places
(eg: when borrowing a buffer from a pool). The mechanism in which it works is
that the thread has to be created from a particular thread factory to take
advantage of it (it adds a couple of fields subclassing the `Thread` class).
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #823 from merlimat/thread-factory
---
.../org/apache/bookkeeper/bookie/GarbageCollectorThread.java | 8 ++++----
.../main/java/org/apache/bookkeeper/bookie/SyncThread.java | 11 +++++++----
.../main/java/org/apache/bookkeeper/client/BookKeeper.java | 9 ++++-----
.../java/org/apache/bookkeeper/client/UpdateLedgerOp.java | 7 ++++---
.../org/apache/bookkeeper/meta/AbstractZkLedgerManager.java | 7 +++----
.../java/org/apache/bookkeeper/proto/BookieNettyServer.java | 4 ++--
.../java/org/apache/bookkeeper/util/OrderedSafeExecutor.java | 8 +++++---
7 files changed, 29 insertions(+), 25 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 617a656..588c3d7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -22,7 +22,8 @@
package org.apache.bookkeeper.bookie;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
@@ -111,10 +112,9 @@ public class GarbageCollectorThread extends SafeRunnable {
LedgerManager ledgerManager,
final CompactableLedgerStorage ledgerStorage)
throws IOException {
- gcExecutor = Executors.newSingleThreadScheduledExecutor(
- new
ThreadFactoryBuilder().setNameFormat("GarbageCollectorThread-%d").build()
- );
+ gcExecutor = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("GarbageCollectorThread"));
this.conf = conf;
+
this.entryLogger = ledgerStorage.getEntryLogger();
this.ledgerStorage = ledgerStorage;
this.gcWaitTime = conf.getGcWaitTime();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index 44bf6ae..3fbedbc 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -22,19 +22,24 @@
package org.apache.bookkeeper.bookie;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.MathUtils;
+
/**
* SyncThread is a background thread which help checkpointing ledger storage
* when a checkpoint is requested. After a ledger storage is checkpointed,
@@ -72,9 +77,7 @@ class SyncThread implements Checkpointer {
this.dirsListener = dirsListener;
this.ledgerStorage = ledgerStorage;
this.checkpointSource = checkpointSource;
- ThreadFactoryBuilder tfb = new ThreadFactoryBuilder()
- .setNameFormat("SyncThread-" + conf.getBookiePort() + "-%d");
- this.executor =
Executors.newSingleThreadScheduledExecutor(tfb.build());
+ this.executor = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("SyncThread"));
flushInterval = conf.getFlushInterval();
if (log.isDebugEnabled()) {
log.debug("Flush Interval : {}", flushInterval);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 756e360..99e9484 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -29,6 +29,8 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -81,7 +83,6 @@ import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* BookKeeper client.
*
@@ -405,10 +406,8 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
this.reorderReadSequence = conf.isReorderReadSequenceEnabled();
// initialize resources
- ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
- "BookKeeperClientScheduler-%d");
this.scheduler = Executors
- .newSingleThreadScheduledExecutor(tfb.build());
+ .newSingleThreadScheduledExecutor(new
DefaultThreadFactory("BookKeeperClientScheduler"));
this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
.name("BookKeeperClientWorker")
.numThreads(conf.getNumWorkerThreads())
@@ -1430,7 +1429,7 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
}
static EventLoopGroup getDefaultEventLoopGroup() {
- ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat("bookkeeper-io-%s").build();
+ ThreadFactory threadFactory = new
DefaultThreadFactory("bookkeeper-io");
final int numThreads = Runtime.getRuntime().availableProcessors() * 2;
if (SystemUtils.IS_OS_LINUX) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
index 8a354d6..8199e67 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
@@ -23,7 +23,8 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -82,8 +83,8 @@ public class UpdateLedgerOp {
public void updateBookieIdInLedgers(final BookieSocketAddress oldBookieId,
final BookieSocketAddress newBookieId,
final int rate, final int limit, final UpdateLedgerNotifier
progressable) throws IOException {
- final ThreadFactoryBuilder tfb = new
ThreadFactoryBuilder().setNameFormat("UpdateLedgerThread").setDaemon(true);
- final ExecutorService executor =
Executors.newSingleThreadExecutor(tfb.build());
+ final ExecutorService executor = Executors
+ .newSingleThreadExecutor(new
DefaultThreadFactory("UpdateLedgerThread", true));
final AtomicInteger issuedLedgerCnt = new AtomicInteger();
final AtomicInteger updatedLedgerCnt = new AtomicInteger();
final Future<?> updateBookieCb = executor.submit(new Runnable() {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 3546d89..f5d8e7f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -18,7 +18,8 @@
package org.apache.bookkeeper.meta;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.HashSet;
@@ -147,10 +148,8 @@ abstract class AbstractZkLedgerManager implements
LedgerManager, Watcher {
this.conf = conf;
this.zk = zk;
this.ledgerRootPath = conf.getZkLedgersRootPath();
- ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
- "ZkLedgerManagerScheduler-%d");
this.scheduler = Executors
- .newSingleThreadScheduledExecutor(tfb.build());
+ .newSingleThreadScheduledExecutor(new
DefaultThreadFactory("ZkLedgerManagerScheduler"));
if (LOG.isDebugEnabled()) {
LOG.debug("Using AbstractZkLedgerManager with root path : {}",
ledgerRootPath);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index 1ab80c9..a2d7fc9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.proto;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ExtensionRegistry;
import io.netty.bootstrap.ServerBootstrap;
@@ -49,6 +48,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -109,7 +109,7 @@ class BookieNettyServer {
this.authProviderFactory =
AuthProviderFactoryFactory.newBookieAuthProviderFactory(conf);
if (!conf.isDisableServerSocketBind()) {
- ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat("bookie-io-%s").build();
+ ThreadFactory threadFactory = new
DefaultThreadFactory("bookie-io");
final int numThreads = Runtime.getRuntime().availableProcessors()
* 2;
EventLoopGroup eventLoopGroup;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
index db30e55..f3af7df 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
@@ -18,12 +18,15 @@
package org.apache.bookkeeper.util;
import com.google.common.util.concurrent.ListenableFuture;
-import java.util.concurrent.Executors;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
@@ -58,12 +61,11 @@ public class OrderedSafeExecutor extends
org.apache.bookkeeper.common.util.Order
public OrderedSafeExecutor build() {
if (null == threadFactory) {
- threadFactory = Executors.defaultThreadFactory();
+ threadFactory = new
DefaultThreadFactory("bookkeeper-ordered-safe-executor");
}
return new OrderedSafeExecutor(name, numThreads, threadFactory,
statsLogger,
traceTaskExecution,
warnTimeMicroSec, maxTasksInQueue);
}
-
}
/**
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].