This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b0504fed0 [BP-62] Bookie server introduces the 
BatchedReadEntryProcessor to handle batch read request (#4187)
0b0504fed0 is described below

commit 0b0504fed09fccfb4e71c19ea29d97b8bcb6a416
Author: Yan Zhao <[email protected]>
AuthorDate: Wed Jan 24 10:59:24 2024 +0800

    [BP-62] Bookie server introduces the BatchedReadEntryProcessor to handle 
batch read request (#4187)
    
    Descriptions of the changes in this PR:
    This is the second PR for the batch read feature
    (https://github.com/apache/bookkeeper/pull/4051).
    
    The bookie server introduces the BatchedReadEntryProcessor to handle
    batch read requests.
---
 .../bookkeeper/conf/ServerConfiguration.java       |  23 +++
 .../proto/BatchedReadEntryProcessor.java           | 109 ++++++++++
 .../apache/bookkeeper/proto/BookieProtocol.java    |   2 +-
 .../bookkeeper/proto/BookieRequestProcessor.java   |   7 +-
 .../bookkeeper/proto/ReadEntryProcessor.java       |  35 +++-
 .../apache/bookkeeper/proto/ResponseBuilder.java   |  14 +-
 .../proto/BatchedReadEntryProcessorTest.java       | 224 +++++++++++++++++++++
 7 files changed, 399 insertions(+), 15 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 454069a04d..5b12a8f9e4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -344,6 +344,9 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
 
     protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = 
"skipReplayJournalInvalidRecord";
 
+    protected static final String MAX_BATCH_READ_SIZE = "maxBatchReadSize";
+    protected static final int DEFAULT_MAX_BATCH_READ_SIZE = 5 * 1024 * 1024; 
// 5MB
+
     /**
      * Construct a default configuration object.
      */
@@ -4126,4 +4129,24 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     public int getMaxOperationNumbersInSingleRocksDBBatch() {
         return getInt(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, 
100000);
     }
+
+    /**
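+     * Set the max batch read size, in bytes.
+     *
+     * @param maxBatchReadSize the maximum size of a batched read response, in bytes
+     * @return this configuration object, for call chaining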
+     */
+    public ServerConfiguration setMaxBatchReadSize(long maxBatchReadSize) {
+        this.setProperty(MAX_BATCH_READ_SIZE, maxBatchReadSize);
+        return this;
+    }
+
+    /**
+     * Get the max batch read size, in bytes.
+     *
+     * @return the configured max batch read size in bytes, or the 5MB default if unset
+     */
+    public long getMaxBatchReadSize() {
+        return this.getLong(MAX_BATCH_READ_SIZE, DEFAULT_MAX_BATCH_READ_SIZE);
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
new file mode 100644
index 0000000000..700952042f
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCounted;
+import java.util.concurrent.ExecutorService;
+import org.apache.bookkeeper.proto.BookieProtocol.BatchedReadRequest;
+import org.apache.bookkeeper.util.ByteBufList;
+
+public class BatchedReadEntryProcessor extends ReadEntryProcessor {
+
+    private long maxBatchReadSize;
+
+    public static BatchedReadEntryProcessor create(BatchedReadRequest request,
+            BookieRequestHandler requestHandler,
+            BookieRequestProcessor requestProcessor,
+            ExecutorService fenceThreadPool,
+            boolean throttleReadResponses,
+            long maxBatchReadSize) {
+        BatchedReadEntryProcessor rep = RECYCLER.get();
+        rep.init(request, requestHandler, requestProcessor);
+        rep.fenceThreadPool = fenceThreadPool;
+        rep.throttleReadResponses = throttleReadResponses;
+        rep.maxBatchReadSize = maxBatchReadSize;
+        requestProcessor.onReadRequestStart(requestHandler.ctx().channel());
+        return rep;
+    }
+
+    @Override
+    protected ReferenceCounted readData() throws Exception {
+        ByteBufList data = null;
+        BatchedReadRequest batchRequest = (BatchedReadRequest) request;
+        int maxCount = batchRequest.getMaxCount();
+        if (maxCount <= 0) {
+            maxCount = Integer.MAX_VALUE;
+        }
+        long maxSize = Math.min(batchRequest.getMaxSize(), maxBatchReadSize);
+        //See BookieProtoEncoding.ResponseEnDeCoderPreV3#encode on 
BatchedReadResponse case.
+        long frameSize = 24 + 8 + 4;
+        for (int i = 0; i < maxCount; i++) {
+            try {
+                ByteBuf entry = 
requestProcessor.getBookie().readEntry(request.getLedgerId(), 
request.getEntryId() + i);
+                frameSize += entry.readableBytes() + 4;
+                if (data == null) {
+                    data = ByteBufList.get(entry);
+                } else {
+                    if (frameSize > maxSize) {
+                        entry.release();
+                        break;
+                    }
+                    data.add(entry);
+                }
+            } catch (Throwable e) {
+                if (data == null) {
+                    throw e;
+                }
+                break;
+            }
+        }
+        return data;
+    }
+
+    @Override
+    protected BookieProtocol.Response buildReadResponse(ReferenceCounted data) 
{
+        return ResponseBuilder.buildBatchedReadResponse((ByteBufList) data, 
(BatchedReadRequest) request);
+    }
+
+    protected void recycle() {
+        request.recycle();
+        super.reset();
+        if (this.recyclerHandle != null) {
+            this.recyclerHandle.recycle(this);
+        }
+    }
+
+    private final Recycler.Handle<BatchedReadEntryProcessor> recyclerHandle;
+
+    private 
BatchedReadEntryProcessor(Recycler.Handle<BatchedReadEntryProcessor> 
recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    private static final Recycler<BatchedReadEntryProcessor> RECYCLER = new 
Recycler<BatchedReadEntryProcessor>() {
+        @Override
+        protected BatchedReadEntryProcessor 
newObject(Recycler.Handle<BatchedReadEntryProcessor> handle) {
+            return new BatchedReadEntryProcessor(handle);
+        }
+    };
+
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 68b8d61988..b3243a992c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -330,7 +330,7 @@ public interface BookieProtocol {
 
         private final Handle<ReadRequest> recyclerHandle;
 
-        private ReadRequest() {
+        protected ReadRequest() {
             recyclerHandle = null;
         }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index a77b3d7bb5..ed2fe58760 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -677,8 +677,11 @@ public class BookieRequestProcessor implements 
RequestProcessor {
     private void processReadRequest(final BookieProtocol.ReadRequest r, final 
BookieRequestHandler requestHandler) {
         ExecutorService fenceThreadPool =
                 null == highPriorityThreadPool ? null : 
highPriorityThreadPool.chooseThread(requestHandler.ctx());
-        ReadEntryProcessor read = ReadEntryProcessor.create(r, requestHandler,
-                this, fenceThreadPool, throttleReadResponses);
+        ReadEntryProcessor read = r instanceof 
BookieProtocol.BatchedReadRequest
+                ? 
BatchedReadEntryProcessor.create((BookieProtocol.BatchedReadRequest) r, 
requestHandler,
+                this, fenceThreadPool, throttleReadResponses, 
serverCfg.getMaxBatchReadSize())
+                : ReadEntryProcessor.create(r, requestHandler,
+                        this, fenceThreadPool, throttleReadResponses);
 
         // If it's a high priority read (fencing or as part of recovery 
process), we want to make sure it
         // gets executed as fast as possible, so bypass the normal 
readThreadPool
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 04efd9634b..d321623c54 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.proto;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -39,8 +40,8 @@ import org.slf4j.LoggerFactory;
 class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
     private static final Logger LOG = 
LoggerFactory.getLogger(ReadEntryProcessor.class);
 
-    private ExecutorService fenceThreadPool;
-    private boolean throttleReadResponses;
+    protected ExecutorService fenceThreadPool;
+    protected boolean throttleReadResponses;
 
     public static ReadEntryProcessor create(ReadRequest request,
                                             BookieRequestHandler 
requestHandler,
@@ -70,7 +71,7 @@ class ReadEntryProcessor extends 
PacketProcessorBase<ReadRequest> {
         }
         int errorCode = BookieProtocol.EOK;
         long startTimeNanos = MathUtils.nowInNano();
-        ByteBuf data = null;
+        ReferenceCounted data = null;
         try {
             CompletableFuture<Boolean> fenceResult = null;
             if (request.isFencing()) {
@@ -85,9 +86,9 @@ class ReadEntryProcessor extends 
PacketProcessorBase<ReadRequest> {
                     throw 
BookieException.create(BookieException.Code.UnauthorizedAccessException);
                 }
             }
-            data = 
requestProcessor.getBookie().readEntry(request.getLedgerId(), 
request.getEntryId());
+            data = readData();
             if (LOG.isDebugEnabled()) {
-                LOG.debug("##### Read entry ##### {} -- ref-count: {}", 
data.readableBytes(), data.refCnt());
+                LOG.debug("##### Read entry ##### -- ref-count: {}",  
data.refCnt());
             }
             if (fenceResult != null) {
                 handleReadResultForFenceRead(fenceResult, data, 
startTimeNanos);
@@ -126,13 +127,17 @@ class ReadEntryProcessor extends 
PacketProcessorBase<ReadRequest> {
         sendResponse(data, errorCode, startTimeNanos);
     }
 
-    private void sendResponse(ByteBuf data, int errorCode, long 
startTimeNanos) {
+    protected ReferenceCounted readData() throws Exception {
+        return requestProcessor.getBookie().readEntry(request.getLedgerId(), 
request.getEntryId());
+    }
+
+    private void sendResponse(ReferenceCounted data, int errorCode, long 
startTimeNanos) {
         final RequestStats stats = requestProcessor.getRequestStats();
         final OpStatsLogger logger = stats.getReadEntryStats();
         BookieProtocol.Response response;
         if (errorCode == BookieProtocol.EOK) {
             
logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
-            response = ResponseBuilder.buildReadResponse(data, request);
+            response = buildReadResponse(data);
         } else {
             if (data != null) {
                 ReferenceCountUtil.release(data);
@@ -145,13 +150,17 @@ class ReadEntryProcessor extends 
PacketProcessorBase<ReadRequest> {
         recycle();
     }
 
-    private void sendFenceResponse(Boolean result, ByteBuf data, long 
startTimeNanos) {
+    protected BookieProtocol.Response buildReadResponse(ReferenceCounted data) 
{
+        return ResponseBuilder.buildReadResponse((ByteBuf) data, request);
+    }
+
+    private void sendFenceResponse(Boolean result, ReferenceCounted data, long 
startTimeNanos) {
         final int retCode = result != null && result ? BookieProtocol.EOK : 
BookieProtocol.EIO;
         sendResponse(data, retCode, startTimeNanos);
     }
 
     private void handleReadResultForFenceRead(CompletableFuture<Boolean> 
fenceResult,
-                                              ByteBuf data,
+                                              ReferenceCounted data,
                                               long startTimeNanos) {
         if (null != fenceThreadPool) {
             fenceResult.whenCompleteAsync(new FutureEventListener<Boolean>() {
@@ -192,7 +201,9 @@ class ReadEntryProcessor extends 
PacketProcessorBase<ReadRequest> {
     void recycle() {
         request.recycle();
         super.reset();
-        this.recyclerHandle.recycle(this);
+        if (this.recyclerHandle != null) {
+            this.recyclerHandle.recycle(this);
+        }
     }
 
     private final Recycler.Handle<ReadEntryProcessor> recyclerHandle;
@@ -201,6 +212,10 @@ class ReadEntryProcessor extends 
PacketProcessorBase<ReadRequest> {
         this.recyclerHandle = recyclerHandle;
     }
 
+    protected ReadEntryProcessor() {
+        this.recyclerHandle = null;
+    }
+
     private static final Recycler<ReadEntryProcessor> RECYCLER = new 
Recycler<ReadEntryProcessor>() {
         @Override
         protected ReadEntryProcessor 
newObject(Recycler.Handle<ReadEntryProcessor> handle) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
index 563c0a1352..5d010ec6dd 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
@@ -21,16 +21,21 @@
 package org.apache.bookkeeper.proto;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.util.ByteBufList;
 
 class ResponseBuilder {
     static BookieProtocol.Response buildErrorResponse(int errorCode, 
BookieProtocol.Request r) {
         if (r.getOpCode() == BookieProtocol.ADDENTRY) {
             return BookieProtocol.AddResponse.create(r.getProtocolVersion(), 
errorCode,
                                                   r.getLedgerId(), 
r.getEntryId());
-        } else {
-            assert(r.getOpCode() == BookieProtocol.READENTRY);
+        } else if (r.getOpCode() == BookieProtocol.READENTRY) {
             return new BookieProtocol.ReadResponse(r.getProtocolVersion(), 
errorCode,
                                                    r.getLedgerId(), 
r.getEntryId());
+        } else {
+            assert(r.getOpCode() == BookieProtocol.BATCH_READ_ENTRY);
+            return new 
BookieProtocol.BatchedReadResponse(r.getProtocolVersion(), errorCode,
+                    r.getLedgerId(), r.getEntryId(), 
((BookieProtocol.BatchedReadRequest) r).getRequestId(),
+                    null);
         }
     }
 
@@ -43,4 +48,9 @@ class ResponseBuilder {
         return new BookieProtocol.ReadResponse(r.getProtocolVersion(), 
BookieProtocol.EOK,
                 r.getLedgerId(), r.getEntryId(), data);
     }
+
+    static BookieProtocol.Response buildBatchedReadResponse(ByteBufList data, 
BookieProtocol.BatchedReadRequest r) {
+        return new BookieProtocol.BatchedReadResponse(r.getProtocolVersion(), 
BookieProtocol.EOK,
+                r.getLedgerId(), r.getEntryId(), r.getRequestId(), data);
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java
new file mode 100644
index 0000000000..3f89755838
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.EventLoop;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.proto.BookieProtocol.Response;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+
+
+/**
+ * Unit test {@link BatchedReadEntryProcessor}.
+ */
+public class BatchedReadEntryProcessorTest {
+
+    private Channel channel;
+    private BookieRequestHandler requestHandler;
+    private BookieRequestProcessor requestProcessor;
+    private Bookie bookie;
+
+    @Before
+    public void setup() throws IOException, BookieException {
+        channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
+
+        requestHandler = mock(BookieRequestHandler.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(channel);
+        when(requestHandler.ctx()).thenReturn(ctx);
+
+        bookie = mock(Bookie.class);
+        requestProcessor = mock(BookieRequestProcessor.class);
+        when(requestProcessor.getBookie()).thenReturn(bookie);
+        
when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
+        when(requestProcessor.getRequestStats()).thenReturn(new 
RequestStats(NullStatsLogger.INSTANCE));
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        
when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+        EventLoop eventLoop = mock(EventLoop.class);
+        when(eventLoop.inEventLoop()).thenReturn(true);
+        when(channel.eventLoop()).thenReturn(eventLoop);
+        ByteBuf buffer0 = ByteBufAllocator.DEFAULT.buffer(4);
+        ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer(4);
+        ByteBuf buffer2 = ByteBufAllocator.DEFAULT.buffer(4);
+        ByteBuf buffer3 = ByteBufAllocator.DEFAULT.buffer(4);
+        ByteBuf buffer4 = ByteBufAllocator.DEFAULT.buffer(4);
+
+        when(bookie.readEntry(anyLong(), 
anyLong())).thenReturn(buffer0).thenReturn(buffer1).thenReturn(buffer2)
+                .thenReturn(buffer3).thenReturn(buffer4);
+    }
+
+    @Test
+    public void testSuccessfulAsynchronousFenceRequest() throws Exception {
+        testAsynchronousRequest(true, BookieProtocol.EOK);
+    }
+
+    @Test
+    public void testFailedAsynchronousFenceRequest() throws Exception {
+        testAsynchronousRequest(false, BookieProtocol.EIO);
+    }
+
+    private void testAsynchronousRequest(boolean result, int errorCode) throws 
Exception {
+        CompletableFuture<Boolean> fenceResult = FutureUtils.createFuture();
+        when(bookie.fenceLedger(anyLong(), any())).thenReturn(fenceResult);
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            promise.setSuccess();
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any(Response.class));
+
+        long requestId = 0;
+        int maxCount = 5;
+        long maxSize = 1024;
+        ExecutorService service = Executors.newCachedThreadPool();
+        long ledgerId = System.currentTimeMillis();
+        BookieProtocol.BatchedReadRequest request = 
BookieProtocol.BatchedReadRequest.create(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, 
BookieProtocol.FLAG_DO_FENCING, new byte[] {},
+                requestId, maxCount, maxSize);
+        ReadEntryProcessor processor = BatchedReadEntryProcessor.create(
+                request, requestHandler, requestProcessor, service, true, 1024 
* 1024 * 5);
+        processor.run();
+
+        fenceResult.complete(result);
+        latch.await();
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(1, response.getEntryId());
+        assertEquals(ledgerId, response.getLedgerId());
+        assertEquals(BookieProtocol.BATCH_READ_ENTRY, response.getOpCode());
+        assertEquals(errorCode, response.getErrorCode());
+        service.shutdown();
+    }
+
+    @Test
+    public void testSuccessfulSynchronousFenceRequest() throws Exception {
+        testSynchronousRequest(true, BookieProtocol.EOK);
+    }
+
+    @Test
+    public void testFailedSynchronousFenceRequest() throws Exception {
+        testSynchronousRequest(false, BookieProtocol.EIO);
+    }
+
+    private void testSynchronousRequest(boolean result, int errorCode) throws 
Exception {
+        CompletableFuture<Boolean> fenceResult = FutureUtils.createFuture();
+        when(bookie.fenceLedger(anyLong(), any())).thenReturn(fenceResult);
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            promise.setSuccess();
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any(Response.class));
+
+        long requestId = 0;
+        int maxCount = 5;
+        long maxSize = 1024;
+        ExecutorService service = Executors.newCachedThreadPool();
+        long ledgerId = System.currentTimeMillis();
+        BookieProtocol.BatchedReadRequest request = 
BookieProtocol.BatchedReadRequest.create(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, 
BookieProtocol.FLAG_DO_FENCING, new byte[] {},
+                requestId, maxCount, maxSize);
+        ReadEntryProcessor processor = BatchedReadEntryProcessor.create(
+                request, requestHandler, requestProcessor, service, true, 1024 
* 1024 * 5);
+        fenceResult.complete(result);
+        processor.run();
+
+        latch.await();
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(1, response.getEntryId());
+        assertEquals(ledgerId, response.getLedgerId());
+        assertEquals(BookieProtocol.BATCH_READ_ENTRY, response.getOpCode());
+        assertEquals(errorCode, response.getErrorCode());
+    }
+
+    @Test
+    public void testNonFenceRequest() throws Exception {
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            promise.setSuccess();
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any(Response.class));
+
+        long requestId = 0;
+        int maxCount = 5;
+        long maxSize = 1024;
+        ExecutorService service = Executors.newCachedThreadPool();
+        long ledgerId = System.currentTimeMillis();
+        BookieProtocol.BatchedReadRequest request = 
BookieProtocol.BatchedReadRequest.create(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, 
BookieProtocol.FLAG_DO_FENCING, new byte[] {},
+                requestId, maxCount, maxSize);
+        ReadEntryProcessor processor = BatchedReadEntryProcessor.create(
+                request, requestHandler, requestProcessor, service, true, 1024 
* 1024 * 5);
+        processor.run();
+
+        latch.await();
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(1, response.getEntryId());
+        assertEquals(ledgerId, response.getLedgerId());
+        assertEquals(BookieProtocol.BATCH_READ_ENTRY, response.getOpCode());
+        assertEquals(BookieProtocol.EOK, response.getErrorCode());
+    }
+}
\ No newline at end of file

Reply via email to