This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.14 by this push:
     new dab1e981bc Testing the memory limit
dab1e981bc is described below

commit dab1e981bc1497341f108e55de2dc3c5f1447615
Author: Yong Zhang <[email protected]>
AuthorDate: Tue Jan 18 14:46:44 2022 +0800

    Testing the memory limit
    
    (cherry picked from commit 741c6635b65d5bef400f205127261c3b8d558aac)
---
 .../bookkeeper/conf/ClientConfiguration.java       | 22 +++++++
 .../apache/bookkeeper/proto/BookieClientImpl.java  | 75 ++++++++++++++++++++--
 .../bookkeeper/proto/BookieRequestProcessor.java   | 18 ++++++
 .../proto/BookkeeperInternalCallbacks.java         |  4 ++
 .../bookkeeper/proto/PerChannelBookieClient.java   | 24 +++++--
 .../apache/bookkeeper/test/BookieClientTest.java   | 16 +++++
 6 files changed, 148 insertions(+), 11 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 80c2ad4b9d..e900388485 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -198,6 +198,10 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
     protected static final String 
CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING =
             "clientConnectBookieUnavailableLogThrottling";
 
+    // client memory limit options
+    protected static final String CLIENT_MEMORY_LIMIT_ENABLED = 
"clientMemoryLimitEnabled";
+    protected static final String CLIENT_MEMORY_LIMIT_BY_BYTES = 
"clientMemoryLimitByBytes";
+
     /**
      * Construct a default client-side configuration.
      */
@@ -2008,6 +2012,24 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
         return getLong(CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING, 
5_000L);
     }
 
+    public ClientConfiguration setClientMemoryLimitEnabled(boolean enabled) {
+        setProperty(CLIENT_MEMORY_LIMIT_ENABLED, enabled);
+        return this;
+    }
+
+    public boolean getClientMemoryLimitEnabled() {
+        return getBoolean(CLIENT_MEMORY_LIMIT_ENABLED, false);
+    }
+
+    public ClientConfiguration setClientMemoryLimitByBytes(int bytes) {
+        setProperty(CLIENT_MEMORY_LIMIT_BY_BYTES, bytes);
+        return this;
+    }
+
+    public int getClientMemoryLimitByBytes() {
+        return getInt(CLIENT_MEMORY_LIMIT_BY_BYTES, 64 * 1024 * 1024);
+    }
+
     @Override
     protected ClientConfiguration getThis() {
         return this;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index 03e3c068e4..b9493f43b2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -38,6 +38,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -52,6 +53,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.MemoryLimitController;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -65,6 +67,7 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
@@ -103,6 +106,7 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
     private final BookieAddressResolver bookieAddressResolver;
 
     private final long bookieErrorThresholdPerInterval;
+    private Optional<MemoryLimitController> memoryLimitController;
 
     public BookieClientImpl(ClientConfiguration conf, EventLoopGroup 
eventLoopGroup,
                             ByteBufAllocator allocator,
@@ -137,6 +141,16 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
         } else {
             this.timeoutFuture = null;
         }
+
+        if (conf.getClientMemoryLimitEnabled()) {
+            memoryLimitController = Optional.of(new 
MemoryLimitController(conf.getClientMemoryLimitByBytes()));
+        } else {
+            memoryLimitController = Optional.empty();
+        }
+    }
+
+    public Optional<MemoryLimitController> getMemoryLimitController() {
+        return memoryLimitController;
     }
 
     private int getRc(int rc) {
@@ -325,11 +339,33 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
         // Retain the buffer, since the connection could be obtained after
         // the PendingApp might have already failed
         toSend.retain();
-
+        Optional<WriteAndFlushCallback> callback = Optional.empty();
+        try {
+            callback = setMemoryLimit(entryId, toSend.readableBytes());
+        } catch (InterruptedException e) {
+            completeAdd(getRc(BKException.Code.IllegalOpException), ledgerId, 
entryId, addr, cb, ctx);
+            LOG.error("Failed to set memory limit when adding entry {}:{}", 
ledgerId, entryId, e);
+            return;
+        }
         client.obtain(ChannelReadyForAddEntryCallback.create(
-                              this, toSend, ledgerId, entryId, addr,
-                                  ctx, cb, options, masterKey, allowFastFail, 
writeFlags),
-                      ledgerId);
+                this, toSend, ledgerId, entryId, addr,
+                ctx, cb, options, masterKey, allowFastFail, writeFlags, 
callback),
+            ledgerId);
+    }
+
+    private Optional<WriteAndFlushCallback> setMemoryLimit(final long entryId, 
final long entrySize) throws InterruptedException {
+        if (getMemoryLimitController().isPresent()) {
+            MemoryLimitController mlc = getMemoryLimitController().get();
+            mlc.reserveMemory(entrySize);
+            LOG.debug("Acquire memory size {} for entry {}, current usage {} 
", entrySize, entryId,
+                mlc.currentUsage());
+            WriteAndFlushCallbackImpl callback = new 
WriteAndFlushCallbackImpl();
+            callback.setBookieClient(this);
+            callback.setSize(entrySize);
+            callback.setEntryId(entryId);
+            return Optional.of(callback);
+        }
+        return Optional.empty();
     }
 
     @Override
@@ -378,6 +414,31 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
         }
     }
 
+    private static class WriteAndFlushCallbackImpl implements 
WriteAndFlushCallback {
+
+        private BookieClientImpl bookieClient;
+        private long size;
+        private long entryId;
+
+        public void setBookieClient(BookieClientImpl bookieClient) {
+            this.bookieClient = bookieClient;
+        }
+
+        public void setSize(long size) {
+            this.size = size;
+        }
+
+        public void setEntryId(long entryId) {
+            this.entryId = entryId;
+        }
+
+        @Override
+        public void complete() {
+            bookieClient.getMemoryLimitController().get().releaseMemory(size);
+            LOG.debug("Release memory size {} for entry {}", size, entryId);
+        }
+    }
+
     private static class ChannelReadyForAddEntryCallback
         implements GenericCallback<PerChannelBookieClient> {
         private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
@@ -393,12 +454,13 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
         private byte[] masterKey;
         private boolean allowFastFail;
         private EnumSet<WriteFlag> writeFlags;
+        private Optional<WriteAndFlushCallback> writeAndFlushCallback;
 
         static ChannelReadyForAddEntryCallback create(
                 BookieClientImpl bookieClient, ByteBufList toSend, long 
ledgerId,
                 long entryId, BookieId addr, Object ctx,
                 WriteCallback cb, int options, byte[] masterKey, boolean 
allowFastFail,
-                EnumSet<WriteFlag> writeFlags) {
+                EnumSet<WriteFlag> writeFlags, Optional<WriteAndFlushCallback> 
writeAndFlushCallback) {
             ChannelReadyForAddEntryCallback callback = RECYCLER.get();
             callback.bookieClient = bookieClient;
             callback.toSend = toSend;
@@ -411,6 +473,7 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
             callback.masterKey = masterKey;
             callback.allowFastFail = allowFastFail;
             callback.writeFlags = writeFlags;
+            callback.writeAndFlushCallback = writeAndFlushCallback;
             return callback;
         }
 
@@ -421,7 +484,7 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
                 bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
             } else {
                 pcbc.addEntry(ledgerId, masterKey, entryId,
-                              toSend, cb, ctx, options, allowFastFail, 
writeFlags);
+                    toSend, cb, ctx, options, allowFastFail, writeFlags, 
writeAndFlushCallback);
             }
 
             toSend.release();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 67f83e9ce5..e36b579942 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -298,6 +298,8 @@ public class BookieRequestProcessor implements 
RequestProcessor {
         }
     }
 
+    private static final String SLEEP_TIME = 
System.getenv("SLEEP_TIME_FOR_TESTING");
+
     @Override
     public void processRequest(Object msg, Channel c) {
         // If we can decode this packet as a Request protobuf packet, process
@@ -309,6 +311,14 @@ public class BookieRequestProcessor implements 
RequestProcessor {
                 BookkeeperProtocol.BKPacketHeader header = r.getHeader();
                 switch (header.getOperation()) {
                     case ADD_ENTRY:
+                        LOG.info("Using v3 protocol to resolve the request, 
wait for {}", SLEEP_TIME);
+                        int seconds = Integer.parseInt(SLEEP_TIME);
+                        try {
+                            TimeUnit.MILLISECONDS.sleep(seconds);
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                        LOG.info("Continue to process the add entry request");
                         processAddRequestV3(r, c);
                         break;
                     case READ_ENTRY:
@@ -364,6 +374,14 @@ public class BookieRequestProcessor implements 
RequestProcessor {
             // process packet
             switch (r.getOpCode()) {
                 case BookieProtocol.ADDENTRY:
+                    LOG.info("Using v2 protocol to resolve the request, wait 
for {}", SLEEP_TIME);
+                    int seconds = Integer.parseInt(SLEEP_TIME);
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(seconds);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    LOG.info("Continue to process the add entry request");
                     checkArgument(r instanceof 
BookieProtocol.ParsedAddRequest);
                     processAddRequest((BookieProtocol.ParsedAddRequest) r, c);
                     break;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index 9904c90aef..f6333f6ac5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -80,6 +80,10 @@ public class BookkeeperInternalCallbacks {
         void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, 
Object ctx);
     }
 
+    public interface WriteAndFlushCallback {
+        void complete();
+    }
+
     /**
      * A last-add-confirmed (LAC) reader callback interface.
      */
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 12209a636d..56d93b77dd 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -104,6 +104,7 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.StartTLSCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -754,7 +755,8 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
      *          WriteFlags
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, 
ByteBufList toSend, WriteCallback cb,
-                  Object ctx, final int options, boolean allowFastFail, final 
EnumSet<WriteFlag> writeFlags) {
+                  Object ctx, final int options, boolean allowFastFail, final 
EnumSet<WriteFlag> writeFlags,
+                  Optional<WriteAndFlushCallback> wfc) {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
@@ -763,6 +765,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                 executor.executeOrdered(ledgerId, () -> {
                     cb.writeComplete(BKException.Code.IllegalOpException, 
ledgerId, entryId, bookieId, ctx);
                 });
+                wfc.ifPresent(WriteAndFlushCallback::complete);
                 return;
             }
             completionKey = acquireV2Key(ledgerId, entryId, 
OperationType.ADD_ENTRY);
@@ -822,10 +825,11 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             // because we need to release toSend.
             errorOut(completionKey);
             toSend.release();
+            wfc.ifPresent(WriteAndFlushCallback::complete);
             return;
         } else {
             // addEntry times out on backpressure
-            writeAndFlush(c, completionKey, request, allowFastFail);
+            writeAndFlush(c, completionKey, request, allowFastFail, wfc);
         }
     }
 
@@ -1107,9 +1111,18 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
     }
 
     private void writeAndFlush(final Channel channel,
-                           final CompletionKey key,
-                           final Object request,
-                           final boolean allowFastFail) {
+                               final CompletionKey key,
+                               final Object request,
+                               final boolean allowFastFail) {
+        writeAndFlush(channel, key, request, allowFastFail, Optional.empty());
+
+    }
+
+    private void writeAndFlush(final Channel channel,
+                               final CompletionKey key,
+                               final Object request,
+                               final boolean allowFastFail,
+                               final Optional<WriteAndFlushCallback> wfc) {
         if (channel == null) {
             LOG.warn("Operation {} failed: channel == null", 
StringUtils.requestToString(request));
             errorOut(key);
@@ -1143,6 +1156,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                 } else {
                     
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), 
TimeUnit.NANOSECONDS);
                 }
+                wfc.ifPresent(WriteAndFlushCallback::complete);
             });
 
             channel.writeAndFlush(request, promise);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 944203c3e5..9d3ed9cf0d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -33,6 +33,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -343,4 +344,19 @@ public class BookieClientTest {
         assertEquals("BookieInfoSuccessCount", expectedBookieInfoSuccessCount,
                 perChannelBookieClientScopeOfThisAddr.getSuccessCount());
     }
+
+    @Test
+    public void testMemoryLimit() throws Exception {
+        ResultStruct arc = new ResultStruct();
+        BookieId addr = bs.getBookieId();
+        byte[] pwd = "".getBytes(StandardCharsets.UTF_8);
+
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setClientMemoryLimitEnabled(true);
+        conf.setClientMemoryLimitByBytes(10);
+        BookieClient bc = new BookieClientImpl();
+        ByteBufList bb = createByteBuffer(1, 1, 1);
+        System.out.println(bb.readableBytes());
+//        bc.addEntry(addr, 1, pwd, 1, );
+    }
 }

Reply via email to