This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 94c7ce0  BP-14 forceLedger wire protocol server side implementation
94c7ce0 is described below

commit 94c7ce0ba535181bc0fa78c4cb4cc612b1d6b773
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon May 14 08:16:22 2018 +0200

    BP-14 forceLedger wire protocol server side implementation
    
    Implementation of the wire protocol for the BP-14 force ledger API on 
bookie side
    
    Author: Enrico Olivelli <[email protected]>
    Author: eolivelli <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>,  Venkateswararao Jujjuri (JV) 
<[email protected]>
    
    This closes #1393 from eolivelli/bp14-simple-force-ledger-proto
---
 .../src/main/proto/BookkeeperProtocol.proto        |  12 ++
 .../bookkeeper/bookie/BookKeeperServerStats.java   |   2 +
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |   2 +-
 .../bookkeeper/proto/BookieRequestProcessor.java   |  43 +++++++
 .../bookkeeper/proto/ForceLedgerProcessorV3.java   | 142 +++++++++++++++++++++
 .../org/apache/bookkeeper/proto/RequestUtils.java  |   5 +
 .../proto/ForceLedgerProcessorV3Test.java          | 129 +++++++++++++++++++
 7 files changed, 334 insertions(+), 1 deletion(-)

diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto 
b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index d4a7d2e..bac9411 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -63,6 +63,7 @@ enum OperationType {
     READ_LAC = 7;
     GET_BOOKIE_INFO = 8;
     START_TLS = 9;
+    FORCE_LEDGER = 10;
 }
 
 /**
@@ -85,6 +86,7 @@ message Request {
     optional ReadLacRequest readLacRequest = 104;
     optional GetBookieInfoRequest getBookieInfoRequest = 105;
     optional StartTLSRequest startTLSRequest = 106;
+    optional ForceLedgerRequest forceLedgerRequest = 107;
 }
 
 message ReadRequest {
@@ -126,6 +128,10 @@ message WriteLacRequest {
     required bytes body = 4;
 }
 
+message ForceLedgerRequest { // payload of Request.forceLedgerRequest for FORCE_LEDGER operations
+    required int64 ledgerId = 1; // ledger the bookie is asked to force
+}
+
 message ReadLacRequest {
     required int64 ledgerId = 1;
 }
@@ -153,6 +159,7 @@ message Response {
     optional ReadLacResponse readLacResponse = 104;
     optional GetBookieInfoResponse getBookieInfoResponse = 105;
     optional StartTLSResponse startTLSResponse = 106;
+    optional ForceLedgerResponse forceLedgerResponse = 107;
 }
 
 message ReadResponse {
@@ -181,6 +188,11 @@ message WriteLacResponse {
     required int64 ledgerId = 2;
 }
 
+message ForceLedgerResponse { // payload of Response.forceLedgerResponse for FORCE_LEDGER operations
+    required StatusCode status = 1; // outcome reported by the bookie for the force request
+    required int64 ledgerId = 2; // ledger id copied from the corresponding ForceLedgerRequest
+}
+
 message ReadLacResponse {
     required StatusCode status = 1;
     required int64 ledgerId = 2;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index d2ce94b..d488bc9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -44,6 +44,8 @@ public interface BookKeeperServerStats {
     // Stats
     String ADD_ENTRY_REQUEST = "ADD_ENTRY_REQUEST";
     String ADD_ENTRY = "ADD_ENTRY";
+    String FORCE_LEDGER_REQUEST = "FORCE_LEDGER_REQUEST";
+    String FORCE_LEDGER = "FORCE_LEDGER";
     String READ_ENTRY_REQUEST = "READ_ENTRY_REQUEST";
     String READ_ENTRY = "READ_ENTRY";
     String READ_ENTRY_SCHEDULING_DELAY = "READ_ENTRY_SCHEDULING_DELAY";
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 76af6eb..2b89387 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -120,7 +120,7 @@ public class Bookie extends BookieCriticalThread {
 
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
     static final long METAENTRY_ID_FENCE_KEY  = -0x2000;
-    static final long METAENTRY_ID_FORCE_LEDGER  = -0x4000;
+    public static final long METAENTRY_ID_FORCE_LEDGER  = -0x4000;
 
     private final LedgerDirsManager ledgerDirsManager;
     private LedgerDirsManager indexDirsManager;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index edb8924..2aebbb9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -24,6 +24,8 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
+import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
@@ -130,6 +132,8 @@ public class BookieRequestProcessor implements 
RequestProcessor {
     private final OpStatsLogger addEntryStats;
     final OpStatsLogger readRequestStats;
     final OpStatsLogger readEntryStats;
+    final OpStatsLogger forceLedgerStats;
+    final OpStatsLogger forceLedgerRequestStats;
     final OpStatsLogger fenceReadRequestStats;
     final OpStatsLogger fenceReadEntryStats;
     final OpStatsLogger fenceReadWaitStats;
@@ -191,6 +195,8 @@ public class BookieRequestProcessor implements 
RequestProcessor {
         this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
         this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
         this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
+        this.forceLedgerStats = statsLogger.getOpStatsLogger(FORCE_LEDGER);
+        this.forceLedgerRequestStats = 
statsLogger.getOpStatsLogger(FORCE_LEDGER_REQUEST);
         this.readRequestStats = 
statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
         this.fenceReadEntryStats = 
statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_READ);
         this.fenceReadRequestStats = 
statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_REQUEST);
@@ -258,6 +264,9 @@ public class BookieRequestProcessor implements 
RequestProcessor {
                 case READ_ENTRY:
                     processReadRequestV3(r, c);
                     break;
+                case FORCE_LEDGER:
+                    processForceLedgerRequestV3(r, c);
+                    break;
                 case AUTH:
                     LOG.info("Ignoring auth operation from client {}", 
c.remoteAddress());
                     BookkeeperProtocol.AuthMessage message = 
BookkeeperProtocol.AuthMessage
@@ -369,6 +378,40 @@ public class BookieRequestProcessor implements 
RequestProcessor {
         }
     }
 
+    private void processForceLedgerRequestV3(final BookkeeperProtocol.Request 
r, final Channel c) {
+        ForceLedgerProcessorV3 forceLedger = new ForceLedgerProcessorV3(r, c, 
this);
+
+        final OrderedExecutor threadPool;
+        if (RequestUtils.isHighPriority(r)) {
+            threadPool = highPriorityThreadPool;
+        } else {
+            threadPool = writeThreadPool;
+        }
+
+        if (null == threadPool) {
+            forceLedger.run();
+        } else {
+            try {
+                
threadPool.executeOrdered(r.getForceLedgerRequest().getLedgerId(), forceLedger);
+            } catch (RejectedExecutionException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to process request to force ledger {}. 
Too many pending requests",
+                              r.getForceLedgerRequest().getLedgerId());
+                }
+                BookkeeperProtocol.ForceLedgerResponse.Builder 
forceLedgerResponse =
+                        BookkeeperProtocol.ForceLedgerResponse.newBuilder()
+                        .setLedgerId(r.getForceLedgerRequest().getLedgerId())
+                        
.setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
+                BookkeeperProtocol.Response.Builder response = 
BookkeeperProtocol.Response.newBuilder()
+                        .setHeader(forceLedger.getHeader())
+                        .setStatus(forceLedgerResponse.getStatus())
+                        .setForceLedgerResponse(forceLedgerResponse);
+                BookkeeperProtocol.Response resp = response.build();
+                forceLedger.sendResponse(forceLedgerResponse.getStatus(), 
resp, forceLedgerRequestStats);
+            }
+        }
+    }
+
     private void processReadRequestV3(final BookkeeperProtocol.Request r, 
final Channel c) {
         ExecutorService fenceThread = null == highPriorityThreadPool ? null : 
highPriorityThreadPool.chooseThread(c);
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
new file mode 100644
index 0000000..0c8ef01
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
@@ -0,0 +1,142 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable 
{
+    private static final Logger logger = 
LoggerFactory.getLogger(ForceLedgerProcessorV3.class);
+
+    public ForceLedgerProcessorV3(Request request, Channel channel,
+                             BookieRequestProcessor requestProcessor) {
+        super(request, channel, requestProcessor);
+    }
+
+    // Returns null if there is no exception thrown
+    private ForceLedgerResponse getForceLedgerResponse() {
+        final long startTimeNanos = MathUtils.nowInNano();
+        ForceLedgerRequest forceLedgerRequest = 
request.getForceLedgerRequest();
+        long ledgerId = forceLedgerRequest.getLedgerId();
+
+        final ForceLedgerResponse.Builder forceLedgerResponse = 
ForceLedgerResponse.newBuilder().setLedgerId(ledgerId);
+
+        if (!isVersionCompatible()) {
+            forceLedgerResponse.setStatus(StatusCode.EBADVERSION);
+            return forceLedgerResponse.build();
+        }
+
+        BookkeeperInternalCallbacks.WriteCallback wcb =
+                (int rc, long ledgerId1, long entryId, BookieSocketAddress 
addr, Object ctx) -> {
+
+            checkArgument(entryId == Bookie.METAENTRY_ID_FORCE_LEDGER,
+                    "entryId must be METAENTRY_ID_FORCE_LEDGER but was {}", 
entryId);
+
+            checkArgument(ledgerId1 == ledgerId,
+                    "ledgerId must be {} but was {}", ledgerId, ledgerId1);
+
+            if (BookieProtocol.EOK == rc) {
+                requestProcessor.getForceLedgerStats()
+                        
.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+                                TimeUnit.NANOSECONDS);
+            } else {
+                requestProcessor.getForceLedgerStats()
+                        
.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+                                TimeUnit.NANOSECONDS);
+            }
+
+            StatusCode status;
+            switch (rc) {
+                case BookieProtocol.EOK:
+                    status = StatusCode.EOK;
+                    break;
+                case BookieProtocol.EIO:
+                    status = StatusCode.EIO;
+                    break;
+                default:
+                    status = StatusCode.EUA;
+                    break;
+            }
+            forceLedgerResponse.setStatus(status);
+            Response.Builder response = Response.newBuilder()
+                    .setHeader(getHeader())
+                    .setStatus(forceLedgerResponse.getStatus())
+                    .setForceLedgerResponse(forceLedgerResponse);
+            Response resp = response.build();
+            sendResponse(status, resp, 
requestProcessor.getForceLedgerRequestStats());
+        };
+        StatusCode status = null;
+        try {
+            requestProcessor.getBookie().forceLedger(ledgerId, wcb, channel);
+            status = StatusCode.EOK;
+        } catch (Throwable t) {
+            logger.error("Unexpected exception while forcing ledger {} : ", 
ledgerId, t);
+            // some bad request which cause unexpected exception
+            status = StatusCode.EBADREQ;
+        }
+
+        // If everything is okay, we return null so that the calling function
+        // doesn't return a response back to the caller.
+        if (!status.equals(StatusCode.EOK)) {
+            forceLedgerResponse.setStatus(status);
+            return forceLedgerResponse.build();
+        }
+        return null;
+    }
+
+    @Override
+    public void safeRun() {
+        ForceLedgerResponse forceLedgerResponse = getForceLedgerResponse();
+        if (null != forceLedgerResponse) {
+            Response.Builder response = Response.newBuilder()
+                    .setHeader(getHeader())
+                    .setStatus(forceLedgerResponse.getStatus())
+                    .setForceLedgerResponse(forceLedgerResponse);
+            Response resp = response.build();
+            sendResponse(forceLedgerResponse.getStatus(), resp, 
requestProcessor.getForceLedgerRequestStats());
+        }
+    }
+
+    /**
+     * this toString method filters out body and masterKey from the output.
+     * masterKey contains the password of the ledger and body is customer data,
+     * so it is not appropriate to have these in logs or system output.
+     */
+    @Override
+    public String toString() {
+        return RequestUtils.toSafeString(request);
+    }
+}
+
+
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
index fb61165..d384c81 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
@@ -89,6 +89,11 @@ class RequestUtils {
             stringHelper.add("ledgerId", writeLacRequest.getLedgerId());
             stringHelper.add("lac", writeLacRequest.getLac());
             return stringHelper.toString();
+        } else if (request.hasForceLedgerRequest()) {
+            BookkeeperProtocol.ForceLedgerRequest forceLedgerRequest = 
request.getForceLedgerRequest();
+            includeHeaderFields(stringHelper, header);
+            stringHelper.add("ledgerId", forceLedgerRequest.getLedgerId());
+            return stringHelper.toString();
         } else {
             return request.toString();
         }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
new file mode 100644
index 0000000..90a51c8
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ForceLedgerProcessorV3}.
+ */
+public class ForceLedgerProcessorV3Test {
+
+    private Request request;
+    private ForceLedgerProcessorV3 processor;
+    private Channel channel;
+    private BookieRequestProcessor requestProcessor;
+    private Bookie bookie;
+
+    /**
+     * Builds a V3 force ledger request and a processor wired to mocked collaborators.
+     */
+    @Before
+    public void setup() {
+        request = Request.newBuilder()
+            .setHeader(BKPacketHeader.newBuilder()
+                .setTxnId(System.currentTimeMillis())
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                // the header must declare the operation the request actually carries
+                .setOperation(OperationType.FORCE_LEDGER)
+                .build())
+            .setForceLedgerRequest(ForceLedgerRequest.newBuilder()
+                .setLedgerId(System.currentTimeMillis())
+                .build())
+            .build();
+        channel = mock(Channel.class);
+        bookie = mock(Bookie.class);
+        requestProcessor = mock(BookieRequestProcessor.class);
+        when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getForceLedgerStats())
+            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger"));
+        when(requestProcessor.getForceLedgerRequestStats())
+            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger_request"));
+        processor = new ForceLedgerProcessorV3(
+            request,
+            channel,
+            requestProcessor);
+    }
+
+    /**
+     * A force request completed successfully by the bookie must produce exactly one
+     * {@code EOK} response for the requested ledger.
+     */
+    @Test
+    public void testForceLedger() throws Exception {
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+        // The mocked bookie completes the force immediately and successfully.
+        doAnswer(invocationOnMock -> {
+            WriteCallback wc = invocationOnMock.getArgument(1);
+
+            wc.writeComplete(
+                BookieProtocol.EOK,
+                request.getForceLedgerRequest().getLedgerId(),
+                Bookie.METAENTRY_ID_FORCE_LEDGER,
+                null,
+                null);
+            return null;
+        }).when(bookie).forceLedger(
+            eq(request.getForceLedgerRequest().getLedgerId()),
+            any(WriteCallback.class),
+            same(channel));
+
+        // Capture whatever the processor writes to the channel.
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any());
+
+        processor.run();
+
+        verify(bookie, times(1))
+            .forceLedger(eq(request.getForceLedgerRequest().getLedgerId()),
+                    any(WriteCallback.class), same(channel));
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(StatusCode.EOK, response.getStatus());
+        assertEquals(StatusCode.EOK, response.getForceLedgerResponse().getStatus());
+        assertEquals(request.getForceLedgerRequest().getLedgerId(),
+            response.getForceLedgerResponse().getLedgerId());
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to