This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 4b446619872ead696f01ea092e590a8a0e582134 Author: Yong Zhang <[email protected]> AuthorDate: Mon Jun 13 15:37:15 2022 +0800 Revert "Testing the memory limit" This reverts commit dab1e981bc1497341f108e55de2dc3c5f1447615. --- .../bookkeeper/conf/ClientConfiguration.java | 22 ------- .../apache/bookkeeper/proto/BookieClientImpl.java | 75 ++-------------------- .../bookkeeper/proto/BookieRequestProcessor.java | 18 ------ .../proto/BookkeeperInternalCallbacks.java | 4 -- .../bookkeeper/proto/PerChannelBookieClient.java | 24 ++----- .../apache/bookkeeper/test/BookieClientTest.java | 16 ----- 6 files changed, 11 insertions(+), 148 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index e900388485..80c2ad4b9d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -198,10 +198,6 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati protected static final String CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING = "clientConnectBookieUnavailableLogThrottling"; - // client memory limit options - protected static final String CLIENT_MEMORY_LIMIT_ENABLED = "clientMemoryLimitEnabled"; - protected static final String CLIENT_MEMORY_LIMIT_BY_BYTES = "clientMemoryLimitByBytes"; - /** * Construct a default client-side configuration. */ @@ -2012,24 +2008,6 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati return getLong(CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING, 5_000L); } - public ClientConfiguration setClientMemoryLimitEnabled(boolean enabled) { - setProperty(CLIENT_MEMORY_LIMIT_ENABLED, enabled); - return this; - } - - public boolean getClientMemoryLimitEnabled() { - return getBoolean(CLIENT_MEMORY_LIMIT_ENABLED, false); - } - - public ClientConfiguration setClientMemoryLimitByBytes(int bytes) { - setProperty(CLIENT_MEMORY_LIMIT_BY_BYTES, bytes); - return this; - } - - public int getClientMemoryLimitByBytes() { - return getInt(CLIENT_MEMORY_LIMIT_BY_BYTES, 64 * 1024 * 1024); - } - @Override protected ClientConfiguration getThis() { return this; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java index b9493f43b2..03e3c068e4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java @@ -38,7 +38,6 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.util.EnumSet; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -53,7 +52,6 @@ import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; import org.apache.bookkeeper.client.api.WriteFlag; -import org.apache.bookkeeper.common.util.MemoryLimitController; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.SafeRunnable; import org.apache.bookkeeper.conf.ClientConfiguration; @@ -67,7 +65,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.tls.SecurityException; @@ -106,7 +103,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac private final BookieAddressResolver bookieAddressResolver; private final long bookieErrorThresholdPerInterval; - private Optional<MemoryLimitController> memoryLimitController; public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup, ByteBufAllocator allocator, @@ -141,16 +137,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac } else { this.timeoutFuture = null; } - - if (conf.getClientMemoryLimitEnabled()) { - memoryLimitController = Optional.of(new MemoryLimitController(conf.getClientMemoryLimitByBytes())); - } else { - memoryLimitController = Optional.empty(); - } - } - - public Optional<MemoryLimitController> getMemoryLimitController() { - return memoryLimitController; } private int getRc(int rc) { @@ -339,33 +325,11 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac // Retain the buffer, since the connection could be obtained after // the PendingApp might have already failed toSend.retain(); - Optional<WriteAndFlushCallback> callback = Optional.empty(); - try { - callback = setMemoryLimit(entryId, toSend.readableBytes()); - } catch (InterruptedException e) { - completeAdd(getRc(BKException.Code.IllegalOpException), ledgerId, entryId, addr, cb, ctx); - LOG.error("Failed to set memory limit when adding entry {}:{}", ledgerId, entryId, e); - return; - } - client.obtain(ChannelReadyForAddEntryCallback.create( - this, toSend, ledgerId, entryId, addr, - ctx, cb, options, masterKey, allowFastFail, writeFlags, callback), - ledgerId); - } - private Optional<WriteAndFlushCallback> setMemoryLimit(final long entryId, final long entrySize) throws InterruptedException { - if (getMemoryLimitController().isPresent()) { - MemoryLimitController mlc = getMemoryLimitController().get(); - mlc.reserveMemory(entrySize); - LOG.debug("Acquire memory size {} for entry {}, current usage {} ", entrySize, entryId, - mlc.currentUsage()); - WriteAndFlushCallbackImpl callback = new WriteAndFlushCallbackImpl(); - callback.setBookieClient(this); - callback.setSize(entrySize); - callback.setEntryId(entryId); - return Optional.of(callback); - } - return Optional.empty(); + client.obtain(ChannelReadyForAddEntryCallback.create( + this, toSend, ledgerId, entryId, addr, + ctx, cb, options, masterKey, allowFastFail, writeFlags), + ledgerId); } @Override @@ -414,31 +378,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac } } - private static class WriteAndFlushCallbackImpl implements WriteAndFlushCallback { - - private BookieClientImpl bookieClient; - private long size; - private long entryId; - - public void setBookieClient(BookieClientImpl bookieClient) { - this.bookieClient = bookieClient; - } - - public void setSize(long size) { - this.size = size; - } - - public void setEntryId(long entryId) { - this.entryId = entryId; - } - - @Override - public void complete() { - bookieClient.getMemoryLimitController().get().releaseMemory(size); - LOG.debug("Release memory size {} for entry {}", size, entryId); - } - } - private static class ChannelReadyForAddEntryCallback implements GenericCallback<PerChannelBookieClient> { private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle; @@ -454,13 +393,12 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac private byte[] masterKey; private boolean allowFastFail; private EnumSet<WriteFlag> writeFlags; - private Optional<WriteAndFlushCallback> writeAndFlushCallback; static ChannelReadyForAddEntryCallback create( BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId, long entryId, BookieId addr, Object ctx, WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail, - EnumSet<WriteFlag> writeFlags, Optional<WriteAndFlushCallback> writeAndFlushCallback) { + EnumSet<WriteFlag> writeFlags) { ChannelReadyForAddEntryCallback callback = RECYCLER.get(); callback.bookieClient = bookieClient; callback.toSend = toSend; @@ -473,7 +411,6 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac callback.masterKey = masterKey; callback.allowFastFail = allowFastFail; callback.writeFlags = writeFlags; - callback.writeAndFlushCallback = writeAndFlushCallback; return callback; } @@ -484,7 +421,7 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx); } else { pcbc.addEntry(ledgerId, masterKey, entryId, - toSend, cb, ctx, options, allowFastFail, writeFlags, writeAndFlushCallback); + toSend, cb, ctx, options, allowFastFail, writeFlags); } toSend.release(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index e36b579942..67f83e9ce5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -298,8 +298,6 @@ public class BookieRequestProcessor implements RequestProcessor { } } - private static final String SLEEP_TIME = System.getenv("SLEEP_TIME_FOR_TESTING"); - @Override public void processRequest(Object msg, Channel c) { // If we can decode this packet as a Request protobuf packet, process @@ -311,14 +309,6 @@ public class BookieRequestProcessor implements RequestProcessor { BookkeeperProtocol.BKPacketHeader header = r.getHeader(); switch (header.getOperation()) { case ADD_ENTRY: - LOG.info("Using v3 protocol to resolve the request, wait for {}", SLEEP_TIME); - int seconds = Integer.parseInt(SLEEP_TIME); - try { - TimeUnit.MILLISECONDS.sleep(seconds); - } catch (InterruptedException e) { - e.printStackTrace(); - } - LOG.info("Continue to process the add entry request"); processAddRequestV3(r, c); break; case READ_ENTRY: @@ -374,14 +364,6 @@ public class BookieRequestProcessor implements RequestProcessor { // process packet switch (r.getOpCode()) { case BookieProtocol.ADDENTRY: - LOG.info("Using v2 protocol to resolve the request, wait for {}", SLEEP_TIME); - int seconds = Integer.parseInt(SLEEP_TIME); - try { - TimeUnit.MILLISECONDS.sleep(seconds); - } catch (InterruptedException e) { - e.printStackTrace(); - } - LOG.info("Continue to process the add entry request"); checkArgument(r instanceof BookieProtocol.ParsedAddRequest); processAddRequest((BookieProtocol.ParsedAddRequest) r, c); break; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java index f6333f6ac5..9904c90aef 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java @@ -80,10 +80,6 @@ public class BookkeeperInternalCallbacks { void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx); } - public interface WriteAndFlushCallback { - void complete(); - } - /** * A last-add-confirmed (LAC) reader callback interface. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 56d93b77dd..12209a636d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -104,7 +104,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.StartTLSCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteAndFlushCallback; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader; @@ -755,8 +754,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { * WriteFlags */ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb, - Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags, - Optional<WriteAndFlushCallback> wfc) { + Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags) { Object request = null; CompletionKey completionKey = null; if (useV2WireProtocol) { @@ -765,7 +763,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { executor.executeOrdered(ledgerId, () -> { cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx); }); - wfc.ifPresent(WriteAndFlushCallback::complete); return; } completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY); @@ -825,11 +822,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { // because we need to release toSend. errorOut(completionKey); toSend.release(); - wfc.ifPresent(WriteAndFlushCallback::complete); return; } else { // addEntry times out on backpressure - writeAndFlush(c, completionKey, request, allowFastFail, wfc); + writeAndFlush(c, completionKey, request, allowFastFail); } } @@ -1111,18 +1107,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { } private void writeAndFlush(final Channel channel, - final CompletionKey key, - final Object request, - final boolean allowFastFail) { - writeAndFlush(channel, key, request, allowFastFail, Optional.empty()); - - } - - private void writeAndFlush(final Channel channel, - final CompletionKey key, - final Object request, - final boolean allowFastFail, - final Optional<WriteAndFlushCallback> wfc) { + final CompletionKey key, + final Object request, + final boolean allowFastFail) { if (channel == null) { LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request)); errorOut(key); @@ -1156,7 +1143,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { } else { nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } - wfc.ifPresent(WriteAndFlushCallback::complete); }); channel.writeAndFlush(request, promise); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 9d3ed9cf0d..944203c3e5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -33,7 +33,6 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -344,19 +343,4 @@ public class BookieClientTest { assertEquals("BookieInfoSuccessCount", expectedBookieInfoSuccessCount, perChannelBookieClientScopeOfThisAddr.getSuccessCount()); } - - @Test - public void testMemoryLimit() throws Exception { - ResultStruct arc = new ResultStruct(); - BookieId addr = bs.getBookieId(); - byte[] pwd = "".getBytes(StandardCharsets.UTF_8); - - ClientConfiguration conf = new ClientConfiguration(); - conf.setClientMemoryLimitEnabled(true); - conf.setClientMemoryLimitByBytes(10); - BookieClient bc = new BookieClientImpl(); - ByteBufList bb = createByteBuffer(1, 1, 1); - System.out.println(bb.readableBytes()); -// bc.addEntry(addr, 1, pwd, 1, ); - } }
