This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 9df8dac  Avoid acquiring closeLock.readLock() on every add/read 
operation
9df8dac is described below

commit 9df8dac295c875ee3395d78a2bab3cda0c3c9ddd
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 29 16:04:37 2018 -0700

    Avoid acquiring closeLock.readLock() on every add/read operation
    
    In the `BookieClient`, we are always acquiring a readlock when grabbing a 
connection to use for sending a write/read request.
    
    The lock is the `closeLock` and it's only acquired in "write" mode when the 
`BookKeeper` instance is closed.
    
    The problem with the read-lock is that it introduces contention between the 
threads that are acquiring it (even if all of them in read mode). Multiple 
threads can be be in read mode in the critical section, though they have 
contention when they're entering/exiting the section.
    
    Additionally, the Java implementation of read/write lock is creating and 
destroying a lot of objects when that contention happens.
    
    My understanding of the code is that we don't need to acquire the read lock 
in that point. The reason is that, we are already acquiring the lock in the 
`lookupClient()` method, although only if the pool is null. Additionally, when 
`Bookkeeper.close()` is invoked all PCBC will be set to closed as well, so it 
will not be possibile to create a new connection.
    
    All the line changes in the patch are just removing the readLock acquire 
and try/finally, and reducing the indentation level.
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Ivan Kelly <[email protected]>, Enrico Olivelli 
<[email protected]>, Jia Zhai <None>, Sijie Guo <[email protected]>
    
    This closes #1292 from merlimat/bookie-client-rw-lock and squashes the 
following commits:
    
    2104a3aa7 [Matteo Merli] Converted anonymous classes into lambdas
    cabad14e5 [Matteo Merli] Avoid acquiring closeLock.readLock() on every 
add/read operation
---
 .../org/apache/bookkeeper/proto/BookieClient.java  | 265 ++++++++-------------
 1 file changed, 104 insertions(+), 161 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 0bd9fe8..285a06a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -21,6 +21,7 @@
 package org.apache.bookkeeper.proto;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.ExtensionRegistry;
@@ -48,6 +49,7 @@ import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -62,7 +64,6 @@ import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,12 +114,9 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
 
         this.scheduler = scheduler;
         if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
-            SafeRunnable monitor = new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        monitorPendingOperations();
-                    }
-                };
+            SafeRunnable monitor = safeRun(() -> {
+                monitorPendingOperations();
+            });
             this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
                                                                     
conf.getTimeoutMonitorIntervalSec(),
                                                                     
conf.getTimeoutMonitorIntervalSec(),
@@ -189,40 +187,29 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
 
     public void writeLac(final BookieSocketAddress addr, final long ledgerId, 
final byte[] masterKey,
             final long lac, final ByteBufList toSend, final WriteLacCallback 
cb, final Object ctx) {
-        closeLock.readLock().lock();
-        try {
-            final PerChannelBookieClientPool client = lookupClient(addr);
-            if (client == null) {
-                
cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                                  ledgerId, addr, ctx);
-                return;
-            }
-
-            toSend.retain();
-            client.obtain(new GenericCallback<PerChannelBookieClient>() {
-                @Override
-                public void operationComplete(final int rc, 
PerChannelBookieClient pcbc) {
-                    if (rc != BKException.Code.OK) {
-                        try {
-                            executor.submitOrdered(ledgerId, new 
SafeRunnable() {
-                                @Override
-                                public void safeRun() {
-                                    cb.writeLacComplete(rc, ledgerId, addr, 
ctx);
-                                }
-                            });
-                        } catch (RejectedExecutionException re) {
-                            
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, 
addr, ctx);
-                        }
-                    } else {
-                        pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, 
ctx);
-                    }
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            
cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+                              ledgerId, addr, ctx);
+            return;
+        }
 
-                    toSend.release();
+        toSend.retain();
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                try {
+                    executor.submitOrdered(ledgerId, safeRun(() -> {
+                        cb.writeLacComplete(rc, ledgerId, addr, ctx);
+                    }));
+                } catch (RejectedExecutionException re) {
+                    
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, 
addr, ctx);
                 }
-            }, ledgerId);
-        } finally {
-            closeLock.readLock().unlock();
-        }
+            } else {
+                pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
+            }
+
+            toSend.release();
+        }, ledgerId);
     }
 
     private void completeAdd(final int rc,
@@ -255,26 +242,21 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
                          final WriteCallback cb,
                          final Object ctx,
                          final int options) {
-        closeLock.readLock().lock();
-        try {
-            final PerChannelBookieClientPool client = lookupClient(addr);
-            if (client == null) {
-                
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
-                            ledgerId, entryId, addr, cb, ctx);
-                return;
-            }
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
+                        ledgerId, entryId, addr, cb, ctx);
+            return;
+        }
 
-            // Retain the buffer, since the connection could be obtained after
-            // the PendingApp might have already failed
-            toSend.retain();
+        // Retain the buffer, since the connection could be obtained after
+        // the PendingApp might have already failed
+        toSend.retain();
 
-            client.obtain(ChannelReadyForAddEntryCallback.create(
-                                  this, toSend, ledgerId, entryId, addr,
-                                  ctx, cb, options, masterKey),
-                          ledgerId);
-        } finally {
-            closeLock.readLock().unlock();
-        }
+        client.obtain(ChannelReadyForAddEntryCallback.create(
+                              this, toSend, ledgerId, entryId, addr,
+                              ctx, cb, options, masterKey),
+                      ledgerId);
     }
 
     private void completeRead(final int rc,
@@ -371,37 +353,26 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
 
     public void readLac(final BookieSocketAddress addr, final long ledgerId, 
final ReadLacCallback cb,
             final Object ctx) {
-        closeLock.readLock().lock();
-        try {
-            final PerChannelBookieClientPool client = lookupClient(addr);
-            if (client == null) {
-                
cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), 
ledgerId, null, null,
-                        ctx);
-                return;
-            }
-            client.obtain(new GenericCallback<PerChannelBookieClient>() {
-                @Override
-                public void operationComplete(final int rc, 
PerChannelBookieClient pcbc) {
-                    if (rc != BKException.Code.OK) {
-                        try {
-                            executor.submitOrdered(ledgerId, new 
SafeRunnable() {
-                                @Override
-                                public void safeRun() {
-                                    cb.readLacComplete(rc, ledgerId, null, 
null, ctx);
-                                }
-                            });
-                        } catch (RejectedExecutionException re) {
-                            
cb.readLacComplete(getRc(BKException.Code.InterruptedException),
-                                    ledgerId, null, null, ctx);
-                        }
-                        return;
-                    }
-                    pcbc.readLac(ledgerId, cb, ctx);
-                }
-            }, ledgerId);
-        } finally {
-            closeLock.readLock().unlock();
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            
cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), 
ledgerId, null, null,
+                    ctx);
+            return;
         }
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                try {
+                    executor.submitOrdered(ledgerId, safeRun(() -> {
+                        cb.readLacComplete(rc, ledgerId, null, null, ctx);
+                    }));
+                } catch (RejectedExecutionException re) {
+                    
cb.readLacComplete(getRc(BKException.Code.InterruptedException),
+                            ledgerId, null, null, ctx);
+                }
+            } else {
+                pcbc.readLac(ledgerId, cb, ctx);
+            }
+        }, ledgerId);
     }
 
     public void readEntry(BookieSocketAddress addr, long ledgerId, long 
entryId,
@@ -411,28 +382,20 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
 
     public void readEntry(final BookieSocketAddress addr, final long ledgerId, 
final long entryId,
                           final ReadEntryCallback cb, final Object ctx, int 
flags, byte[] masterKey) {
-        closeLock.readLock().lock();
-        try {
-            final PerChannelBookieClientPool client = lookupClient(addr);
-            if (client == null) {
-                
cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                                     ledgerId, entryId, null, ctx);
-                return;
-            }
-
-            client.obtain(new GenericCallback<PerChannelBookieClient>() {
-                @Override
-                public void operationComplete(final int rc, 
PerChannelBookieClient pcbc) {
-                    if (rc != BKException.Code.OK) {
-                        completeRead(rc, ledgerId, entryId, null, cb, ctx);
-                        return;
-                    }
-                    pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, 
masterKey);
-                }
-            }, ledgerId);
-        } finally {
-            closeLock.readLock().unlock();
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            
cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+                                 ledgerId, entryId, null, ctx);
+            return;
         }
+
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                completeRead(rc, ledgerId, entryId, null, cb, ctx);
+            } else {
+                pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey);
+            }
+        }, ledgerId);
     }
 
 
@@ -444,65 +407,45 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
                                           final boolean piggyBackEntry,
                                           final ReadEntryCallback cb,
                                           final Object ctx) {
-        closeLock.readLock().lock();
-        try {
-            final PerChannelBookieClientPool client = lookupClient(addr);
-            if (client == null) {
-                
completeRead(BKException.Code.BookieHandleNotAvailableException,
-                        ledgerId, entryId, null, cb, ctx);
-                return;
-            }
-
-            client.obtain(new GenericCallback<PerChannelBookieClient>() {
-                @Override
-                public void operationComplete(final int rc, 
PerChannelBookieClient pcbc) {
-
-                    if (rc != BKException.Code.OK) {
-                        completeRead(rc, ledgerId, entryId, null, cb, ctx);
-                        return;
-                    }
-                    pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, 
previousLAC, timeOutInMillis, piggyBackEntry, cb,
-                            ctx);
-                }
-            }, ledgerId);
-        } finally {
-            closeLock.readLock().unlock();
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            completeRead(BKException.Code.BookieHandleNotAvailableException,
+                    ledgerId, entryId, null, cb, ctx);
+            return;
         }
+
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                completeRead(rc, ledgerId, entryId, null, cb, ctx);
+            } else {
+                pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, 
timeOutInMillis, piggyBackEntry, cb,
+                        ctx);
+            }
+        }, ledgerId);
     }
 
     public void getBookieInfo(final BookieSocketAddress addr, final long 
requested, final GetBookieInfoCallback cb,
             final Object ctx) {
-        closeLock.readLock().lock();
-        try {
-            final PerChannelBookieClientPool client = lookupClient(addr);
-            if (client == null) {
-                
cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
 new BookieInfo(),
-                        ctx);
-                return;
-            }
-            client.obtain(new GenericCallback<PerChannelBookieClient>() {
-                @Override
-                public void operationComplete(final int rc, 
PerChannelBookieClient pcbc) {
-                    if (rc != BKException.Code.OK) {
-                        try {
-                            executor.submit(new SafeRunnable() {
-                                @Override
-                                public void safeRun() {
-                                    cb.getBookieInfoComplete(rc, new 
BookieInfo(), ctx);
-                                }
-                            });
-                        } catch (RejectedExecutionException re) {
-                            
cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
-                                    new BookieInfo(), ctx);
-                        }
-                        return;
-                    }
-                    pcbc.getBookieInfo(requested, cb, ctx);
-                }
-            }, requested);
-        } finally {
-            closeLock.readLock().unlock();
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            
cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
 new BookieInfo(),
+                    ctx);
+            return;
         }
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                try {
+                    executor.submit(safeRun(() -> {
+                        cb.getBookieInfoComplete(rc, new BookieInfo(), ctx);
+                    }));
+                } catch (RejectedExecutionException re) {
+                    
cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
+                            new BookieInfo(), ctx);
+                }
+            } else {
+                pcbc.getBookieInfo(requested, cb, ctx);
+            }
+        }, requested);
     }
 
     private void monitorPendingOperations() {

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to