This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 0b0504fed0 [BP-62] Bookie server introduces the
BatchedReadEntryProcessor to handle batch read request (#4187)
0b0504fed0 is described below
commit 0b0504fed09fccfb4e71c19ea29d97b8bcb6a416
Author: Yan Zhao <[email protected]>
AuthorDate: Wed Jan 24 10:59:24 2024 +0800
[BP-62] Bookie server introduces the BatchedReadEntryProcessor to handle
batch read request (#4187)
Descriptions of the changes in this PR:
This is the second PR for the batch read feature
(https://github.com/apache/bookkeeper/pull/4051).
The bookie server introduces the BatchedReadEntryProcessor to handle batch read
requests.
---
.../bookkeeper/conf/ServerConfiguration.java | 23 +++
.../proto/BatchedReadEntryProcessor.java | 109 ++++++++++
.../apache/bookkeeper/proto/BookieProtocol.java | 2 +-
.../bookkeeper/proto/BookieRequestProcessor.java | 7 +-
.../bookkeeper/proto/ReadEntryProcessor.java | 35 +++-
.../apache/bookkeeper/proto/ResponseBuilder.java | 14 +-
.../proto/BatchedReadEntryProcessorTest.java | 224 +++++++++++++++++++++
7 files changed, 399 insertions(+), 15 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 454069a04d..5b12a8f9e4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -344,6 +344,9 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD =
"skipReplayJournalInvalidRecord";
+ protected static final String MAX_BATCH_READ_SIZE = "maxBatchReadSize";
+ protected static final int DEFAULT_MAX_BATCH_READ_SIZE = 5 * 1024 * 1024;
// 5MB
+
/**
* Construct a default configuration object.
*/
@@ -4126,4 +4129,24 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
public int getMaxOperationNumbersInSingleRocksDBBatch() {
return getInt(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH,
100000);
}
+
+ /**
+ * Set the max batch read size.
+ *
+ * <p>The value is stored under the {@code maxBatchReadSize} property.
+ *
+ * @param maxBatchReadSize
+ *          the new max batch read size (presumably in bytes, the default being
+ *          documented as 5MB)
+ * @return this configuration object, so that setter calls can be chained
+ */
+ public ServerConfiguration setMaxBatchReadSize(long maxBatchReadSize) {
+ this.setProperty(MAX_BATCH_READ_SIZE, maxBatchReadSize);
+ return this;
+ }
+
+ /**
+ * Get the max batch read size.
+ *
+ * @return the value of the {@code maxBatchReadSize} property, with
+ *         {@code DEFAULT_MAX_BATCH_READ_SIZE} (5 * 1024 * 1024, i.e. 5MB) passed as
+ *         the default
+ */
+ public long getMaxBatchReadSize() {
+ return this.getLong(MAX_BATCH_READ_SIZE, DEFAULT_MAX_BATCH_READ_SIZE);
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
new file mode 100644
index 0000000000..700952042f
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCounted;
+import java.util.concurrent.ExecutorService;
+import org.apache.bookkeeper.proto.BookieProtocol.BatchedReadRequest;
+import org.apache.bookkeeper.util.ByteBufList;
+
+/**
+ * Handles a {@link BatchedReadRequest} by reading consecutive entries, starting at the
+ * requested entry id, and answering with a single batched read response.
+ *
+ * <p>Only the way data is read and the way the response is built differ from
+ * {@link ReadEntryProcessor}; the rest of the processing is inherited.
+ */
+public class BatchedReadEntryProcessor extends ReadEntryProcessor {
+
+    /** Server-side cap on the response frame size; combined with the limit carried by the request. */
+    private long maxBatchReadSize;
+
+    /**
+     * Obtain a processor from the recycler and initialize it for the given request.
+     *
+     * @param request the batched read request to serve
+     * @param requestHandler handler whose channel the request arrived on
+     * @param requestProcessor owning request processor; it is notified that a read request started
+     * @param fenceThreadPool executor kept for the inherited fence handling, may be {@code null}
+     * @param throttleReadResponses flag kept for the inherited response handling
+     * @param maxBatchReadSize server-side cap on the size of the batched response
+     * @return the initialized processor
+     */
+    public static BatchedReadEntryProcessor create(BatchedReadRequest request,
+                                                   BookieRequestHandler requestHandler,
+                                                   BookieRequestProcessor requestProcessor,
+                                                   ExecutorService fenceThreadPool,
+                                                   boolean throttleReadResponses,
+                                                   long maxBatchReadSize) {
+        BatchedReadEntryProcessor rep = RECYCLER.get();
+        rep.init(request, requestHandler, requestProcessor);
+        rep.fenceThreadPool = fenceThreadPool;
+        rep.throttleReadResponses = throttleReadResponses;
+        rep.maxBatchReadSize = maxBatchReadSize;
+        requestProcessor.onReadRequestStart(requestHandler.ctx().channel());
+        return rep;
+    }
+
+    /**
+     * Read up to {@code maxCount} consecutive entries, beginning at the requested entry id.
+     *
+     * <p>The first entry is always returned, whatever its size. Every further entry is added only
+     * while the accumulated frame size stays within the smaller of the request's max size and the
+     * server-side {@code maxBatchReadSize}. A non-positive max count means "no count limit".
+     *
+     * @return the list of entries that were read
+     * @throws Exception if the very first entry cannot be read; a failure on a later entry only
+     *         ends the batch and the entries read so far are returned
+     */
+    @Override
+    protected ReferenceCounted readData() throws Exception {
+        ByteBufList data = null;
+        BatchedReadRequest batchRequest = (BatchedReadRequest) request;
+        int maxCount = batchRequest.getMaxCount();
+        if (maxCount <= 0) {
+            maxCount = Integer.MAX_VALUE;
+        }
+        long maxSize = Math.min(batchRequest.getMaxSize(), maxBatchReadSize);
+        // See BookieProtoEncoding.ResponseEnDeCoderPreV3#encode on BatchedReadResponse case.
+        long frameSize = 24 + 8 + 4;
+        for (int i = 0; i < maxCount; i++) {
+            try {
+                ByteBuf entry = requestProcessor.getBookie()
+                        .readEntry(request.getLedgerId(), request.getEntryId() + i);
+                // Each entry adds its payload plus 4 extra bytes to the frame.
+                frameSize += entry.readableBytes() + 4;
+                if (data == null) {
+                    // The first entry is accepted without checking the size limit.
+                    data = ByteBufList.get(entry);
+                } else if (frameSize > maxSize) {
+                    // This entry would exceed the limit: drop it and end the batch.
+                    entry.release();
+                    break;
+                } else {
+                    data.add(entry);
+                }
+            } catch (Throwable e) {
+                // Nothing read yet: report the failure to the caller.
+                if (data == null) {
+                    throw e;
+                }
+                // Otherwise return what has been read so far.
+                break;
+            }
+        }
+        return data;
+    }
+
+    /**
+     * Build the successful response for the entries returned by {@link #readData()}.
+     *
+     * @param data the {@link ByteBufList} produced by {@link #readData()}
+     * @return the batched read response
+     */
+    @Override
+    protected BookieProtocol.Response buildReadResponse(ReferenceCounted data) {
+        return ResponseBuilder.buildBatchedReadResponse((ByteBufList) data, (BatchedReadRequest) request);
+    }
+
+    /**
+     * Recycle the request, reset the inherited state and hand this instance back to its recycler.
+     */
+    @Override
+    protected void recycle() {
+        request.recycle();
+        super.reset();
+        if (this.recyclerHandle != null) {
+            this.recyclerHandle.recycle(this);
+        }
+    }
+
+    private final Recycler.Handle<BatchedReadEntryProcessor> recyclerHandle;
+
+    private BatchedReadEntryProcessor(Recycler.Handle<BatchedReadEntryProcessor> recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    private static final Recycler<BatchedReadEntryProcessor> RECYCLER =
+            new Recycler<BatchedReadEntryProcessor>() {
+                @Override
+                protected BatchedReadEntryProcessor newObject(Recycler.Handle<BatchedReadEntryProcessor> handle) {
+                    return new BatchedReadEntryProcessor(handle);
+                }
+            };
+
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 68b8d61988..b3243a992c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -330,7 +330,7 @@ public interface BookieProtocol {
private final Handle<ReadRequest> recyclerHandle;
- private ReadRequest() {
+ protected ReadRequest() {
recyclerHandle = null;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index a77b3d7bb5..ed2fe58760 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -677,8 +677,11 @@ public class BookieRequestProcessor implements
RequestProcessor {
private void processReadRequest(final BookieProtocol.ReadRequest r, final
BookieRequestHandler requestHandler) {
ExecutorService fenceThreadPool =
null == highPriorityThreadPool ? null :
highPriorityThreadPool.chooseThread(requestHandler.ctx());
- ReadEntryProcessor read = ReadEntryProcessor.create(r, requestHandler,
- this, fenceThreadPool, throttleReadResponses);
+ ReadEntryProcessor read = r instanceof
BookieProtocol.BatchedReadRequest
+ ?
BatchedReadEntryProcessor.create((BookieProtocol.BatchedReadRequest) r,
requestHandler,
+ this, fenceThreadPool, throttleReadResponses,
serverCfg.getMaxBatchReadSize())
+ : ReadEntryProcessor.create(r, requestHandler,
+ this, fenceThreadPool, throttleReadResponses);
// If it's a high priority read (fencing or as part of recovery
process), we want to make sure it
// gets executed as fast as possible, so bypass the normal
readThreadPool
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 04efd9634b..d321623c54 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -39,8 +40,8 @@ import org.slf4j.LoggerFactory;
class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
private static final Logger LOG =
LoggerFactory.getLogger(ReadEntryProcessor.class);
- private ExecutorService fenceThreadPool;
- private boolean throttleReadResponses;
+ protected ExecutorService fenceThreadPool;
+ protected boolean throttleReadResponses;
public static ReadEntryProcessor create(ReadRequest request,
BookieRequestHandler
requestHandler,
@@ -70,7 +71,7 @@ class ReadEntryProcessor extends
PacketProcessorBase<ReadRequest> {
}
int errorCode = BookieProtocol.EOK;
long startTimeNanos = MathUtils.nowInNano();
- ByteBuf data = null;
+ ReferenceCounted data = null;
try {
CompletableFuture<Boolean> fenceResult = null;
if (request.isFencing()) {
@@ -85,9 +86,9 @@ class ReadEntryProcessor extends
PacketProcessorBase<ReadRequest> {
throw
BookieException.create(BookieException.Code.UnauthorizedAccessException);
}
}
- data =
requestProcessor.getBookie().readEntry(request.getLedgerId(),
request.getEntryId());
+ data = readData();
if (LOG.isDebugEnabled()) {
- LOG.debug("##### Read entry ##### {} -- ref-count: {}",
data.readableBytes(), data.refCnt());
+ LOG.debug("##### Read entry ##### -- ref-count: {}",
data.refCnt());
}
if (fenceResult != null) {
handleReadResultForFenceRead(fenceResult, data,
startTimeNanos);
@@ -126,13 +127,17 @@ class ReadEntryProcessor extends
PacketProcessorBase<ReadRequest> {
sendResponse(data, errorCode, startTimeNanos);
}
- private void sendResponse(ByteBuf data, int errorCode, long
startTimeNanos) {
+    /**
+     * Read the data to send back for the current request. This implementation reads the single
+     * entry identified by the request's ledger id and entry id.
+     */
+    protected ReferenceCounted readData() throws Exception {
+        final long ledgerId = request.getLedgerId();
+        final long entryId = request.getEntryId();
+        return requestProcessor.getBookie().readEntry(ledgerId, entryId);
+    }
+
+ private void sendResponse(ReferenceCounted data, int errorCode, long
startTimeNanos) {
final RequestStats stats = requestProcessor.getRequestStats();
final OpStatsLogger logger = stats.getReadEntryStats();
BookieProtocol.Response response;
if (errorCode == BookieProtocol.EOK) {
logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
- response = ResponseBuilder.buildReadResponse(data, request);
+ response = buildReadResponse(data);
} else {
if (data != null) {
ReferenceCountUtil.release(data);
@@ -145,13 +150,17 @@ class ReadEntryProcessor extends
PacketProcessorBase<ReadRequest> {
recycle();
}
- private void sendFenceResponse(Boolean result, ByteBuf data, long
startTimeNanos) {
+    /**
+     * Build the successful response for the data returned by {@code readData()}; this
+     * implementation treats the data as a single {@link ByteBuf} entry.
+     */
+    protected BookieProtocol.Response buildReadResponse(ReferenceCounted data) {
+        final ByteBuf entry = (ByteBuf) data;
+        return ResponseBuilder.buildReadResponse(entry, request);
+    }
+
+ private void sendFenceResponse(Boolean result, ReferenceCounted data, long
startTimeNanos) {
final int retCode = result != null && result ? BookieProtocol.EOK :
BookieProtocol.EIO;
sendResponse(data, retCode, startTimeNanos);
}
private void handleReadResultForFenceRead(CompletableFuture<Boolean>
fenceResult,
- ByteBuf data,
+ ReferenceCounted data,
long startTimeNanos) {
if (null != fenceThreadPool) {
fenceResult.whenCompleteAsync(new FutureEventListener<Boolean>() {
@@ -192,7 +201,9 @@ class ReadEntryProcessor extends
PacketProcessorBase<ReadRequest> {
void recycle() {
request.recycle();
super.reset();
- this.recyclerHandle.recycle(this);
+ if (this.recyclerHandle != null) {
+ this.recyclerHandle.recycle(this);
+ }
}
private final Recycler.Handle<ReadEntryProcessor> recyclerHandle;
@@ -201,6 +212,10 @@ class ReadEntryProcessor extends
PacketProcessorBase<ReadRequest> {
this.recyclerHandle = recyclerHandle;
}
+ // Constructor for subclasses: no recycler handle is attached to the instance, which is
+ // why recycle() checks recyclerHandle for null before using it.
+ protected ReadEntryProcessor() {
+ this.recyclerHandle = null;
+ }
+
private static final Recycler<ReadEntryProcessor> RECYCLER = new
Recycler<ReadEntryProcessor>() {
@Override
protected ReadEntryProcessor
newObject(Recycler.Handle<ReadEntryProcessor> handle) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
index 563c0a1352..5d010ec6dd 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
@@ -21,16 +21,21 @@
package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.util.ByteBufList;
class ResponseBuilder {
static BookieProtocol.Response buildErrorResponse(int errorCode,
BookieProtocol.Request r) {
if (r.getOpCode() == BookieProtocol.ADDENTRY) {
return BookieProtocol.AddResponse.create(r.getProtocolVersion(),
errorCode,
r.getLedgerId(),
r.getEntryId());
- } else {
- assert(r.getOpCode() == BookieProtocol.READENTRY);
+ } else if (r.getOpCode() == BookieProtocol.READENTRY) {
return new BookieProtocol.ReadResponse(r.getProtocolVersion(),
errorCode,
r.getLedgerId(),
r.getEntryId());
+ } else {
+ assert(r.getOpCode() == BookieProtocol.BATCH_READ_ENTRY);
+ return new
BookieProtocol.BatchedReadResponse(r.getProtocolVersion(), errorCode,
+ r.getLedgerId(), r.getEntryId(),
((BookieProtocol.BatchedReadRequest) r).getRequestId(),
+ null);
}
}
@@ -43,4 +48,9 @@ class ResponseBuilder {
return new BookieProtocol.ReadResponse(r.getProtocolVersion(),
BookieProtocol.EOK,
r.getLedgerId(), r.getEntryId(), data);
}
+
+    /**
+     * Build the {@code EOK} response for a batched read, echoing the protocol version, ledger id,
+     * entry id and request id of the request and carrying the given entries.
+     */
+    static BookieProtocol.Response buildBatchedReadResponse(ByteBufList data,
+                                                            BookieProtocol.BatchedReadRequest r) {
+        final long ledgerId = r.getLedgerId();
+        final long entryId = r.getEntryId();
+        return new BookieProtocol.BatchedReadResponse(
+                r.getProtocolVersion(),
+                BookieProtocol.EOK,
+                ledgerId,
+                entryId,
+                r.getRequestId(),
+                data);
+    }
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java
new file mode 100644
index 0000000000..3f89755838
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.EventLoop;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.proto.BookieProtocol.Response;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+
+
+/**
+ * Unit test {@link BatchedReadEntryProcessor}.
+ */
+public class BatchedReadEntryProcessorTest {
+
+    private Channel channel;
+    private BookieRequestHandler requestHandler;
+    private BookieRequestProcessor requestProcessor;
+    private Bookie bookie;
+
+    /**
+     * Wire up mocks for the channel, the request handler and the request processor, and make the
+     * mocked bookie return a sequence of five small buffers from {@code readEntry}.
+     */
+    @Before
+    public void setup() throws IOException, BookieException {
+        channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
+
+        requestHandler = mock(BookieRequestHandler.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(channel);
+        when(requestHandler.ctx()).thenReturn(ctx);
+
+        bookie = mock(Bookie.class);
+        requestProcessor = mock(BookieRequestProcessor.class);
+        when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
+        when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE));
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+        EventLoop eventLoop = mock(EventLoop.class);
+        when(eventLoop.inEventLoop()).thenReturn(true);
+        when(channel.eventLoop()).thenReturn(eventLoop);
+        ByteBuf buffer0 = ByteBufAllocator.DEFAULT.buffer(4);
+        ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer(4);
+        ByteBuf buffer2 = ByteBufAllocator.DEFAULT.buffer(4);
+        ByteBuf buffer3 = ByteBufAllocator.DEFAULT.buffer(4);
+        ByteBuf buffer4 = ByteBufAllocator.DEFAULT.buffer(4);
+
+        when(bookie.readEntry(anyLong(), anyLong()))
+                .thenReturn(buffer0).thenReturn(buffer1).thenReturn(buffer2)
+                .thenReturn(buffer3).thenReturn(buffer4);
+    }
+
+    @Test
+    public void testSuccessfulAsynchronousFenceRequest() throws Exception {
+        testAsynchronousRequest(true, BookieProtocol.EOK);
+    }
+
+    @Test
+    public void testFailedAsynchronousFenceRequest() throws Exception {
+        testAsynchronousRequest(false, BookieProtocol.EIO);
+    }
+
+    /**
+     * Fence read where the fence result completes only after the processor has run.
+     */
+    private void testAsynchronousRequest(boolean result, int errorCode) throws Exception {
+        CompletableFuture<Boolean> fenceResult = FutureUtils.createFuture();
+        when(bookie.fenceLedger(anyLong(), any())).thenReturn(fenceResult);
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            promise.setSuccess();
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any(Response.class));
+
+        long requestId = 0;
+        int maxCount = 5;
+        long maxSize = 1024;
+        ExecutorService service = Executors.newCachedThreadPool();
+        try {
+            long ledgerId = System.currentTimeMillis();
+            BookieProtocol.BatchedReadRequest request = BookieProtocol.BatchedReadRequest.create(
+                    BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1,
+                    BookieProtocol.FLAG_DO_FENCING, new byte[] {},
+                    requestId, maxCount, maxSize);
+            ReadEntryProcessor processor = BatchedReadEntryProcessor.create(
+                    request, requestHandler, requestProcessor, service, true, 1024 * 1024 * 5);
+            processor.run();
+
+            fenceResult.complete(result);
+            latch.await();
+            verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+            assertTrue(writtenObject.get() instanceof Response);
+            Response response = (Response) writtenObject.get();
+            assertEquals(1, response.getEntryId());
+            assertEquals(ledgerId, response.getLedgerId());
+            assertEquals(BookieProtocol.BATCH_READ_ENTRY, response.getOpCode());
+            assertEquals(errorCode, response.getErrorCode());
+        } finally {
+            // Release the pool threads even when an assertion fails.
+            service.shutdown();
+        }
+    }
+
+    @Test
+    public void testSuccessfulSynchronousFenceRequest() throws Exception {
+        testSynchronousRequest(true, BookieProtocol.EOK);
+    }
+
+    @Test
+    public void testFailedSynchronousFenceRequest() throws Exception {
+        testSynchronousRequest(false, BookieProtocol.EIO);
+    }
+
+    /**
+     * Fence read where the fence result is already complete before the processor runs.
+     */
+    private void testSynchronousRequest(boolean result, int errorCode) throws Exception {
+        CompletableFuture<Boolean> fenceResult = FutureUtils.createFuture();
+        when(bookie.fenceLedger(anyLong(), any())).thenReturn(fenceResult);
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            promise.setSuccess();
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any(Response.class));
+
+        long requestId = 0;
+        int maxCount = 5;
+        long maxSize = 1024;
+        ExecutorService service = Executors.newCachedThreadPool();
+        try {
+            long ledgerId = System.currentTimeMillis();
+            BookieProtocol.BatchedReadRequest request = BookieProtocol.BatchedReadRequest.create(
+                    BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1,
+                    BookieProtocol.FLAG_DO_FENCING, new byte[] {},
+                    requestId, maxCount, maxSize);
+            ReadEntryProcessor processor = BatchedReadEntryProcessor.create(
+                    request, requestHandler, requestProcessor, service, true, 1024 * 1024 * 5);
+            fenceResult.complete(result);
+            processor.run();
+
+            latch.await();
+            verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+            assertTrue(writtenObject.get() instanceof Response);
+            Response response = (Response) writtenObject.get();
+            assertEquals(1, response.getEntryId());
+            assertEquals(ledgerId, response.getLedgerId());
+            assertEquals(BookieProtocol.BATCH_READ_ENTRY, response.getOpCode());
+            assertEquals(errorCode, response.getErrorCode());
+        } finally {
+            // The pool was previously never shut down in this test.
+            service.shutdown();
+        }
+    }
+
+    /**
+     * Plain (non-fencing) batched read: the response must be written with {@code EOK}.
+     */
+    @Test
+    public void testNonFenceRequest() throws Exception {
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            promise.setSuccess();
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any(Response.class));
+
+        long requestId = 0;
+        int maxCount = 5;
+        long maxSize = 1024;
+        ExecutorService service = Executors.newCachedThreadPool();
+        try {
+            long ledgerId = System.currentTimeMillis();
+            // FLAG_NONE: this test covers the non-fence path, so the request must not ask for fencing.
+            BookieProtocol.BatchedReadRequest request = BookieProtocol.BatchedReadRequest.create(
+                    BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1,
+                    BookieProtocol.FLAG_NONE, new byte[] {},
+                    requestId, maxCount, maxSize);
+            ReadEntryProcessor processor = BatchedReadEntryProcessor.create(
+                    request, requestHandler, requestProcessor, service, true, 1024 * 1024 * 5);
+            processor.run();
+
+            latch.await();
+            verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+            assertTrue(writtenObject.get() instanceof Response);
+            Response response = (Response) writtenObject.get();
+            assertEquals(1, response.getEntryId());
+            assertEquals(ledgerId, response.getLedgerId());
+            assertEquals(BookieProtocol.BATCH_READ_ENTRY, response.getOpCode());
+            assertEquals(BookieProtocol.EOK, response.getErrorCode());
+        } finally {
+            // The pool was previously never shut down in this test.
+            service.shutdown();
+        }
+    }
+}
\ No newline at end of file