This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 94c7ce0 BP-14 forceLedger wire protocol server side implementation
94c7ce0 is described below
commit 94c7ce0ba535181bc0fa78c4cb4cc612b1d6b773
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon May 14 08:16:22 2018 +0200
BP-14 forceLedger wire protocol server side implementation
Implementation of the wire protocol for the BP-14 force ledger API on
bookie side
Author: Enrico Olivelli <[email protected]>
Author: eolivelli <[email protected]>
Reviewers: Sijie Guo <[email protected]>, Venkateswararao Jujjuri (JV)
<[email protected]>
This closes #1393 from eolivelli/bp14-simple-force-ledger-proto
---
.../src/main/proto/BookkeeperProtocol.proto | 12 ++
.../bookkeeper/bookie/BookKeeperServerStats.java | 2 +
.../java/org/apache/bookkeeper/bookie/Bookie.java | 2 +-
.../bookkeeper/proto/BookieRequestProcessor.java | 43 +++++++
.../bookkeeper/proto/ForceLedgerProcessorV3.java | 142 +++++++++++++++++++++
.../org/apache/bookkeeper/proto/RequestUtils.java | 5 +
.../proto/ForceLedgerProcessorV3Test.java | 129 +++++++++++++++++++
7 files changed, 334 insertions(+), 1 deletion(-)
diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index d4a7d2e..bac9411 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -63,6 +63,7 @@ enum OperationType {
READ_LAC = 7;
GET_BOOKIE_INFO = 8;
START_TLS = 9;
+ FORCE_LEDGER = 10;
}
/**
@@ -85,6 +86,7 @@ message Request {
optional ReadLacRequest readLacRequest = 104;
optional GetBookieInfoRequest getBookieInfoRequest = 105;
optional StartTLSRequest startTLSRequest = 106;
+ optional ForceLedgerRequest forceLedgerRequest = 107;
}
message ReadRequest {
@@ -126,6 +128,10 @@ message WriteLacRequest {
required bytes body = 4;
}
+message ForceLedgerRequest {
+ required int64 ledgerId = 1;
+}
+
message ReadLacRequest {
required int64 ledgerId = 1;
}
@@ -153,6 +159,7 @@ message Response {
optional ReadLacResponse readLacResponse = 104;
optional GetBookieInfoResponse getBookieInfoResponse = 105;
optional StartTLSResponse startTLSResponse = 106;
+ optional ForceLedgerResponse forceLedgerResponse = 107;
}
message ReadResponse {
@@ -181,6 +188,11 @@ message WriteLacResponse {
required int64 ledgerId = 2;
}
+message ForceLedgerResponse {
+ required StatusCode status = 1;
+ required int64 ledgerId = 2;
+}
+
message ReadLacResponse {
required StatusCode status = 1;
required int64 ledgerId = 2;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index d2ce94b..d488bc9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -44,6 +44,8 @@ public interface BookKeeperServerStats {
// Stats
String ADD_ENTRY_REQUEST = "ADD_ENTRY_REQUEST";
String ADD_ENTRY = "ADD_ENTRY";
+ String FORCE_LEDGER_REQUEST = "FORCE_LEDGER_REQUEST";
+ String FORCE_LEDGER = "FORCE_LEDGER";
String READ_ENTRY_REQUEST = "READ_ENTRY_REQUEST";
String READ_ENTRY = "READ_ENTRY";
String READ_ENTRY_SCHEDULING_DELAY = "READ_ENTRY_SCHEDULING_DELAY";
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 76af6eb..2b89387 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -120,7 +120,7 @@ public class Bookie extends BookieCriticalThread {
static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
static final long METAENTRY_ID_FENCE_KEY = -0x2000;
- static final long METAENTRY_ID_FORCE_LEDGER = -0x4000;
+ public static final long METAENTRY_ID_FORCE_LEDGER = -0x4000;
private final LedgerDirsManager ledgerDirsManager;
private LedgerDirsManager indexDirsManager;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index edb8924..2aebbb9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -24,6 +24,8 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
@@ -130,6 +132,8 @@ public class BookieRequestProcessor implements
RequestProcessor {
private final OpStatsLogger addEntryStats;
final OpStatsLogger readRequestStats;
final OpStatsLogger readEntryStats;
+ final OpStatsLogger forceLedgerStats;
+ final OpStatsLogger forceLedgerRequestStats;
final OpStatsLogger fenceReadRequestStats;
final OpStatsLogger fenceReadEntryStats;
final OpStatsLogger fenceReadWaitStats;
@@ -191,6 +195,8 @@ public class BookieRequestProcessor implements
RequestProcessor {
this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
+ this.forceLedgerStats = statsLogger.getOpStatsLogger(FORCE_LEDGER);
+ this.forceLedgerRequestStats =
statsLogger.getOpStatsLogger(FORCE_LEDGER_REQUEST);
this.readRequestStats =
statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
this.fenceReadEntryStats =
statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_READ);
this.fenceReadRequestStats =
statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_REQUEST);
@@ -258,6 +264,9 @@ public class BookieRequestProcessor implements
RequestProcessor {
case READ_ENTRY:
processReadRequestV3(r, c);
break;
+ case FORCE_LEDGER:
+ processForceLedgerRequestV3(r, c);
+ break;
case AUTH:
LOG.info("Ignoring auth operation from client {}",
c.remoteAddress());
BookkeeperProtocol.AuthMessage message =
BookkeeperProtocol.AuthMessage
@@ -369,6 +378,40 @@ public class BookieRequestProcessor implements
RequestProcessor {
}
}
+ private void processForceLedgerRequestV3(final BookkeeperProtocol.Request
r, final Channel c) {
+ ForceLedgerProcessorV3 forceLedger = new ForceLedgerProcessorV3(r, c,
this);
+
+ final OrderedExecutor threadPool;
+ if (RequestUtils.isHighPriority(r)) {
+ threadPool = highPriorityThreadPool;
+ } else {
+ threadPool = writeThreadPool;
+ }
+
+ if (null == threadPool) {
+ forceLedger.run();
+ } else {
+ try {
+
threadPool.executeOrdered(r.getForceLedgerRequest().getLedgerId(), forceLedger);
+ } catch (RejectedExecutionException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to process request to force ledger {}.
Too many pending requests",
+ r.getForceLedgerRequest().getLedgerId());
+ }
+ BookkeeperProtocol.ForceLedgerResponse.Builder
forceLedgerResponse =
+ BookkeeperProtocol.ForceLedgerResponse.newBuilder()
+ .setLedgerId(r.getForceLedgerRequest().getLedgerId())
+
.setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
+ BookkeeperProtocol.Response.Builder response =
BookkeeperProtocol.Response.newBuilder()
+ .setHeader(forceLedger.getHeader())
+ .setStatus(forceLedgerResponse.getStatus())
+ .setForceLedgerResponse(forceLedgerResponse);
+ BookkeeperProtocol.Response resp = response.build();
+ forceLedger.sendResponse(forceLedgerResponse.getStatus(),
resp, forceLedgerRequestStats);
+ }
+ }
+ }
+
private void processReadRequestV3(final BookkeeperProtocol.Request r,
final Channel c) {
ExecutorService fenceThread = null == highPriorityThreadPool ? null :
highPriorityThreadPool.chooseThread(c);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
new file mode 100644
index 0000000..0c8ef01
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
@@ -0,0 +1,142 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable
{
+ private static final Logger logger =
LoggerFactory.getLogger(ForceLedgerProcessorV3.class);
+
+ public ForceLedgerProcessorV3(Request request, Channel channel,
+ BookieRequestProcessor requestProcessor) {
+ super(request, channel, requestProcessor);
+ }
+
+ // Returns null if there is no exception thrown
+ private ForceLedgerResponse getForceLedgerResponse() {
+ final long startTimeNanos = MathUtils.nowInNano();
+ ForceLedgerRequest forceLedgerRequest =
request.getForceLedgerRequest();
+ long ledgerId = forceLedgerRequest.getLedgerId();
+
+ final ForceLedgerResponse.Builder forceLedgerResponse =
ForceLedgerResponse.newBuilder().setLedgerId(ledgerId);
+
+ if (!isVersionCompatible()) {
+ forceLedgerResponse.setStatus(StatusCode.EBADVERSION);
+ return forceLedgerResponse.build();
+ }
+
+ BookkeeperInternalCallbacks.WriteCallback wcb =
+ (int rc, long ledgerId1, long entryId, BookieSocketAddress
addr, Object ctx) -> {
+
+ checkArgument(entryId == Bookie.METAENTRY_ID_FORCE_LEDGER,
+ "entryId must be METAENTRY_ID_FORCE_LEDGER but was {}",
entryId);
+
+ checkArgument(ledgerId1 == ledgerId,
+ "ledgerId must be {} but was {}", ledgerId, ledgerId1);
+
+ if (BookieProtocol.EOK == rc) {
+ requestProcessor.getForceLedgerStats()
+
.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+ TimeUnit.NANOSECONDS);
+ } else {
+ requestProcessor.getForceLedgerStats()
+
.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+ TimeUnit.NANOSECONDS);
+ }
+
+ StatusCode status;
+ switch (rc) {
+ case BookieProtocol.EOK:
+ status = StatusCode.EOK;
+ break;
+ case BookieProtocol.EIO:
+ status = StatusCode.EIO;
+ break;
+ default:
+ status = StatusCode.EUA;
+ break;
+ }
+ forceLedgerResponse.setStatus(status);
+ Response.Builder response = Response.newBuilder()
+ .setHeader(getHeader())
+ .setStatus(forceLedgerResponse.getStatus())
+ .setForceLedgerResponse(forceLedgerResponse);
+ Response resp = response.build();
+ sendResponse(status, resp,
requestProcessor.getForceLedgerRequestStats());
+ };
+ StatusCode status = null;
+ try {
+ requestProcessor.getBookie().forceLedger(ledgerId, wcb, channel);
+ status = StatusCode.EOK;
+ } catch (Throwable t) {
+ logger.error("Unexpected exception while forcing ledger {} : ",
ledgerId, t);
+ // some bad request which cause unexpected exception
+ status = StatusCode.EBADREQ;
+ }
+
+ // If everything is okay, we return null so that the calling function
+ // doesn't return a response back to the caller.
+ if (!status.equals(StatusCode.EOK)) {
+ forceLedgerResponse.setStatus(status);
+ return forceLedgerResponse.build();
+ }
+ return null;
+ }
+
+ @Override
+ public void safeRun() {
+ ForceLedgerResponse forceLedgerResponse = getForceLedgerResponse();
+ if (null != forceLedgerResponse) {
+ Response.Builder response = Response.newBuilder()
+ .setHeader(getHeader())
+ .setStatus(forceLedgerResponse.getStatus())
+ .setForceLedgerResponse(forceLedgerResponse);
+ Response resp = response.build();
+ sendResponse(forceLedgerResponse.getStatus(), resp,
requestProcessor.getForceLedgerRequestStats());
+ }
+ }
+
+ /**
+ * this toString method filters out body and masterKey from the output.
+ * masterKey contains the password of the ledger and body is customer data,
+ * so it is not appropriate to have these in logs or system output.
+ */
+ @Override
+ public String toString() {
+ return RequestUtils.toSafeString(request);
+ }
+}
+
+
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
index fb61165..d384c81 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
@@ -89,6 +89,11 @@ class RequestUtils {
stringHelper.add("ledgerId", writeLacRequest.getLedgerId());
stringHelper.add("lac", writeLacRequest.getLac());
return stringHelper.toString();
+ } else if (request.hasForceLedgerRequest()) {
+ BookkeeperProtocol.ForceLedgerRequest forceLedgerRequest =
request.getForceLedgerRequest();
+ includeHeaderFields(stringHelper, header);
+ stringHelper.add("ledgerId", forceLedgerRequest.getLedgerId());
+ return stringHelper.toString();
} else {
return request.toString();
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
new file mode 100644
index 0000000..90a51c8
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ForceLedgerProcessorV3}.
+ */
+public class ForceLedgerProcessorV3Test {
+
+ private Request request;
+ private ForceLedgerProcessorV3 processor;
+ private Channel channel;
+ private BookieRequestProcessor requestProcessor;
+ private Bookie bookie;
+
+ @Before
+ public void setup() {
+ request = Request.newBuilder()
+ .setHeader(BKPacketHeader.newBuilder()
+ .setTxnId(System.currentTimeMillis())
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.ADD_ENTRY)
+ .build())
+ .setForceLedgerRequest(ForceLedgerRequest.newBuilder()
+ .setLedgerId(System.currentTimeMillis())
+ .build())
+ .build();
+ channel = mock(Channel.class);
+ bookie = mock(Bookie.class);
+ requestProcessor = mock(BookieRequestProcessor.class);
+ when(requestProcessor.getBookie()).thenReturn(bookie);
+ when(requestProcessor.getForceLedgerStats())
+
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger"));
+ when(requestProcessor.getForceLedgerRequestStats())
+
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger_request"));
+ processor = new ForceLedgerProcessorV3(
+ request,
+ channel,
+ requestProcessor);
+ }
+
+ @Test
+ public void testForceLedger() throws Exception {
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+ doAnswer(invocationOnMock -> {
+ WriteCallback wc = invocationOnMock.getArgument(1);
+
+ wc.writeComplete(
+ 0,
+ request.getForceLedgerRequest().getLedgerId(),
+ Bookie.METAENTRY_ID_FORCE_LEDGER,
+ null,
+ null);
+ return null;
+ }).when(bookie).forceLedger(
+ eq(request.getForceLedgerRequest().getLedgerId()),
+ any(WriteCallback.class),
+ same(channel));
+
+ ChannelPromise promise = new DefaultChannelPromise(channel);
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return promise;
+ }).when(channel).writeAndFlush(any());
+
+ processor.run();
+
+ verify(bookie, times(1))
+ .forceLedger(eq(request.getForceLedgerRequest().getLedgerId()),
+ any(WriteCallback.class), same(channel));
+ verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+ latch.await();
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(StatusCode.EOK, response.getStatus());
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
[email protected].