This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new eaa6014  Support asynchronous fence request for V2 ReadEntryProcessor
eaa6014 is described below

commit eaa6014049e6bc8253f5b0282e4ace8a3aff92e2
Author: Like <[email protected]>
AuthorDate: Fri Apr 19 08:44:54 2019 +0800

    Support asynchronous fence request for V2 ReadEntryProcessor
    
    Currently, the v2 `ReadEntryProcessor` does not support asynchronous 
fence requests: for a fencing request it blocks for up to 1000 milliseconds 
waiting for the fence to complete. This pull request adds support for sending 
the response asynchronously when a thread pool is provided.
    
    Closes #283
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #2021 from liketic/refactor
---
 .../bookkeeper/proto/BookieRequestProcessor.java   |   4 +-
 .../bookkeeper/proto/ReadEntryProcessor.java       | 123 ++++++++------
 .../bookkeeper/proto/ReadEntryProcessorTest.java   | 180 +++++++++++++++++++++
 3 files changed, 259 insertions(+), 48 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 7d5e2e6..02b0f56 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -619,7 +619,9 @@ public class BookieRequestProcessor implements 
RequestProcessor {
     }
 
     private void processReadRequest(final BookieProtocol.ReadRequest r, final 
Channel c) {
-        ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this);
+        ExecutorService fenceThreadPool =
+                null == highPriorityThreadPool ? null : 
highPriorityThreadPool.chooseThread(c);
+        ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this, 
fenceThreadPool);
 
         // If it's a high priority read (fencing or as part of recovery 
process), we want to make sure it
         // gets executed as fast as possible, so bypass the normal 
readThreadPool
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 6566c7b..a530bd5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -17,6 +17,10 @@
  */
 package org.apache.bookkeeper.proto;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.util.Recycler;
@@ -24,24 +28,31 @@ import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
     private static final Logger LOG = 
LoggerFactory.getLogger(ReadEntryProcessor.class);
 
-    public static ReadEntryProcessor create(ReadRequest request, Channel 
channel,
-                                            BookieRequestProcessor 
requestProcessor) {
+    private ExecutorService fenceThreadPool;
+
+    public static ReadEntryProcessor create(ReadRequest request,
+                                            Channel channel,
+                                            BookieRequestProcessor 
requestProcessor,
+                                            ExecutorService fenceThreadPool) {
         ReadEntryProcessor rep = RECYCLER.get();
         rep.init(request, channel, requestProcessor);
+        rep.fenceThreadPool = fenceThreadPool;
         return rep;
     }
 
@@ -50,55 +61,29 @@ class ReadEntryProcessor extends 
PacketProcessorBase<ReadRequest> {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received new read request: {}", request);
         }
-        int errorCode = BookieProtocol.EIO;
+        int errorCode = BookieProtocol.EOK;
         long startTimeNanos = MathUtils.nowInNano();
         ByteBuf data = null;
         try {
-            Future<Boolean> fenceResult = null;
+            SettableFuture<Boolean> fenceResult = null;
             if (request.isFencing()) {
                 LOG.warn("Ledger: {}  fenced by: {}", request.getLedgerId(), 
channel.remoteAddress());
 
                 if (request.hasMasterKey()) {
-                    fenceResult = 
requestProcessor.bookie.fenceLedger(request.getLedgerId(), 
request.getMasterKey());
+                    fenceResult = 
requestProcessor.getBookie().fenceLedger(request.getLedgerId(),
+                            request.getMasterKey());
                 } else {
                     LOG.error("Password not provided, Not safe to fence {}", 
request.getLedgerId());
                     throw 
BookieException.create(BookieException.Code.UnauthorizedAccessException);
                 }
             }
-            data = requestProcessor.bookie.readEntry(request.getLedgerId(), 
request.getEntryId());
+            data = 
requestProcessor.getBookie().readEntry(request.getLedgerId(), 
request.getEntryId());
             if (LOG.isDebugEnabled()) {
                 LOG.debug("##### Read entry ##### {} -- ref-count: {}", 
data.readableBytes(), data.refCnt());
             }
-            if (null != fenceResult) {
-                // TODO: {@link 
https://github.com/apache/bookkeeper/issues/283}
-                // currently we don't have readCallback to run in separated 
read
-                // threads. after BOOKKEEPER-429 is complete, we could improve
-                // following code to make it not wait here
-                //
-                // For now, since we only try to wait after read entry. so 
writing
-                // to journal and read entry are executed in different thread
-                // it would be fine.
-                try {
-                    Boolean fenced = fenceResult.get(1000, 
TimeUnit.MILLISECONDS);
-                    if (null == fenced || !fenced) {
-                        // if failed to fence, fail the read request to make 
it retry.
-                        errorCode = BookieProtocol.EIO;
-                    } else {
-                        errorCode = BookieProtocol.EOK;
-                    }
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    LOG.error("Interrupting fence read entry {}", request, ie);
-                    errorCode = BookieProtocol.EIO;
-                } catch (ExecutionException ee) {
-                    LOG.error("Failed to fence read entry {}", request, ee);
-                    errorCode = BookieProtocol.EIO;
-                } catch (TimeoutException te) {
-                    LOG.error("Timeout to fence read entry {}", request, te);
-                    errorCode = BookieProtocol.EIO;
-                }
-            } else {
-                errorCode = BookieProtocol.EOK;
+            if (fenceResult != null) {
+                handleReadResultForFenceRead(fenceResult, data, 
startTimeNanos);
+                return;
             }
         } catch (Bookie.NoLedgerException e) {
             if (LOG.isDebugEnabled()) {
@@ -127,22 +112,66 @@ class ReadEntryProcessor extends 
PacketProcessorBase<ReadRequest> {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Read entry rc = {} for {}", errorCode, request);
         }
+        sendResponse(data, errorCode, startTimeNanos);
+    }
+
+    private void sendResponse(ByteBuf data, int errorCode, long 
startTimeNanos) {
+        final RequestStats stats = requestProcessor.getRequestStats();
+        final OpStatsLogger logger = stats.getReadEntryStats();
+        BookieProtocol.Response response;
         if (errorCode == BookieProtocol.EOK) {
-            requestProcessor.getRequestStats().getReadEntryStats()
-                
.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
-            sendResponse(errorCode, ResponseBuilder.buildReadResponse(data, 
request),
-                         
requestProcessor.getRequestStats().getReadRequestStats());
+            
logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
+            response = ResponseBuilder.buildReadResponse(data, request);
         } else {
-            ReferenceCountUtil.release(data);
-
-            requestProcessor.getRequestStats().getReadEntryStats()
-                .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
-            sendResponse(errorCode, 
ResponseBuilder.buildErrorResponse(errorCode, request),
-                         
requestProcessor.getRequestStats().getReadRequestStats());
+            if (data != null) {
+                ReferenceCountUtil.release(data);
+            }
+            logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
+            response = ResponseBuilder.buildErrorResponse(errorCode, request);
         }
+        sendResponse(errorCode, response, stats.getReadRequestStats());
         recycle();
     }
 
+    private void sendFenceResponse(Boolean result, ByteBuf data, long 
startTimeNanos) {
+        final int retCode = result != null && result ? BookieProtocol.EOK : 
BookieProtocol.EIO;
+        sendResponse(data, retCode, startTimeNanos);
+    }
+
+    private void handleReadResultForFenceRead(ListenableFuture<Boolean> 
fenceResult,
+                                              ByteBuf data,
+                                              long startTimeNanos) {
+        if (null != fenceThreadPool) {
+            Futures.addCallback(fenceResult, new FutureCallback<Boolean>() {
+                @Override
+                public void onSuccess(Boolean result) {
+                    sendFenceResponse(result, data, startTimeNanos);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    LOG.error("Error processing fence request", t);
+                    // if failed to fence, fail the read request to make it 
retry.
+                    sendResponse(data, BookieProtocol.EIO, startTimeNanos);
+                }
+            }, fenceThreadPool);
+        } else {
+            try {
+                Boolean fenced = fenceResult.get(1000, TimeUnit.MILLISECONDS);
+                sendFenceResponse(fenced, data, startTimeNanos);
+                return;
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                LOG.error("Interrupting fence read entry {}", request, ie);
+            } catch (ExecutionException ee) {
+                LOG.error("Failed to fence read entry {}", request, 
ee.getCause());
+            } catch (TimeoutException te) {
+                LOG.error("Timeout to fence read entry {}", request, te);
+            }
+            sendResponse(data, BookieProtocol.EIO, startTimeNanos);
+        }
+    }
+
     @Override
     public String toString() {
         return String.format("ReadEntry(%d, %d)", request.getLedgerId(), 
request.getEntryId());
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
new file mode 100644
index 0000000..bf73c10
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
+import org.apache.bookkeeper.proto.BookieProtocol.Response;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ReadEntryProcessor}.
+ */
+public class ReadEntryProcessorTest {
+
+    private Channel channel;
+    private BookieRequestProcessor requestProcessor;
+    private Bookie bookie;
+
+    @Before
+    public void setup() throws IOException, BookieException {
+        channel = mock(Channel.class);
+        bookie = mock(Bookie.class);
+        requestProcessor = mock(BookieRequestProcessor.class);
+        when(requestProcessor.getBookie()).thenReturn(bookie);
+        
when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
+        when(requestProcessor.getRequestStats()).thenReturn(new 
RequestStats(NullStatsLogger.INSTANCE));
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        
when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+    }
+
+    @Test
+    public void testSuccessfulAsynchronousFenceRequest() throws Exception {
+        testAsynchronousRequest(true, BookieProtocol.EOK);
+    }
+
+    @Test
+    public void testFailedAsynchronousFenceRequest() throws Exception {
+        testAsynchronousRequest(false, BookieProtocol.EIO);
+    }
+
+    private void testAsynchronousRequest(boolean result, int errorCode) throws 
Exception {
+        SettableFuture<Boolean> fenceResult = SettableFuture.create();
+        when(bookie.fenceLedger(anyLong(), any())).thenReturn(fenceResult);
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any(Response.class), any());
+
+        ExecutorService service = Executors.newCachedThreadPool();
+        long ledgerId = System.currentTimeMillis();
+        ReadRequest request = new 
ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
+                1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
+        ReadEntryProcessor processor = ReadEntryProcessor.create(request, 
channel, requestProcessor, service);
+        processor.run();
+
+        fenceResult.set(result);
+        latch.await();
+        verify(channel, times(1)).writeAndFlush(any(Response.class), any());
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(1, response.getEntryId());
+        assertEquals(ledgerId, response.getLedgerId());
+        assertEquals(BookieProtocol.READENTRY, response.getOpCode());
+        assertEquals(errorCode, response.getErrorCode());
+    }
+
+    @Test
+    public void testSuccessfulSynchronousFenceRequest() throws Exception {
+        testSynchronousRequest(true, BookieProtocol.EOK);
+    }
+
+    @Test
+    public void testFailedSynchronousFenceRequest() throws Exception {
+        testSynchronousRequest(false, BookieProtocol.EIO);
+    }
+
+    private void testSynchronousRequest(boolean result, int errorCode) throws 
Exception {
+        SettableFuture<Boolean> fenceResult = SettableFuture.create();
+        when(bookie.fenceLedger(anyLong(), any())).thenReturn(fenceResult);
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any(Response.class), any());
+
+        long ledgerId = System.currentTimeMillis();
+        ReadRequest request = new 
ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
+                1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
+        ReadEntryProcessor processor = ReadEntryProcessor.create(request, 
channel, requestProcessor, null);
+        fenceResult.set(result);
+        processor.run();
+
+        latch.await();
+        verify(channel, times(1)).writeAndFlush(any(Response.class), any());
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(1, response.getEntryId());
+        assertEquals(ledgerId, response.getLedgerId());
+        assertEquals(BookieProtocol.READENTRY, response.getOpCode());
+        assertEquals(errorCode, response.getErrorCode());
+    }
+
+    @Test
+    public void testNonFenceRequest() throws Exception {
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any(Response.class), any());
+
+        long ledgerId = System.currentTimeMillis();
+        ReadRequest request = new 
ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
+                1, (short) 0, new byte[]{});
+        ReadEntryProcessor processor = ReadEntryProcessor.create(request, 
channel, requestProcessor, null);
+        processor.run();
+
+        latch.await();
+        verify(channel, times(1)).writeAndFlush(any(Response.class), any());
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(1, response.getEntryId());
+        assertEquals(ledgerId, response.getLedgerId());
+        assertEquals(BookieProtocol.READENTRY, response.getOpCode());
+        assertEquals(BookieProtocol.EOK, response.getErrorCode());
+    }
+}

Reply via email to