eolivelli commented on code in PR #3139:
URL: https://github.com/apache/bookkeeper/pull/3139#discussion_r895119048


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java:
##########
@@ -200,6 +200,10 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
     protected static final String 
CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING =
             "clientConnectBookieUnavailableLogThrottling";
 
+    // client memory limit options
+    protected static final String CLIENT_MEMORY_LIMIT_ENABLED = 
"clientMemoryLimitEnabled";

Review Comment:
   It looks like this is only for writes.
   We should reflect this in the name



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java:
##########
@@ -325,11 +339,33 @@ public void addEntry(final BookieId addr,
         // Retain the buffer, since the connection could be obtained after
         // the PendingApp might have already failed
         toSend.retain();
-
+        Optional<WriteAndFlushCallback> callback = Optional.empty();
+        try {
+            callback = setMemoryLimit(entryId, toSend.readableBytes());
+        } catch (InterruptedException e) {
+            completeAdd(getRc(BKException.Code.IllegalOpException), ledgerId, 
entryId, addr, cb, ctx);
+            LOG.error("Failed to set memory limit when adding entry {}:{}", 
ledgerId, entryId, e);
+            return;
+        }
         client.obtain(ChannelReadyForAddEntryCallback.create(
-                              this, toSend, ledgerId, entryId, addr,
-                                  ctx, cb, options, masterKey, allowFastFail, 
writeFlags),
-                      ledgerId);
+                this, toSend, ledgerId, entryId, addr,
+                ctx, cb, options, masterKey, allowFastFail, writeFlags, 
callback),
+            ledgerId);
+    }
+
+    private Optional<WriteAndFlushCallback> setMemoryLimit(final long entryId, 
final long entrySize) throws InterruptedException {
+        if (getMemoryLimitController().isPresent()) {
+            MemoryLimitController mlc = getMemoryLimitController().get();
+            mlc.reserveMemory(entrySize);

Review Comment:
   We should handle this with a timeout otherwise we may block forever?  



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java:
##########
@@ -378,6 +414,31 @@ public void safeRun() {
         }
     }
 
+    private static class WriteAndFlushCallbackImpl implements 
WriteAndFlushCallback {

Review Comment:
   This should be probably recycled



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java:
##########
@@ -325,11 +339,33 @@ public void addEntry(final BookieId addr,
         // Retain the buffer, since the connection could be obtained after
         // the PendingApp might have already failed
         toSend.retain();
-
+        Optional<WriteAndFlushCallback> callback = Optional.empty();
+        try {
+            callback = setMemoryLimit(entryId, toSend.readableBytes());

Review Comment:
   It seems to me that here it is to late to do the memory accounting as the 
buffer is already allocated we should move this before the allocation of the 
buffer that is going to be retained.
   Also, we should really make it clear which is the memory that we are 
counting with this counter.
   



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java:
##########
@@ -103,6 +106,7 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
     private final BookieAddressResolver bookieAddressResolver;
 
     private final long bookieErrorThresholdPerInterval;
+    private Optional<MemoryLimitController> memoryLimitController;

Review Comment:
   final?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to