This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.14 by this push:
new dab1e981bc Testing the memory limit
dab1e981bc is described below
commit dab1e981bc1497341f108e55de2dc3c5f1447615
Author: Yong Zhang <[email protected]>
AuthorDate: Tue Jan 18 14:46:44 2022 +0800
Testing the memory limit
(cherry picked from commit 741c6635b65d5bef400f205127261c3b8d558aac)
---
.../bookkeeper/conf/ClientConfiguration.java | 22 +++++++
.../apache/bookkeeper/proto/BookieClientImpl.java | 75 ++++++++++++++++++++--
.../bookkeeper/proto/BookieRequestProcessor.java | 18 ++++++
.../proto/BookkeeperInternalCallbacks.java | 4 ++
.../bookkeeper/proto/PerChannelBookieClient.java | 24 +++++--
.../apache/bookkeeper/test/BookieClientTest.java | 16 +++++
6 files changed, 148 insertions(+), 11 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 80c2ad4b9d..e900388485 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -198,6 +198,10 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
protected static final String
CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING =
"clientConnectBookieUnavailableLogThrottling";
+ // client memory limit options
+ protected static final String CLIENT_MEMORY_LIMIT_ENABLED =
"clientMemoryLimitEnabled";
+ protected static final String CLIENT_MEMORY_LIMIT_BY_BYTES =
"clientMemoryLimitByBytes";
+
/**
* Construct a default client-side configuration.
*/
@@ -2008,6 +2012,24 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
return getLong(CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING,
5_000L);
}
+ public ClientConfiguration setClientMemoryLimitEnabled(boolean enabled) {
+ setProperty(CLIENT_MEMORY_LIMIT_ENABLED, enabled);
+ return this;
+ }
+
+ public boolean getClientMemoryLimitEnabled() {
+ return getBoolean(CLIENT_MEMORY_LIMIT_ENABLED, false);
+ }
+
+ public ClientConfiguration setClientMemoryLimitByBytes(int bytes) {
+ setProperty(CLIENT_MEMORY_LIMIT_BY_BYTES, bytes);
+ return this;
+ }
+
+ public int getClientMemoryLimitByBytes() {
+ return getInt(CLIENT_MEMORY_LIMIT_BY_BYTES, 64 * 1024 * 1024);
+ }
+
@Override
protected ClientConfiguration getThis() {
return this;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index 03e3c068e4..b9493f43b2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -38,6 +38,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -52,6 +53,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.MemoryLimitController;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -65,6 +67,7 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
@@ -103,6 +106,7 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
private final BookieAddressResolver bookieAddressResolver;
private final long bookieErrorThresholdPerInterval;
+ private Optional<MemoryLimitController> memoryLimitController;
public BookieClientImpl(ClientConfiguration conf, EventLoopGroup
eventLoopGroup,
ByteBufAllocator allocator,
@@ -137,6 +141,16 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
} else {
this.timeoutFuture = null;
}
+
+ if (conf.getClientMemoryLimitEnabled()) {
+ memoryLimitController = Optional.of(new
MemoryLimitController(conf.getClientMemoryLimitByBytes()));
+ } else {
+ memoryLimitController = Optional.empty();
+ }
+ }
+
+ public Optional<MemoryLimitController> getMemoryLimitController() {
+ return memoryLimitController;
}
private int getRc(int rc) {
@@ -325,11 +339,33 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
// Retain the buffer, since the connection could be obtained after
// the PendingApp might have already failed
toSend.retain();
-
+ Optional<WriteAndFlushCallback> callback = Optional.empty();
+ try {
+ callback = setMemoryLimit(entryId, toSend.readableBytes());
+ } catch (InterruptedException e) {
+ completeAdd(getRc(BKException.Code.IllegalOpException), ledgerId,
entryId, addr, cb, ctx);
+ LOG.error("Failed to set memory limit when adding entry {}:{}",
ledgerId, entryId, e);
+ return;
+ }
client.obtain(ChannelReadyForAddEntryCallback.create(
- this, toSend, ledgerId, entryId, addr,
- ctx, cb, options, masterKey, allowFastFail,
writeFlags),
- ledgerId);
+ this, toSend, ledgerId, entryId, addr,
+ ctx, cb, options, masterKey, allowFastFail, writeFlags,
callback),
+ ledgerId);
+ }
+
+ private Optional<WriteAndFlushCallback> setMemoryLimit(final long entryId,
final long entrySize) throws InterruptedException {
+ if (getMemoryLimitController().isPresent()) {
+ MemoryLimitController mlc = getMemoryLimitController().get();
+ mlc.reserveMemory(entrySize);
+ LOG.debug("Acquire memory size {} for entry {}, current usage {}
", entrySize, entryId,
+ mlc.currentUsage());
+ WriteAndFlushCallbackImpl callback = new
WriteAndFlushCallbackImpl();
+ callback.setBookieClient(this);
+ callback.setSize(entrySize);
+ callback.setEntryId(entryId);
+ return Optional.of(callback);
+ }
+ return Optional.empty();
}
@Override
@@ -378,6 +414,31 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
}
}
+ private static class WriteAndFlushCallbackImpl implements
WriteAndFlushCallback {
+
+ private BookieClientImpl bookieClient;
+ private long size;
+ private long entryId;
+
+ public void setBookieClient(BookieClientImpl bookieClient) {
+ this.bookieClient = bookieClient;
+ }
+
+ public void setSize(long size) {
+ this.size = size;
+ }
+
+ public void setEntryId(long entryId) {
+ this.entryId = entryId;
+ }
+
+ @Override
+ public void complete() {
+ bookieClient.getMemoryLimitController().get().releaseMemory(size);
+ LOG.debug("Release memory size {} for entry {}", size, entryId);
+ }
+ }
+
private static class ChannelReadyForAddEntryCallback
implements GenericCallback<PerChannelBookieClient> {
private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
@@ -393,12 +454,13 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
private byte[] masterKey;
private boolean allowFastFail;
private EnumSet<WriteFlag> writeFlags;
+ private Optional<WriteAndFlushCallback> writeAndFlushCallback;
static ChannelReadyForAddEntryCallback create(
BookieClientImpl bookieClient, ByteBufList toSend, long
ledgerId,
long entryId, BookieId addr, Object ctx,
WriteCallback cb, int options, byte[] masterKey, boolean
allowFastFail,
- EnumSet<WriteFlag> writeFlags) {
+ EnumSet<WriteFlag> writeFlags, Optional<WriteAndFlushCallback>
writeAndFlushCallback) {
ChannelReadyForAddEntryCallback callback = RECYCLER.get();
callback.bookieClient = bookieClient;
callback.toSend = toSend;
@@ -411,6 +473,7 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
callback.masterKey = masterKey;
callback.allowFastFail = allowFastFail;
callback.writeFlags = writeFlags;
+ callback.writeAndFlushCallback = writeAndFlushCallback;
return callback;
}
@@ -421,7 +484,7 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
} else {
pcbc.addEntry(ledgerId, masterKey, entryId,
- toSend, cb, ctx, options, allowFastFail,
writeFlags);
+ toSend, cb, ctx, options, allowFastFail, writeFlags,
writeAndFlushCallback);
}
toSend.release();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 67f83e9ce5..e36b579942 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -298,6 +298,8 @@ public class BookieRequestProcessor implements
RequestProcessor {
}
}
+ private static final String SLEEP_TIME =
System.getenv("SLEEP_TIME_FOR_TESTING");
+
@Override
public void processRequest(Object msg, Channel c) {
// If we can decode this packet as a Request protobuf packet, process
@@ -309,6 +311,14 @@ public class BookieRequestProcessor implements
RequestProcessor {
BookkeeperProtocol.BKPacketHeader header = r.getHeader();
switch (header.getOperation()) {
case ADD_ENTRY:
+ LOG.info("Using v3 protocol to resolve the request,
wait for {}", SLEEP_TIME);
+ int seconds = Integer.parseInt(SLEEP_TIME);
+ try {
+ TimeUnit.MILLISECONDS.sleep(seconds);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ LOG.info("Continue to process the add entry request");
processAddRequestV3(r, c);
break;
case READ_ENTRY:
@@ -364,6 +374,14 @@ public class BookieRequestProcessor implements
RequestProcessor {
// process packet
switch (r.getOpCode()) {
case BookieProtocol.ADDENTRY:
+ LOG.info("Using v2 protocol to resolve the request, wait
for {}", SLEEP_TIME);
+ int seconds = Integer.parseInt(SLEEP_TIME);
+ try {
+ TimeUnit.MILLISECONDS.sleep(seconds);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ LOG.info("Continue to process the add entry request");
checkArgument(r instanceof
BookieProtocol.ParsedAddRequest);
processAddRequest((BookieProtocol.ParsedAddRequest) r, c);
break;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index 9904c90aef..f6333f6ac5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -80,6 +80,10 @@ public class BookkeeperInternalCallbacks {
void writeComplete(int rc, long ledgerId, long entryId, BookieId addr,
Object ctx);
}
+ public interface WriteAndFlushCallback {
+ void complete();
+ }
+
/**
* A last-add-confirmed (LAC) reader callback interface.
*/
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 12209a636d..56d93b77dd 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -104,6 +104,7 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.StartTLSCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -754,7 +755,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
* WriteFlags
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId,
ByteBufList toSend, WriteCallback cb,
- Object ctx, final int options, boolean allowFastFail, final
EnumSet<WriteFlag> writeFlags) {
+ Object ctx, final int options, boolean allowFastFail, final
EnumSet<WriteFlag> writeFlags,
+ Optional<WriteAndFlushCallback> wfc) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
@@ -763,6 +765,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
executor.executeOrdered(ledgerId, () -> {
cb.writeComplete(BKException.Code.IllegalOpException,
ledgerId, entryId, bookieId, ctx);
});
+ wfc.ifPresent(WriteAndFlushCallback::complete);
return;
}
completionKey = acquireV2Key(ledgerId, entryId,
OperationType.ADD_ENTRY);
@@ -822,10 +825,11 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
// because we need to release toSend.
errorOut(completionKey);
toSend.release();
+ wfc.ifPresent(WriteAndFlushCallback::complete);
return;
} else {
// addEntry times out on backpressure
- writeAndFlush(c, completionKey, request, allowFastFail);
+ writeAndFlush(c, completionKey, request, allowFastFail, wfc);
}
}
@@ -1107,9 +1111,18 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
private void writeAndFlush(final Channel channel,
- final CompletionKey key,
- final Object request,
- final boolean allowFastFail) {
+ final CompletionKey key,
+ final Object request,
+ final boolean allowFastFail) {
+ writeAndFlush(channel, key, request, allowFastFail, Optional.empty());
+
+ }
+
+ private void writeAndFlush(final Channel channel,
+ final CompletionKey key,
+ final Object request,
+ final boolean allowFastFail,
+ final Optional<WriteAndFlushCallback> wfc) {
if (channel == null) {
LOG.warn("Operation {} failed: channel == null",
StringUtils.requestToString(request));
errorOut(key);
@@ -1143,6 +1156,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime),
TimeUnit.NANOSECONDS);
}
+ wfc.ifPresent(WriteAndFlushCallback::complete);
});
channel.writeAndFlush(request, promise);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 944203c3e5..9d3ed9cf0d 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -33,6 +33,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@@ -343,4 +344,19 @@ public class BookieClientTest {
assertEquals("BookieInfoSuccessCount", expectedBookieInfoSuccessCount,
perChannelBookieClientScopeOfThisAddr.getSuccessCount());
}
+
+ @Test
+ public void testMemoryLimit() throws Exception {
+ ResultStruct arc = new ResultStruct();
+ BookieId addr = bs.getBookieId();
+ byte[] pwd = "".getBytes(StandardCharsets.UTF_8);
+
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setClientMemoryLimitEnabled(true);
+ conf.setClientMemoryLimitByBytes(10);
+ BookieClient bc = new BookieClientImpl();
+ ByteBufList bb = createByteBuffer(1, 1, 1);
+ System.out.println(bb.readableBytes());
+// bc.addEntry(addr, 1, pwd, 1, );
+ }
}