This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 9df8dac Avoid acquiring closeLock.readLock() on every add/read
operation
9df8dac is described below
commit 9df8dac295c875ee3395d78a2bab3cda0c3c9ddd
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 29 16:04:37 2018 -0700
Avoid acquiring closeLock.readLock() on every add/read operation
In the `BookieClient`, we are always acquiring a readlock when grabbing a
connection to use for sending a write/read request.
The lock is the `closeLock` and it's only acquired in "write" mode when the
`BookKeeper` instance is closed.
The problem with the read-lock is that it introduces contention between the
threads that are acquiring it (even if all of them are in read mode). Multiple
threads can be in read mode in the critical section, though they still
contend with each other when they're entering/exiting the section.
Additionally, the Java implementation of the read/write lock creates and
destroys a lot of objects when that contention happens.
My understanding of the code is that we don't need to acquire the read lock
at that point. The reason is that we are already acquiring the lock in the
`lookupClient()` method, although only if the pool is null. Additionally, when
`BookKeeper.close()` is invoked all PCBCs will be set to closed as well, so it
will not be possible to create a new connection.
All the line changes in the patch are just removing the readLock acquire
and try/finally, and reducing the indentation level.
Author: Matteo Merli <[email protected]>
Reviewers: Ivan Kelly <[email protected]>, Enrico Olivelli
<[email protected]>, Jia Zhai <None>, Sijie Guo <[email protected]>
This closes #1292 from merlimat/bookie-client-rw-lock and squashes the
following commits:
2104a3aa7 [Matteo Merli] Converted anonymous classes into lambdas
cabad14e5 [Matteo Merli] Avoid acquiring closeLock.readLock() on every
add/read operation
---
.../org/apache/bookkeeper/proto/BookieClient.java | 265 ++++++++-------------
1 file changed, 104 insertions(+), 161 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 0bd9fe8..285a06a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -21,6 +21,7 @@
package org.apache.bookkeeper.proto;
import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import com.google.common.collect.Lists;
import com.google.protobuf.ExtensionRegistry;
@@ -48,6 +49,7 @@ import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -62,7 +64,6 @@ import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,12 +114,9 @@ public class BookieClient implements
PerChannelBookieClientFactory {
this.scheduler = scheduler;
if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
- SafeRunnable monitor = new SafeRunnable() {
- @Override
- public void safeRun() {
- monitorPendingOperations();
- }
- };
+ SafeRunnable monitor = safeRun(() -> {
+ monitorPendingOperations();
+ });
this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
conf.getTimeoutMonitorIntervalSec(),
conf.getTimeoutMonitorIntervalSec(),
@@ -189,40 +187,29 @@ public class BookieClient implements
PerChannelBookieClientFactory {
public void writeLac(final BookieSocketAddress addr, final long ledgerId,
final byte[] masterKey,
final long lac, final ByteBufList toSend, final WriteLacCallback
cb, final Object ctx) {
- closeLock.readLock().lock();
- try {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
-
cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
- ledgerId, addr, ctx);
- return;
- }
-
- toSend.retain();
- client.obtain(new GenericCallback<PerChannelBookieClient>() {
- @Override
- public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
- if (rc != BKException.Code.OK) {
- try {
- executor.submitOrdered(ledgerId, new
SafeRunnable() {
- @Override
- public void safeRun() {
- cb.writeLacComplete(rc, ledgerId, addr,
ctx);
- }
- });
- } catch (RejectedExecutionException re) {
-
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId,
addr, ctx);
- }
- } else {
- pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb,
ctx);
- }
+ final PerChannelBookieClientPool client = lookupClient(addr);
+ if (client == null) {
+
cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+ ledgerId, addr, ctx);
+ return;
+ }
- toSend.release();
+ toSend.retain();
+ client.obtain((rc, pcbc) -> {
+ if (rc != BKException.Code.OK) {
+ try {
+ executor.submitOrdered(ledgerId, safeRun(() -> {
+ cb.writeLacComplete(rc, ledgerId, addr, ctx);
+ }));
+ } catch (RejectedExecutionException re) {
+
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId,
addr, ctx);
}
- }, ledgerId);
- } finally {
- closeLock.readLock().unlock();
- }
+ } else {
+ pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
+ }
+
+ toSend.release();
+ }, ledgerId);
}
private void completeAdd(final int rc,
@@ -255,26 +242,21 @@ public class BookieClient implements
PerChannelBookieClientFactory {
final WriteCallback cb,
final Object ctx,
final int options) {
- closeLock.readLock().lock();
- try {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
-
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
- ledgerId, entryId, addr, cb, ctx);
- return;
- }
+ final PerChannelBookieClientPool client = lookupClient(addr);
+ if (client == null) {
+
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
+ ledgerId, entryId, addr, cb, ctx);
+ return;
+ }
- // Retain the buffer, since the connection could be obtained after
- // the PendingApp might have already failed
- toSend.retain();
+ // Retain the buffer, since the connection could be obtained after
+ // the PendingApp might have already failed
+ toSend.retain();
- client.obtain(ChannelReadyForAddEntryCallback.create(
- this, toSend, ledgerId, entryId, addr,
- ctx, cb, options, masterKey),
- ledgerId);
- } finally {
- closeLock.readLock().unlock();
- }
+ client.obtain(ChannelReadyForAddEntryCallback.create(
+ this, toSend, ledgerId, entryId, addr,
+ ctx, cb, options, masterKey),
+ ledgerId);
}
private void completeRead(final int rc,
@@ -371,37 +353,26 @@ public class BookieClient implements
PerChannelBookieClientFactory {
public void readLac(final BookieSocketAddress addr, final long ledgerId,
final ReadLacCallback cb,
final Object ctx) {
- closeLock.readLock().lock();
- try {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
-
cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, null, null,
- ctx);
- return;
- }
- client.obtain(new GenericCallback<PerChannelBookieClient>() {
- @Override
- public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
- if (rc != BKException.Code.OK) {
- try {
- executor.submitOrdered(ledgerId, new
SafeRunnable() {
- @Override
- public void safeRun() {
- cb.readLacComplete(rc, ledgerId, null,
null, ctx);
- }
- });
- } catch (RejectedExecutionException re) {
-
cb.readLacComplete(getRc(BKException.Code.InterruptedException),
- ledgerId, null, null, ctx);
- }
- return;
- }
- pcbc.readLac(ledgerId, cb, ctx);
- }
- }, ledgerId);
- } finally {
- closeLock.readLock().unlock();
+ final PerChannelBookieClientPool client = lookupClient(addr);
+ if (client == null) {
+
cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, null, null,
+ ctx);
+ return;
}
+ client.obtain((rc, pcbc) -> {
+ if (rc != BKException.Code.OK) {
+ try {
+ executor.submitOrdered(ledgerId, safeRun(() -> {
+ cb.readLacComplete(rc, ledgerId, null, null, ctx);
+ }));
+ } catch (RejectedExecutionException re) {
+
cb.readLacComplete(getRc(BKException.Code.InterruptedException),
+ ledgerId, null, null, ctx);
+ }
+ } else {
+ pcbc.readLac(ledgerId, cb, ctx);
+ }
+ }, ledgerId);
}
public void readEntry(BookieSocketAddress addr, long ledgerId, long
entryId,
@@ -411,28 +382,20 @@ public class BookieClient implements
PerChannelBookieClientFactory {
public void readEntry(final BookieSocketAddress addr, final long ledgerId,
final long entryId,
final ReadEntryCallback cb, final Object ctx, int
flags, byte[] masterKey) {
- closeLock.readLock().lock();
- try {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
-
cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
- ledgerId, entryId, null, ctx);
- return;
- }
-
- client.obtain(new GenericCallback<PerChannelBookieClient>() {
- @Override
- public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
- if (rc != BKException.Code.OK) {
- completeRead(rc, ledgerId, entryId, null, cb, ctx);
- return;
- }
- pcbc.readEntry(ledgerId, entryId, cb, ctx, flags,
masterKey);
- }
- }, ledgerId);
- } finally {
- closeLock.readLock().unlock();
+ final PerChannelBookieClientPool client = lookupClient(addr);
+ if (client == null) {
+
cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+ ledgerId, entryId, null, ctx);
+ return;
}
+
+ client.obtain((rc, pcbc) -> {
+ if (rc != BKException.Code.OK) {
+ completeRead(rc, ledgerId, entryId, null, cb, ctx);
+ } else {
+ pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey);
+ }
+ }, ledgerId);
}
@@ -444,65 +407,45 @@ public class BookieClient implements
PerChannelBookieClientFactory {
final boolean piggyBackEntry,
final ReadEntryCallback cb,
final Object ctx) {
- closeLock.readLock().lock();
- try {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
-
completeRead(BKException.Code.BookieHandleNotAvailableException,
- ledgerId, entryId, null, cb, ctx);
- return;
- }
-
- client.obtain(new GenericCallback<PerChannelBookieClient>() {
- @Override
- public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
-
- if (rc != BKException.Code.OK) {
- completeRead(rc, ledgerId, entryId, null, cb, ctx);
- return;
- }
- pcbc.readEntryWaitForLACUpdate(ledgerId, entryId,
previousLAC, timeOutInMillis, piggyBackEntry, cb,
- ctx);
- }
- }, ledgerId);
- } finally {
- closeLock.readLock().unlock();
+ final PerChannelBookieClientPool client = lookupClient(addr);
+ if (client == null) {
+ completeRead(BKException.Code.BookieHandleNotAvailableException,
+ ledgerId, entryId, null, cb, ctx);
+ return;
}
+
+ client.obtain((rc, pcbc) -> {
+ if (rc != BKException.Code.OK) {
+ completeRead(rc, ledgerId, entryId, null, cb, ctx);
+ } else {
+ pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC,
timeOutInMillis, piggyBackEntry, cb,
+ ctx);
+ }
+ }, ledgerId);
}
public void getBookieInfo(final BookieSocketAddress addr, final long
requested, final GetBookieInfoCallback cb,
final Object ctx) {
- closeLock.readLock().lock();
- try {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
-
cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
new BookieInfo(),
- ctx);
- return;
- }
- client.obtain(new GenericCallback<PerChannelBookieClient>() {
- @Override
- public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
- if (rc != BKException.Code.OK) {
- try {
- executor.submit(new SafeRunnable() {
- @Override
- public void safeRun() {
- cb.getBookieInfoComplete(rc, new
BookieInfo(), ctx);
- }
- });
- } catch (RejectedExecutionException re) {
-
cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
- new BookieInfo(), ctx);
- }
- return;
- }
- pcbc.getBookieInfo(requested, cb, ctx);
- }
- }, requested);
- } finally {
- closeLock.readLock().unlock();
+ final PerChannelBookieClientPool client = lookupClient(addr);
+ if (client == null) {
+
cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
new BookieInfo(),
+ ctx);
+ return;
}
+ client.obtain((rc, pcbc) -> {
+ if (rc != BKException.Code.OK) {
+ try {
+ executor.submit(safeRun(() -> {
+ cb.getBookieInfoComplete(rc, new BookieInfo(), ctx);
+ }));
+ } catch (RejectedExecutionException re) {
+
cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
+ new BookieInfo(), ctx);
+ }
+ } else {
+ pcbc.getBookieInfo(requested, cb, ctx);
+ }
+ }, requested);
}
private void monitorPendingOperations() {
--
To stop receiving notification emails like this one, please contact
[email protected].