This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new eaa6014 Support asynchronous fence request for V2 ReadEntryProcessor
eaa6014 is described below
commit eaa6014049e6bc8253f5b0282e4ace8a3aff92e2
Author: Like <[email protected]>
AuthorDate: Fri Apr 19 08:44:54 2019 +0800
Support asynchronous fence request for V2 ReadEntryProcessor
Currently, the v2 ```ReadEntryProcessor``` does not support asynchronous
fence requests: for a fencing request, it blocks for up to 1000 milliseconds
waiting for the fence operation to complete. This pull request adds support
for sending the response asynchronously when a thread pool is provided.
Closes #283
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #2021 from liketic/refactor
---
.../bookkeeper/proto/BookieRequestProcessor.java | 4 +-
.../bookkeeper/proto/ReadEntryProcessor.java | 123 ++++++++------
.../bookkeeper/proto/ReadEntryProcessorTest.java | 180 +++++++++++++++++++++
3 files changed, 259 insertions(+), 48 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 7d5e2e6..02b0f56 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -619,7 +619,9 @@ public class BookieRequestProcessor implements
RequestProcessor {
}
private void processReadRequest(final BookieProtocol.ReadRequest r, final
Channel c) {
- ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this);
+ ExecutorService fenceThreadPool =
+ null == highPriorityThreadPool ? null :
highPriorityThreadPool.chooseThread(c);
+ ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this,
fenceThreadPool);
// If it's a high priority read (fencing or as part of recovery
process), we want to make sure it
// gets executed as fast as possible, so bypass the normal
readThreadPool
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 6566c7b..a530bd5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -17,6 +17,10 @@
*/
package org.apache.bookkeeper.proto;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.util.Recycler;
@@ -24,24 +28,31 @@ import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
+import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
private static final Logger LOG =
LoggerFactory.getLogger(ReadEntryProcessor.class);
- public static ReadEntryProcessor create(ReadRequest request, Channel
channel,
- BookieRequestProcessor
requestProcessor) {
+ private ExecutorService fenceThreadPool;
+
+ public static ReadEntryProcessor create(ReadRequest request,
+ Channel channel,
+ BookieRequestProcessor
requestProcessor,
+ ExecutorService fenceThreadPool) {
ReadEntryProcessor rep = RECYCLER.get();
rep.init(request, channel, requestProcessor);
+ rep.fenceThreadPool = fenceThreadPool;
return rep;
}
@@ -50,55 +61,29 @@ class ReadEntryProcessor extends
PacketProcessorBase<ReadRequest> {
if (LOG.isDebugEnabled()) {
LOG.debug("Received new read request: {}", request);
}
- int errorCode = BookieProtocol.EIO;
+ int errorCode = BookieProtocol.EOK;
long startTimeNanos = MathUtils.nowInNano();
ByteBuf data = null;
try {
- Future<Boolean> fenceResult = null;
+ SettableFuture<Boolean> fenceResult = null;
if (request.isFencing()) {
LOG.warn("Ledger: {} fenced by: {}", request.getLedgerId(),
channel.remoteAddress());
if (request.hasMasterKey()) {
- fenceResult =
requestProcessor.bookie.fenceLedger(request.getLedgerId(),
request.getMasterKey());
+ fenceResult =
requestProcessor.getBookie().fenceLedger(request.getLedgerId(),
+ request.getMasterKey());
} else {
LOG.error("Password not provided, Not safe to fence {}",
request.getLedgerId());
throw
BookieException.create(BookieException.Code.UnauthorizedAccessException);
}
}
- data = requestProcessor.bookie.readEntry(request.getLedgerId(),
request.getEntryId());
+ data =
requestProcessor.getBookie().readEntry(request.getLedgerId(),
request.getEntryId());
if (LOG.isDebugEnabled()) {
LOG.debug("##### Read entry ##### {} -- ref-count: {}",
data.readableBytes(), data.refCnt());
}
- if (null != fenceResult) {
- // TODO: {@link
https://github.com/apache/bookkeeper/issues/283}
- // currently we don't have readCallback to run in separated
read
- // threads. after BOOKKEEPER-429 is complete, we could improve
- // following code to make it not wait here
- //
- // For now, since we only try to wait after read entry. so
writing
- // to journal and read entry are executed in different thread
- // it would be fine.
- try {
- Boolean fenced = fenceResult.get(1000,
TimeUnit.MILLISECONDS);
- if (null == fenced || !fenced) {
- // if failed to fence, fail the read request to make
it retry.
- errorCode = BookieProtocol.EIO;
- } else {
- errorCode = BookieProtocol.EOK;
- }
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.error("Interrupting fence read entry {}", request, ie);
- errorCode = BookieProtocol.EIO;
- } catch (ExecutionException ee) {
- LOG.error("Failed to fence read entry {}", request, ee);
- errorCode = BookieProtocol.EIO;
- } catch (TimeoutException te) {
- LOG.error("Timeout to fence read entry {}", request, te);
- errorCode = BookieProtocol.EIO;
- }
- } else {
- errorCode = BookieProtocol.EOK;
+ if (fenceResult != null) {
+ handleReadResultForFenceRead(fenceResult, data,
startTimeNanos);
+ return;
}
} catch (Bookie.NoLedgerException e) {
if (LOG.isDebugEnabled()) {
@@ -127,22 +112,66 @@ class ReadEntryProcessor extends
PacketProcessorBase<ReadRequest> {
if (LOG.isTraceEnabled()) {
LOG.trace("Read entry rc = {} for {}", errorCode, request);
}
+ sendResponse(data, errorCode, startTimeNanos);
+ }
+
+ private void sendResponse(ByteBuf data, int errorCode, long
startTimeNanos) {
+ final RequestStats stats = requestProcessor.getRequestStats();
+ final OpStatsLogger logger = stats.getReadEntryStats();
+ BookieProtocol.Response response;
if (errorCode == BookieProtocol.EOK) {
- requestProcessor.getRequestStats().getReadEntryStats()
-
.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
- sendResponse(errorCode, ResponseBuilder.buildReadResponse(data,
request),
-
requestProcessor.getRequestStats().getReadRequestStats());
+
logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
+ response = ResponseBuilder.buildReadResponse(data, request);
} else {
- ReferenceCountUtil.release(data);
-
- requestProcessor.getRequestStats().getReadEntryStats()
- .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
- sendResponse(errorCode,
ResponseBuilder.buildErrorResponse(errorCode, request),
-
requestProcessor.getRequestStats().getReadRequestStats());
+ if (data != null) {
+ ReferenceCountUtil.release(data);
+ }
+ logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
+ response = ResponseBuilder.buildErrorResponse(errorCode, request);
}
+ sendResponse(errorCode, response, stats.getReadRequestStats());
recycle();
}
+ private void sendFenceResponse(Boolean result, ByteBuf data, long
startTimeNanos) {
+ final int retCode = result != null && result ? BookieProtocol.EOK :
BookieProtocol.EIO;
+ sendResponse(data, retCode, startTimeNanos);
+ }
+
+ private void handleReadResultForFenceRead(ListenableFuture<Boolean>
fenceResult,
+ ByteBuf data,
+ long startTimeNanos) {
+ if (null != fenceThreadPool) {
+ Futures.addCallback(fenceResult, new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean result) {
+ sendFenceResponse(result, data, startTimeNanos);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Error processing fence request", t);
+ // if failed to fence, fail the read request to make it
retry.
+ sendResponse(data, BookieProtocol.EIO, startTimeNanos);
+ }
+ }, fenceThreadPool);
+ } else {
+ try {
+ Boolean fenced = fenceResult.get(1000, TimeUnit.MILLISECONDS);
+ sendFenceResponse(fenced, data, startTimeNanos);
+ return;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupting fence read entry {}", request, ie);
+ } catch (ExecutionException ee) {
+ LOG.error("Failed to fence read entry {}", request,
ee.getCause());
+ } catch (TimeoutException te) {
+ LOG.error("Timeout to fence read entry {}", request, te);
+ }
+ sendResponse(data, BookieProtocol.EIO, startTimeNanos);
+ }
+ }
+
@Override
public String toString() {
return String.format("ReadEntry(%d, %d)", request.getLedgerId(),
request.getEntryId());
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
new file mode 100644
index 0000000..bf73c10
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
+import org.apache.bookkeeper.proto.BookieProtocol.Response;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ReadEntryProcessor}.
+ */
+public class ReadEntryProcessorTest {
+
+ private Channel channel;
+ private BookieRequestProcessor requestProcessor;
+ private Bookie bookie;
+
+ @Before
+ public void setup() throws IOException, BookieException {
+ channel = mock(Channel.class);
+ bookie = mock(Bookie.class);
+ requestProcessor = mock(BookieRequestProcessor.class);
+ when(requestProcessor.getBookie()).thenReturn(bookie);
+
when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
+ when(requestProcessor.getRequestStats()).thenReturn(new
RequestStats(NullStatsLogger.INSTANCE));
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+ }
+
+ @Test
+ public void testSuccessfulAsynchronousFenceRequest() throws Exception {
+ testAsynchronousRequest(true, BookieProtocol.EOK);
+ }
+
+ @Test
+ public void testFailedAsynchronousFenceRequest() throws Exception {
+ testAsynchronousRequest(false, BookieProtocol.EIO);
+ }
+
+ private void testAsynchronousRequest(boolean result, int errorCode) throws
Exception {
+ SettableFuture<Boolean> fenceResult = SettableFuture.create();
+ when(bookie.fenceLedger(anyLong(), any())).thenReturn(fenceResult);
+
+ ChannelPromise promise = new DefaultChannelPromise(channel);
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return promise;
+ }).when(channel).writeAndFlush(any(Response.class), any());
+
+ ExecutorService service = Executors.newCachedThreadPool();
+ long ledgerId = System.currentTimeMillis();
+ ReadRequest request = new
ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
+ 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
+ ReadEntryProcessor processor = ReadEntryProcessor.create(request,
channel, requestProcessor, service);
+ processor.run();
+
+ fenceResult.set(result);
+ latch.await();
+ verify(channel, times(1)).writeAndFlush(any(Response.class), any());
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(1, response.getEntryId());
+ assertEquals(ledgerId, response.getLedgerId());
+ assertEquals(BookieProtocol.READENTRY, response.getOpCode());
+ assertEquals(errorCode, response.getErrorCode());
+ }
+
+ @Test
+ public void testSuccessfulSynchronousFenceRequest() throws Exception {
+ testSynchronousRequest(true, BookieProtocol.EOK);
+ }
+
+ @Test
+ public void testFailedSynchronousFenceRequest() throws Exception {
+ testSynchronousRequest(false, BookieProtocol.EIO);
+ }
+
+ private void testSynchronousRequest(boolean result, int errorCode) throws
Exception {
+ SettableFuture<Boolean> fenceResult = SettableFuture.create();
+ when(bookie.fenceLedger(anyLong(), any())).thenReturn(fenceResult);
+ ChannelPromise promise = new DefaultChannelPromise(channel);
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return promise;
+ }).when(channel).writeAndFlush(any(Response.class), any());
+
+ long ledgerId = System.currentTimeMillis();
+ ReadRequest request = new
ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
+ 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
+ ReadEntryProcessor processor = ReadEntryProcessor.create(request,
channel, requestProcessor, null);
+ fenceResult.set(result);
+ processor.run();
+
+ latch.await();
+ verify(channel, times(1)).writeAndFlush(any(Response.class), any());
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(1, response.getEntryId());
+ assertEquals(ledgerId, response.getLedgerId());
+ assertEquals(BookieProtocol.READENTRY, response.getOpCode());
+ assertEquals(errorCode, response.getErrorCode());
+ }
+
+ @Test
+ public void testNonFenceRequest() throws Exception {
+ ChannelPromise promise = new DefaultChannelPromise(channel);
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return promise;
+ }).when(channel).writeAndFlush(any(Response.class), any());
+
+ long ledgerId = System.currentTimeMillis();
+ ReadRequest request = new
ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
+ 1, (short) 0, new byte[]{});
+ ReadEntryProcessor processor = ReadEntryProcessor.create(request,
channel, requestProcessor, null);
+ processor.run();
+
+ latch.await();
+ verify(channel, times(1)).writeAndFlush(any(Response.class), any());
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(1, response.getEntryId());
+ assertEquals(ledgerId, response.getLedgerId());
+ assertEquals(BookieProtocol.READENTRY, response.getOpCode());
+ assertEquals(BookieProtocol.EOK, response.getErrorCode());
+ }
+}