This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new ba7db47 ISSUE #230: Enable Checkstyle on the proto package
ba7db47 is described below
commit ba7db47ab912269eca160bc1112f46cc417688d1
Author: Aaron Coburn <[email protected]>
AuthorDate: Fri Dec 1 17:05:20 2017 +0100
ISSUE #230: Enable Checkstyle on the proto package
This is part of #230 and adds Checkstyle verification to the proto
package in bookkeeper-server.
The changes here should generally be non-controversial, but I would like to
highlight one change for particular review. In the `PerChannelBookieClient`
class, there is a `public static final` field called `txnIdGenerator`. The
checkstyle rules require that such public fields are ALL_CAPS. So one option
would be to change that field to `TX_ID_GENERATOR` (this would be a change to
the public API).
The relevant diff is here:
https://github.com/apache/bookkeeper/compare/master...acoburn:checkstyle_proto?expand=1#diff-e50ee2c1aec1539ea185a94605b0e550R143
However, that field is never used outside this class, so the other option
is to make the field private and keep the name as-is. (Making the field private
is also a change to the public API.)
I chose the second option because the field seemed more like an internal
field. If, however, the field should be left public, I can add a commit that
changes the name to `TX_ID_GENERATOR` so that it passes the checkstyle rules.
Author: Aaron Coburn <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie
Guo <[email protected]>
This closes #792 from acoburn/checkstyle_proto, closes #230
---
.../org/apache/bookkeeper/proto/AuthHandler.java | 15 ++-
.../java/org/apache/bookkeeper/proto/BKStats.java | 44 ++++---
.../org/apache/bookkeeper/proto/BookieClient.java | 59 +++++-----
.../apache/bookkeeper/proto/BookieNettyServer.java | 63 +++++-----
.../bookkeeper/proto/BookieProtoEncoding.java | 124 +++++++++++--------
.../apache/bookkeeper/proto/BookieProtocol.java | 131 ++++++++++++---------
.../bookkeeper/proto/BookieRequestHandler.java | 11 +-
.../bookkeeper/proto/BookieRequestProcessor.java | 82 +++++++------
.../org/apache/bookkeeper/proto/BookieServer.java | 16 +--
.../proto/BookkeeperInternalCallbacks.java | 40 +++++--
.../bookkeeper/proto/ClientConnectionPeer.java | 2 +-
.../apache/bookkeeper/proto/ConnectionPeer.java | 26 ++--
.../proto/DefaultPerChannelBookieClientPool.java | 9 +-
.../bookkeeper/proto/GetBookieInfoProcessorV3.java | 9 +-
.../bookkeeper/proto/LocalBookiesRegistry.java | 20 ++--
.../proto/LongPollReadEntryProcessorV3.java | 21 ++--
.../bookkeeper/proto/PacketProcessorBase.java | 9 +-
.../bookkeeper/proto/PacketProcessorBaseV3.java | 3 +
.../bookkeeper/proto/PerChannelBookieClient.java | 102 ++++++++--------
.../proto/PerChannelBookieClientPool.java | 2 +-
.../bookkeeper/proto/ReadEntryProcessor.java | 4 +-
.../bookkeeper/proto/ReadEntryProcessorV3.java | 8 +-
.../bookkeeper/proto/ReadLacProcessorV3.java | 17 +--
.../org/apache/bookkeeper/proto/ServerStats.java | 29 +++--
.../bookkeeper/proto/WriteEntryProcessor.java | 6 +-
.../bookkeeper/proto/WriteEntryProcessorV3.java | 2 +-
.../bookkeeper/proto/WriteLacProcessorV3.java | 18 +--
...ClientConnectionPeer.java => package-info.java} | 11 +-
.../resources/bookkeeper/server-suppressions.xml | 1 -
29 files changed, 493 insertions(+), 391 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index 0780785..1b1f60f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.proto;
+import static
org.apache.bookkeeper.auth.AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME;
+
import com.google.protobuf.ByteString;
import io.netty.channel.Channel;
@@ -35,7 +37,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.auth.AuthCallbacks;
-import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.AuthToken;
import org.apache.bookkeeper.auth.BookieAuthProvider;
import org.apache.bookkeeper.auth.ClientAuthProvider;
@@ -129,8 +130,7 @@ class AuthHandler {
&& req.hasStartTLSRequest()) {
super.channelRead(ctx, msg);
} else {
- BookkeeperProtocol.Response.Builder builder
- = BookkeeperProtocol.Response.newBuilder()
+ BookkeeperProtocol.Response.Builder builder =
BookkeeperProtocol.Response.newBuilder()
.setHeader(req.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EUA);
@@ -276,13 +276,12 @@ class AuthHandler {
} else {
assert (resp.hasAuthResponse());
BookkeeperProtocol.AuthMessage am =
resp.getAuthResponse();
- if
(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())){
+ if
(AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())){
SocketAddress remote =
ctx.channel().remoteAddress();
LOG.info("Authentication is not enabled."
+ "Considering this client {0}
authenticated", remote);
- AuthHandshakeCompleteCallback
authHandshakeCompleteCallback
- = new AuthHandshakeCompleteCallback(ctx);
-
authHandshakeCompleteCallback.operationComplete(BKException.Code.OK, null);
+ AuthHandshakeCompleteCallback cb = new
AuthHandshakeCompleteCallback(ctx);
+ cb.operationComplete(BKException.Code.OK,
null);
return;
}
byte[] payload = am.getPayload().toByteArray();
@@ -319,7 +318,7 @@ class AuthHandler {
}
} else if (msg instanceof BookieProtocol.Request) {
// let auth messages through, queue the rest
- BookieProtocol.Request req = (BookieProtocol.Request)msg;
+ BookieProtocol.Request req = (BookieProtocol.Request) msg;
if (BookkeeperProtocol.OperationType.AUTH.getNumber() ==
req.getOpCode()) {
super.write(ctx, msg, promise);
super.flush(ctx);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BKStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BKStats.java
index 9aa80a0..144767d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BKStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BKStats.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Bookie Server Stats
+ * Bookie Server Stats.
*/
public class BKStats {
private static final Logger LOG = LoggerFactory.getLogger(BKStats.class);
@@ -38,7 +38,7 @@ public class BKStats {
}
/**
- * A read view of stats, also used in CompositeViewData to expose to JMX
+ * A read view of stats, also used in CompositeViewData to expose to JMX.
*/
public static class OpStatData {
private final long maxLatency, minLatency;
@@ -84,10 +84,10 @@ public class BKStats {
}
/**
- * Operation Statistics
+ * Operation Statistics.
*/
public static class OpStats {
- static final int NUM_BUCKETS = 3*9 + 2;
+ static final int NUM_BUCKETS = 3 * 9 + 2;
long maxLatency = 0;
long minLatency = Long.MAX_VALUE;
@@ -99,16 +99,16 @@ public class BKStats {
OpStats() {}
/**
- * Increment number of failed operations
+ * Increment number of failed operations.
*/
- synchronized public void incrementFailedOps() {
+ public synchronized void incrementFailedOps() {
++numFailedOps;
}
/**
- * Update Latency
+ * Update Latency.
*/
- synchronized public void updateLatency(long latency) {
+ public synchronized void updateLatency(long latency) {
if (latency < 0) {
// less than 0ms . Ideally this should not happen.
// We have seen this latency negative in some cases due to the
@@ -127,11 +127,11 @@ public class BKStats {
}
int bucket;
if (latency <= 100) { // less than 100ms
- bucket = (int)(latency / 10);
+ bucket = (int) (latency / 10);
} else if (latency <= 1000) { // 100ms ~ 1000ms
- bucket = 1 * 9 + (int)(latency / 100);
+ bucket = 1 * 9 + (int) (latency / 100);
} else if (latency <= 10000) { // 1s ~ 10s
- bucket = 2 * 9 + (int)(latency / 1000);
+ bucket = 2 * 9 + (int) (latency / 1000);
} else { // more than 10s
bucket = 3 * 9 + 1;
}
@@ -141,7 +141,7 @@ public class BKStats {
public OpStatData toOpStatData() {
double avgLatency = numSuccessOps > 0 ? totalLatency /
numSuccessOps : 0.0f;
StringBuilder sb = new StringBuilder();
- for (int i=0; i<NUM_BUCKETS; i++) {
+ for (int i = 0; i < NUM_BUCKETS; i++) {
sb.append(latencyBuckets[i]);
if (i != NUM_BUCKETS - 1) {
sb.append(',');
@@ -152,7 +152,7 @@ public class BKStats {
}
/**
- * Diff with base opstats
+ * Diff with base opstats.
*
* @param base
* base opstats
@@ -172,10 +172,9 @@ public class BKStats {
}
/**
- * Copy stats from other OpStats
+ * Copy stats from other OpStats.
*
* @param other other op stats
- * @return void
*/
public synchronized void copyOf(OpStats other) {
this.maxLatency = other.maxLatency;
@@ -196,13 +195,13 @@ public class BKStats {
OpStats[] stats = new OpStats[NUM_STATS];
private BKStats() {
- for (int i=0; i<NUM_STATS; i++) {
+ for (int i = 0; i < NUM_STATS; i++) {
stats[i] = new OpStats();
}
}
/**
- * Stats of operations
+ * Stats of operations.
*
* @return op stats
*/
@@ -211,7 +210,7 @@ public class BKStats {
}
/**
- * Set stats of a specified operation
+ * Set stats of a specified operation.
*
* @param type operation type
* @param stat operation stats
@@ -221,27 +220,26 @@ public class BKStats {
}
/**
- * Diff with base stats
+ * Diff with base stats.
*
* @param base base stats
* @return diff stats
*/
public BKStats diff(BKStats base) {
BKStats diff = new BKStats();
- for (int i=0; i<NUM_STATS; i++) {
+ for (int i = 0; i < NUM_STATS; i++) {
diff.setOpStats(i, stats[i].diff(base.getOpStats(i)));
}
return diff;
}
/**
- * Copy stats from other stats
+ * Copy stats from other stats.
*
* @param other other stats
- * @return void
*/
public void copyOf(BKStats other) {
- for (int i=0; i<NUM_STATS; i++) {
+ for (int i = 0; i < NUM_STATS; i++) {
stats[i].copyOf(other.getOpStats(i));
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index ae7f806..183bb7b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -22,6 +22,20 @@ package org.apache.bookkeeper.proto;
import static com.google.common.base.Charsets.UTF_8;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ExtensionRegistry;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@@ -40,8 +54,8 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
-import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
@@ -51,20 +65,6 @@ import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ExtensionRegistry;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
/**
* Implements the client-side part of the BookKeeper protocol.
*
@@ -81,8 +81,8 @@ public class BookieClient implements
PerChannelBookieClientFactory {
new ConcurrentHashMap<BookieSocketAddress,
PerChannelBookieClientPool>();
final HashedWheelTimer requestTimer;
- final private ClientAuthProvider.Factory authProviderFactory;
- final private ExtensionRegistry registry;
+ private final ClientAuthProvider.Factory authProviderFactory;
+ private final ExtensionRegistry registry;
private final ClientConfiguration conf;
private volatile boolean closed;
@@ -337,8 +337,8 @@ public class BookieClient implements
PerChannelBookieClientFactory {
this.recyclerHandle = recyclerHandle;
}
- private static final Recycler<ChannelReadyForAddEntryCallback> RECYCLER
- = new Recycler<ChannelReadyForAddEntryCallback>() {
+ private static final Recycler<ChannelReadyForAddEntryCallback>
RECYCLER =
+ new Recycler<ChannelReadyForAddEntryCallback>() {
protected ChannelReadyForAddEntryCallback newObject(
Recycler.Handle<ChannelReadyForAddEntryCallback>
recyclerHandle) {
return new
ChannelReadyForAddEntryCallback(recyclerHandle);
@@ -380,17 +380,19 @@ public class BookieClient implements
PerChannelBookieClientFactory {
}
}
- public void readLac(final BookieSocketAddress addr, final long ledgerId,
final ReadLacCallback cb, final Object ctx) {
+ public void readLac(final BookieSocketAddress addr, final long ledgerId,
final ReadLacCallback cb,
+ final Object ctx) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
-
cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, null, null, ctx);
+
cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, null, null,
+ ctx);
return;
}
client.obtain(new GenericCallback<PerChannelBookieClient>() {
@Override
- public void operationComplete(final int
rc,PerChannelBookieClient pcbc) {
+ public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
try {
executor.submitOrdered(ledgerId, new
SafeRunnable() {
@@ -438,8 +440,8 @@ public class BookieClient implements
PerChannelBookieClientFactory {
closeLock.readLock().unlock();
}
}
-
-
+
+
public void readEntryWaitForLACUpdate(final BookieSocketAddress addr,
final long ledgerId,
final long entryId,
@@ -465,7 +467,8 @@ public class BookieClient implements
PerChannelBookieClientFactory {
completeRead(rc, ledgerId, entryId, null, cb, ctx);
return;
}
- pcbc.readEntryWaitForLACUpdate(ledgerId, entryId,
previousLAC, timeOutInMillis, piggyBackEntry, cb, ctx);
+ pcbc.readEntryWaitForLACUpdate(ledgerId, entryId,
previousLAC, timeOutInMillis, piggyBackEntry, cb,
+ ctx);
}
}, ledgerId);
} finally {
@@ -473,12 +476,14 @@ public class BookieClient implements
PerChannelBookieClientFactory {
}
}
- public void getBookieInfo(final BookieSocketAddress addr, final long
requested, final GetBookieInfoCallback cb, final Object ctx) {
+ public void getBookieInfo(final BookieSocketAddress addr, final long
requested, final GetBookieInfoCallback cb,
+ final Object ctx) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
-
cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
new BookieInfo(), ctx);
+
cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
new BookieInfo(),
+ ctx);
return;
}
client.obtain(new GenericCallback<PerChannelBookieClient>() {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index 2308bc6..1ab80c9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -20,22 +20,8 @@
*/
package org.apache.bookkeeper.proto;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.auth.BookieAuthProvider;
-import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
-import org.apache.bookkeeper.processor.RequestProcessor;
-import org.apache.commons.lang.SystemUtils;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ExtensionRegistry;
import io.netty.bootstrap.ServerBootstrap;
@@ -64,25 +50,40 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.ssl.SslHandler;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Collection;
-import java.util.Collections;
import java.security.cert.Certificate;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import javax.net.ssl.SSLPeerUnverifiedException;
+
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.BookKeeperPrincipal;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.bookkeeper.auth.BookieAuthProvider;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.processor.RequestProcessor;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Netty server for serving bookie requests
+ * Netty server for serving bookie requests.
*/
class BookieNettyServer {
- private final static Logger LOG =
LoggerFactory.getLogger(BookieNettyServer.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(BookieNettyServer.class);
final int maxFrameSize;
final ServerConfiguration conf;
@@ -296,7 +297,8 @@ class BookieNettyServer {
}
}
- BookieSideConnectionPeerContextHandler contextHandler =
new BookieSideConnectionPeerContextHandler();
+ BookieSideConnectionPeerContextHandler contextHandler =
+ new BookieSideConnectionPeerContextHandler();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("lengthbaseddecoder", new
LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
@@ -304,10 +306,12 @@ class BookieNettyServer {
pipeline.addLast("bookieProtoDecoder", new
BookieProtoEncoding.RequestDecoder(registry));
pipeline.addLast("bookieProtoEncoder", new
BookieProtoEncoding.ResponseEncoder(registry));
- pipeline.addLast("bookieAuthHandler", new
AuthHandler.ServerSideHandler(contextHandler.getConnectionPeer(),
authProviderFactory));
+ pipeline.addLast("bookieAuthHandler", new
AuthHandler.ServerSideHandler(
+ contextHandler.getConnectionPeer(),
authProviderFactory));
ChannelInboundHandler requestHandler = isRunning.get()
- ? new BookieRequestHandler(conf, requestProcessor,
allChannels) : new RejectRequestHandler();
+ ? new BookieRequestHandler(conf, requestProcessor,
allChannels)
+ : new RejectRequestHandler();
pipeline.addLast("bookieRequestHandler", requestHandler);
pipeline.addLast("contextHandler", contextHandler);
@@ -351,7 +355,8 @@ class BookieNettyServer {
}
}
- BookieSideConnectionPeerContextHandler contextHandler =
new BookieSideConnectionPeerContextHandler();
+ BookieSideConnectionPeerContextHandler contextHandler =
+ new BookieSideConnectionPeerContextHandler();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("lengthbaseddecoder", new
LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
@@ -359,10 +364,12 @@ class BookieNettyServer {
pipeline.addLast("bookieProtoDecoder", new
BookieProtoEncoding.RequestDecoder(registry));
pipeline.addLast("bookieProtoEncoder", new
BookieProtoEncoding.ResponseEncoder(registry));
- pipeline.addLast("bookieAuthHandler", new
AuthHandler.ServerSideHandler(contextHandler.getConnectionPeer(),
authProviderFactory));
+ pipeline.addLast("bookieAuthHandler", new
AuthHandler.ServerSideHandler(
+ contextHandler.getConnectionPeer(),
authProviderFactory));
ChannelInboundHandler requestHandler = isRunning.get()
- ? new BookieRequestHandler(conf, requestProcessor,
allChannels) : new RejectRequestHandler();
+ ? new BookieRequestHandler(conf, requestProcessor,
allChannels)
+ : new RejectRequestHandler();
pipeline.addLast("bookieRequestHandler", requestHandler);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index df7f3b2..18cecf8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,16 +20,6 @@
*/
package org.apache.bookkeeper.proto;
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-import java.util.List;
-
-import org.apache.bookkeeper.client.MacDigestManager;
-import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
-import org.apache.bookkeeper.util.DoubleByteBuf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -45,10 +35,26 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+
+import org.apache.bookkeeper.client.MacDigestManager;
+import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
+import org.apache.bookkeeper.util.DoubleByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class for encoding and decoding the Bookkeeper protocol.
+ */
public class BookieProtoEncoding {
private static final Logger LOG =
LoggerFactory.getLogger(BookieProtoEncoding.class);
- public static interface EnDecoder {
+ /**
+ * An encoder/decoder interface for the Bookkeeper protocol.
+ */
+ public interface EnDecoder {
/**
* Encode a <i>object</i> into channel buffer.
*
@@ -57,7 +63,7 @@ public class BookieProtoEncoding {
* @return encode buffer.
* @throws Exception
*/
- public Object encode(Object object, ByteBufAllocator allocator) throws
Exception;
+ Object encode(Object object, ByteBufAllocator allocator) throws
Exception;
/**
* Decode a <i>packet</i> into an object.
@@ -67,15 +73,18 @@ public class BookieProtoEncoding {
* @return parsed object.
* @throws Exception
*/
- public Object decode(ByteBuf packet) throws Exception;
+ Object decode(ByteBuf packet) throws Exception;
}
+ /**
+ * An encoder/decoder for the Bookkeeper protocol before version 3.
+ */
public static class RequestEnDeCoderPreV3 implements EnDecoder {
final ExtensionRegistry extensionRegistry;
//This empty master key is used when an empty password is provided
which is the hash of an empty string
- private final static byte[] emptyPasswordMasterKey;
+ private static final byte[] emptyPasswordMasterKey;
static {
try {
emptyPasswordMasterKey = MacDigestManager.genDigest("ledger",
new byte[0]);
@@ -94,9 +103,9 @@ public class BookieProtoEncoding {
if (!(msg instanceof BookieProtocol.Request)) {
return msg;
}
- BookieProtocol.Request r = (BookieProtocol.Request)msg;
+ BookieProtocol.Request r = (BookieProtocol.Request) msg;
if (r instanceof BookieProtocol.AddRequest) {
- BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest)r;
+ BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) r;
int totalHeaderSize = 4 // for the header
+ BookieProtocol.MASTER_KEY_LENGTH; // for the master key
ByteBuf buf = allocator.buffer(totalHeaderSize);
@@ -123,7 +132,7 @@ public class BookieProtoEncoding {
return buf;
} else if (r instanceof BookieProtocol.AuthRequest) {
- BookkeeperProtocol.AuthMessage am =
((BookieProtocol.AuthRequest)r).getAuthMessage();
+ BookkeeperProtocol.AuthMessage am =
((BookieProtocol.AuthRequest) r).getAuthMessage();
int totalHeaderSize = 4; // for request type
int totalSize = totalHeaderSize + am.getSerializedSize();
ByteBuf buf = allocator.buffer(totalSize);
@@ -174,8 +183,7 @@ public class BookieProtoEncoding {
return new BookieProtocol.ReadRequest(version, ledgerId,
entryId, flags);
}
case BookieProtocol.AUTH:
- BookkeeperProtocol.AuthMessage.Builder builder
- = BookkeeperProtocol.AuthMessage.newBuilder();
+ BookkeeperProtocol.AuthMessage.Builder builder =
BookkeeperProtocol.AuthMessage.newBuilder();
builder.mergeFrom(new ByteBufInputStream(packet),
extensionRegistry);
return new BookieProtocol.AuthRequest(version,
builder.build());
}
@@ -209,6 +217,9 @@ public class BookieProtoEncoding {
}
}
+ /**
+ * A response encoder/decoder for the Bookkeeper protocol before version 3.
+ */
public static class ResponseEnDeCoderPreV3 implements EnDecoder {
final ExtensionRegistry extensionRegistry;
@@ -222,7 +233,7 @@ public class BookieProtoEncoding {
if (!(msg instanceof BookieProtocol.Response)) {
return msg;
}
- BookieProtocol.Response r = (BookieProtocol.Response)msg;
+ BookieProtocol.Response r = (BookieProtocol.Response) msg;
ByteBuf buf = allocator.buffer(24);
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), (short) 0));
@@ -285,8 +296,7 @@ public class BookieProtoEncoding {
}
case BookieProtocol.AUTH:
ByteBufInputStream bufStream = new ByteBufInputStream(buffer);
- BookkeeperProtocol.AuthMessage.Builder builder
- = BookkeeperProtocol.AuthMessage.newBuilder();
+ BookkeeperProtocol.AuthMessage.Builder builder =
BookkeeperProtocol.AuthMessage.newBuilder();
builder.mergeFrom(bufStream, extensionRegistry);
BookkeeperProtocol.AuthMessage am = builder.build();
return new BookieProtocol.AuthResponse(version, am);
@@ -296,6 +306,9 @@ public class BookieProtoEncoding {
}
}
+ /**
+ * A request encoder/decoder for the Bookkeeper protocol version 3.
+ */
public static class RequestEnDecoderV3 implements EnDecoder {
final ExtensionRegistry extensionRegistry;
@@ -316,6 +329,9 @@ public class BookieProtoEncoding {
}
+ /**
+ * A response encoder/decoder for the Bookkeeper protocol version 3.
+ */
public static class ResponseEnDecoderV3 implements EnDecoder {
final ExtensionRegistry extensionRegistry;
@@ -353,23 +369,26 @@ public class BookieProtoEncoding {
return buf;
}
+ /**
+ * A request message encoder.
+ */
@Sharable
public static class RequestEncoder extends MessageToMessageEncoder<Object>
{
- final EnDecoder REQ_PREV3;
- final EnDecoder REQ_V3;
+ final EnDecoder reqPreV3;
+ final EnDecoder reqV3;
public RequestEncoder(ExtensionRegistry extensionRegistry) {
- REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
- REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+ reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+ reqV3 = new RequestEnDecoderV3(extensionRegistry);
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg,
List<Object> out) throws Exception {
if (msg instanceof BookkeeperProtocol.Request) {
- out.add(REQ_V3.encode(msg, ctx.alloc()));
+ out.add(reqV3.encode(msg, ctx.alloc()));
} else if (msg instanceof BookieProtocol.Request) {
- out.add(REQ_PREV3.encode(msg, ctx.alloc()));
+ out.add(reqPreV3.encode(msg, ctx.alloc()));
} else {
LOG.error("Invalid request to encode to {}: {}",
ctx.channel(), msg.getClass().getName());
out.add(msg);
@@ -377,15 +396,18 @@ public class BookieProtoEncoding {
}
}
+ /**
+ * A request message decoder.
+ */
@Sharable
public static class RequestDecoder extends MessageToMessageDecoder<Object>
{
- final EnDecoder REQ_PREV3;
- final EnDecoder REQ_V3;
+ final EnDecoder reqPreV3;
+ final EnDecoder reqV3;
boolean usingV3Protocol;
RequestDecoder(ExtensionRegistry extensionRegistry) {
- REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
- REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+ reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+ reqV3 = new RequestEnDecoderV3(extensionRegistry);
usingV3Protocol = true;
}
@@ -403,26 +425,29 @@ public class BookieProtoEncoding {
if (usingV3Protocol) {
try {
- out.add(REQ_V3.decode(buffer));
+ out.add(reqV3.decode(buffer));
} catch (InvalidProtocolBufferException e) {
usingV3Protocol = false;
buffer.resetReaderIndex();
- out.add(REQ_PREV3.decode(buffer));
+ out.add(reqPreV3.decode(buffer));
}
} else {
- out.add(REQ_PREV3.decode(buffer));
+ out.add(reqPreV3.decode(buffer));
}
}
}
+ /**
+ * A response message encoder.
+ */
@Sharable
public static class ResponseEncoder extends
MessageToMessageEncoder<Object> {
- final EnDecoder REP_PREV3;
- final EnDecoder REP_V3;
+ final EnDecoder repPreV3;
+ final EnDecoder repV3;
ResponseEncoder(ExtensionRegistry extensionRegistry) {
- REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
- REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+ repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+ repV3 = new ResponseEnDecoderV3(extensionRegistry);
}
@Override
@@ -432,9 +457,9 @@ public class BookieProtoEncoding {
LOG.debug("Encode response {} to channel {}.", msg,
ctx.channel());
}
if (msg instanceof BookkeeperProtocol.Response) {
- out.add(REP_V3.encode(msg, ctx.alloc()));
+ out.add(repV3.encode(msg, ctx.alloc()));
} else if (msg instanceof BookieProtocol.Response) {
- out.add(REP_PREV3.encode(msg, ctx.alloc()));
+ out.add(repPreV3.encode(msg, ctx.alloc()));
} else {
LOG.error("Invalid response to encode to {}: {}",
ctx.channel(), msg.getClass().getName());
out.add(msg);
@@ -442,15 +467,18 @@ public class BookieProtoEncoding {
}
}
+ /**
+ * A response message decoder.
+ */
@Sharable
public static class ResponseDecoder extends
MessageToMessageDecoder<Object> {
- final EnDecoder REP_PREV3;
- final EnDecoder REP_V3;
+ final EnDecoder repPreV3;
+ final EnDecoder repV3;
boolean usingV3Protocol;
ResponseDecoder(ExtensionRegistry extensionRegistry) {
- REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
- REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+ repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+ repV3 = new ResponseEnDecoderV3(extensionRegistry);
usingV3Protocol = true;
}
@@ -467,15 +495,15 @@ public class BookieProtoEncoding {
if (usingV3Protocol) {
try {
- out.add(REP_V3.decode(buffer));
+ out.add(repV3.decode(buffer));
} catch (InvalidProtocolBufferException e) {
usingV3Protocol = false;
buffer.resetReaderIndex();
- out.add(REP_PREV3.decode(buffer));
+ out.add(repPreV3.decode(buffer));
}
} else {
// If in the same connection we already got preV3 messages,
don't try again to decode V3 messages
- out.add(REP_PREV3.decode(buffer));
+ out.add(repPreV3.decode(buffer));
}
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 879489a..4ad23d9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.proto;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,12 +18,12 @@ package org.apache.bookkeeper.proto;
* under the License.
*
*/
+package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import io.netty.util.ReferenceCounted;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
@@ -40,22 +38,22 @@ public interface BookieProtocol {
/**
* Lowest protocol version which will work with the bookie.
*/
- public static final byte LOWEST_COMPAT_PROTOCOL_VERSION = 0;
+ byte LOWEST_COMPAT_PROTOCOL_VERSION = 0;
/**
* Current version of the protocol, which client will use.
*/
- public static final byte CURRENT_PROTOCOL_VERSION = 2;
+ byte CURRENT_PROTOCOL_VERSION = 2;
/**
* Entry Entry ID. To be used when no valid entry id can be assigned.
*/
- public static final long INVALID_ENTRY_ID = -1;
+ long INVALID_ENTRY_ID = -1;
/**
- * Entry identifier representing a request to obtain the last add entry
confirmed
+ * Entry identifier representing a request to obtain the last add entry
confirmed.
*/
- public static final long LAST_ADD_CONFIRMED = -1;
+ long LAST_ADD_CONFIRMED = -1;
/**
* The length of the master key in add packets. This
@@ -63,7 +61,7 @@ public interface BookieProtocol {
* is always generated using the MacDigestManager regardless
* of whether Mac is being used for the digest or not
*/
- public static final int MASTER_KEY_LENGTH = 20;
+ int MASTER_KEY_LENGTH = 20;
/**
* The first int of a packet is the header.
@@ -72,10 +70,10 @@ public interface BookieProtocol {
* and just had an int representing the opCode as the
* first int. This handles that case also.
*/
- final static class PacketHeader {
+ final class PacketHeader {
public static int toInt(byte version, byte opCode, short flags) {
if (version == 0) {
- return (int)opCode;
+ return (int) opCode;
} else {
return ((version & 0xFF) << 24)
| ((opCode & 0xFF) << 16)
@@ -84,7 +82,7 @@ public interface BookieProtocol {
}
public static byte getVersion(int packetHeader) {
- return (byte)(packetHeader >> 24);
+ return (byte) (packetHeader >> 24);
}
public static byte getOpCode(int packetHeader) {
@@ -92,16 +90,16 @@ public interface BookieProtocol {
if (version == 0) {
return (byte) packetHeader;
} else {
- return (byte)((packetHeader >> 16) & 0xFF);
+ return (byte) ((packetHeader >> 16) & 0xFF);
}
}
public static short getFlags(int packetHeader) {
- byte version = (byte)(packetHeader >> 24);
+ byte version = (byte) (packetHeader >> 24);
if (version == 0) {
return 0;
} else {
- return (short)(packetHeader & 0xFFFF);
+ return (short) (packetHeader & 0xFFFF);
}
}
}
@@ -112,7 +110,7 @@ public interface BookieProtocol {
* error code followed by the 8-byte ledger number and 8-byte entry number
* of the entry written.
*/
- public static final byte ADDENTRY = 1;
+ byte ADDENTRY = 1;
/**
* The Read entry request payload will be the ledger number and entry
number
* to read. (The ledger number is an 8-byte integer and the entry number is
@@ -122,69 +120,72 @@ public interface BookieProtocol {
* requested. (Note that the first sixteen bytes of the entry happen to be
* the ledger number and entry number as well.)
*/
- public static final byte READENTRY = 2;
+ byte READENTRY = 2;
/**
* Auth message. This code is for passing auth messages between the auth
* providers on the client and bookie. The message payload is determined
* by the auth providers themselves.
*/
- public static final byte AUTH = 3;
- public static final byte READ_LAC = 4;
- public static final byte WRITE_LAC = 5;
- public static final byte GET_BOOKIE_INFO = 6;
+ byte AUTH = 3;
+ byte READ_LAC = 4;
+ byte WRITE_LAC = 5;
+ byte GET_BOOKIE_INFO = 6;
/**
- * The error code that indicates success
+ * The error code that indicates success.
*/
- public static final int EOK = 0;
+ int EOK = 0;
/**
- * The error code that indicates that the ledger does not exist
+ * The error code that indicates that the ledger does not exist.
*/
- public static final int ENOLEDGER = 1;
+ int ENOLEDGER = 1;
/**
- * The error code that indicates that the requested entry does not exist
+ * The error code that indicates that the requested entry does not exist.
*/
- public static final int ENOENTRY = 2;
+ int ENOENTRY = 2;
/**
- * The error code that indicates an invalid request type
+ * The error code that indicates an invalid request type.
*/
- public static final int EBADREQ = 100;
+ int EBADREQ = 100;
/**
- * General error occurred at the server
+ * General error occurred at the server.
*/
- public static final int EIO = 101;
+ int EIO = 101;
/**
- * Unauthorized access to ledger
+ * Unauthorized access to ledger.
*/
- public static final int EUA = 102;
+ int EUA = 102;
/**
- * The server version is incompatible with the client
+ * The server version is incompatible with the client.
*/
- public static final int EBADVERSION = 103;
+ int EBADVERSION = 103;
/**
- * Attempt to write to fenced ledger
+ * Attempt to write to fenced ledger.
*/
- public static final int EFENCED = 104;
+ int EFENCED = 104;
/**
- * The server is running as read-only mode
+ * The server is running in read-only mode.
*/
- public static final int EREADONLY = 105;
+ int EREADONLY = 105;
/**
- * Too many concurrent requests
+ * Too many concurrent requests.
*/
- public static final int ETOOMANYREQUESTS = 106;
+ int ETOOMANYREQUESTS = 106;
- public static final short FLAG_NONE = 0x0;
- public static final short FLAG_DO_FENCING = 0x0001;
- public static final short FLAG_RECOVERY_ADD = 0x0002;
+ short FLAG_NONE = 0x0;
+ short FLAG_DO_FENCING = 0x0001;
+ short FLAG_RECOVERY_ADD = 0x0002;
- static class Request {
+ /**
+ * A Bookie request object.
+ */
+ class Request {
byte protocolVersion;
byte opCode;
long ledgerId;
@@ -239,7 +240,10 @@ public interface BookieProtocol {
public void recycle() {}
}
- static class AddRequest extends Request {
+ /**
+ * A Request that adds data.
+ */
+ class AddRequest extends Request {
ByteBuf data;
static AddRequest create(byte protocolVersion, long ledgerId,
@@ -289,7 +293,10 @@ public interface BookieProtocol {
}
}
- static class ReadRequest extends Request {
+ /**
+ * A Request that reads data.
+ */
+ class ReadRequest extends Request {
ReadRequest(byte protocolVersion, long ledgerId, long entryId, short
flags) {
init(protocolVersion, READENTRY, ledgerId, entryId, flags, null);
}
@@ -304,7 +311,10 @@ public interface BookieProtocol {
}
}
- static class AuthRequest extends Request {
+ /**
+ * An authentication request.
+ */
+ class AuthRequest extends Request {
final AuthMessage authMessage;
AuthRequest(byte protocolVersion, AuthMessage authMessage) {
@@ -317,7 +327,10 @@ public interface BookieProtocol {
}
}
- static abstract class Response {
+ /**
+ * A response object.
+ */
+ abstract class Response {
byte protocolVersion;
byte opCode;
int errorCode;
@@ -362,7 +375,10 @@ public interface BookieProtocol {
abstract void recycle();
}
- static class ReadResponse extends Response {
+ /**
+ * A response to a read request, carrying the data that was read.
+ */
+ class ReadResponse extends Response {
final ByteBuf data;
ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long
entryId) {
@@ -387,7 +403,10 @@ public interface BookieProtocol {
}
}
- static class AddResponse extends Response {
+ /**
+ * A response to an add request.
+ */
+ class AddResponse extends Response {
static AddResponse create(byte protocolVersion, int errorCode, long
ledgerId, long entryId) {
AddResponse response = RECYCLER.get();
response.init(protocolVersion, ADDENTRY, errorCode, ledgerId,
entryId);
@@ -410,7 +429,10 @@ public interface BookieProtocol {
}
}
- static class ErrorResponse extends Response {
+ /**
+ * An error response.
+ */
+ class ErrorResponse extends Response {
ErrorResponse(byte protocolVersion, byte opCode, int errorCode,
long ledgerId, long entryId) {
init(protocolVersion, opCode, errorCode, ledgerId, entryId);
@@ -420,7 +442,10 @@ public interface BookieProtocol {
}
}
- static class AuthResponse extends Response {
+ /**
+ * A response with an authentication message.
+ */
+ class AuthResponse extends Response {
final AuthMessage authMessage;
AuthResponse(byte protocolVersion, AuthMessage authMessage) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
index 1315dc9..d2e9548 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
@@ -20,6 +20,9 @@
*/
package org.apache.bookkeeper.proto;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
import java.nio.channels.ClosedChannelException;
@@ -28,16 +31,12 @@ import org.apache.bookkeeper.processor.RequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.group.ChannelGroup;
-
/**
- * Serverside handler for bookkeeper requests
+ * Serverside handler for bookkeeper requests.
*/
class BookieRequestHandler extends ChannelInboundHandlerAdapter {
- private final static Logger LOG =
LoggerFactory.getLogger(BookieRequestHandler.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(BookieRequestHandler.class);
private final RequestProcessor requestProcessor;
private final ChannelGroup allChannels;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 7c51143..3f2edff 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -20,16 +20,40 @@
*/
package org.apache.bookkeeper.proto;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_WAIT;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
+
import io.netty.channel.Channel;
import io.netty.handler.ssl.SslHandler;
+import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.HashedWheelTimer;
+
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+
import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.AuthToken;
import org.apache.bookkeeper.bookie.Bookie;
@@ -46,28 +70,9 @@ import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_WAIT;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
-
+/**
+ * An implementation of the RequestProcessor interface.
+ */
public class BookieRequestProcessor implements RequestProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(BookieRequestProcessor.class);
@@ -94,18 +99,18 @@ public class BookieRequestProcessor implements
RequestProcessor {
private final OrderedSafeExecutor writeThreadPool;
/**
- * TLS management
+ * TLS management.
*/
private final SecurityHandlerFactory shFactory;
/**
* The threadpool used to execute all long poll requests issued to this
server
- * after they are done waiting
+ * after they are done waiting.
*/
private final OrderedSafeExecutor longPollThreadPool;
/**
- * The Timer used to time out requests for long polling
+ * The Timer used to time out requests for long polling.
*/
private final HashedWheelTimer requestTimer;
@@ -137,9 +142,11 @@ public class BookieRequestProcessor implements
RequestProcessor {
StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws
SecurityException {
this.serverCfg = serverCfg;
this.bookie = bookie;
- this.readThreadPool =
createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThread-" +
serverCfg.getBookiePort(),
+ this.readThreadPool =
createExecutor(this.serverCfg.getNumReadWorkerThreads(),
+ "BookieReadThread-" + serverCfg.getBookiePort(),
serverCfg.getMaxPendingReadRequestPerThread());
- this.writeThreadPool =
createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThread-" +
serverCfg.getBookiePort(),
+ this.writeThreadPool =
createExecutor(this.serverCfg.getNumAddWorkerThreads(),
+ "BookieWriteThread-" + serverCfg.getBookiePort(),
serverCfg.getMaxPendingAddRequestPerThread());
this.longPollThreadPool =
createExecutor(
@@ -188,7 +195,8 @@ public class BookieRequestProcessor implements
RequestProcessor {
if (numThreads <= 0) {
return null;
} else {
- return
OrderedSafeExecutor.newBuilder().numThreads(numThreads).name(nameFormat).maxTasksInQueue(maxTasksInQueue).build();
+ return
OrderedSafeExecutor.newBuilder().numThreads(numThreads).name(nameFormat)
+ .maxTasksInQueue(maxTasksInQueue).build();
}
}
@@ -213,26 +221,26 @@ public class BookieRequestProcessor implements
RequestProcessor {
processReadRequestV3(r, c);
break;
case AUTH:
- LOG.info("Ignoring auth operation from client
{}",c.remoteAddress());
+ LOG.info("Ignoring auth operation from client {}",
c.remoteAddress());
BookkeeperProtocol.AuthMessage message =
BookkeeperProtocol.AuthMessage
.newBuilder()
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
.build();
- BookkeeperProtocol.Response.Builder authResponse =
-
BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
- .setStatus(BookkeeperProtocol.StatusCode.EOK)
- .setAuthResponse(message);
+ BookkeeperProtocol.Response.Builder authResponse =
BookkeeperProtocol.Response
+ .newBuilder().setHeader(r.getHeader())
+ .setStatus(BookkeeperProtocol.StatusCode.EOK)
+ .setAuthResponse(message);
c.writeAndFlush(authResponse.build());
break;
case WRITE_LAC:
- processWriteLacRequestV3(r,c);
+ processWriteLacRequestV3(r, c);
break;
case READ_LAC:
- processReadLacRequestV3(r,c);
+ processReadLacRequestV3(r, c);
break;
case GET_BOOKIE_INFO:
- processGetBookieInfoRequestV3(r,c);
+ processGetBookieInfoRequestV3(r, c);
break;
case START_TLS:
processStartTLSRequestV3(r, c);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 253a871..6186e44 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -57,7 +57,7 @@ public class BookieServer {
private volatile boolean running = false;
Bookie bookie;
DeathWatcher deathWatcher;
- private final static Logger LOG =
LoggerFactory.getLogger(BookieServer.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(BookieServer.class);
int exitCode = ExitCode.OK;
@@ -97,9 +97,9 @@ public class BookieServer {
protected Bookie newBookie(ServerConfiguration conf)
throws IOException, KeeperException, InterruptedException,
BookieException {
- return conf.isForceReadOnlyBookie() ?
- new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE)) :
- new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE));
+ return conf.isForceReadOnlyBookie()
+ ? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE))
+ : new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE));
}
public void start() throws IOException, UnavailableException,
InterruptedException, BKException {
@@ -127,7 +127,7 @@ public class BookieServer {
}
/**
- * Suspend processing of requests in the bookie (for testing)
+ * Suspend processing of requests in the bookie (for testing).
*/
@VisibleForTesting
public void suspendProcessing() {
@@ -138,7 +138,7 @@ public class BookieServer {
}
/**
- * Resume processing requests in the bookie (for testing)
+ * Resume processing requests in the bookie (for testing).
*/
@VisibleForTesting
public void resumeProcessing() {
@@ -181,7 +181,7 @@ public class BookieServer {
}
/**
- * A thread to watch whether bookie & nioserver is still alive
+ * A thread to watch whether bookie and nioserver are still alive.
*/
private class DeathWatcher extends BookieCriticalThread {
@@ -194,7 +194,7 @@ public class BookieServer {
@Override
public void run() {
- while(true) {
+ while (true) {
try {
Thread.sleep(watchInterval);
} catch (InterruptedException ie) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index ab160ce..04a4546 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -22,15 +22,17 @@
package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
+
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
@@ -67,26 +69,44 @@ public class BookkeeperInternalCallbacks {
void onChanged(long ledgerId, LedgerMetadata metadata);
}
+ /**
+ * A writer callback interface.
+ */
public interface WriteCallback {
void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx);
}
+ /**
+ * A last-add-confirmed (LAC) reader callback interface.
+ */
public interface ReadLacCallback {
void readLacComplete(int rc, long ledgerId, ByteBuf lac, ByteBuf
buffer, Object ctx);
}
+ /**
+ * A last-add-confirmed (LAC) writer callback interface.
+ */
public interface WriteLacCallback {
void writeLacComplete(int rc, long ledgerId, BookieSocketAddress addr,
Object ctx);
}
+ /**
+ * A callback interface for a STARTTLS command.
+ */
public interface StartTLSCallback {
void startTLSComplete(int rc, Object ctx);
}
+ /**
+ * A generic callback interface.
+ */
public interface GenericCallback<T> {
void operationComplete(int rc, T result);
}
-
+
+ /**
+ * A callback implementation with an internal timer.
+ */
public static class TimedGenericCallback<T> implements GenericCallback<T> {
final GenericCallback<T> cb;
@@ -112,6 +132,9 @@ public class BookkeeperInternalCallbacks {
}
}
+ /**
+ * Declaration of a callback interface for the Last Add Confirmed context
of a reader.
+ */
public interface ReadEntryCallbackCtx {
void setLastAddConfirmed(long lac);
long getLastAddConfirmed();
@@ -145,7 +168,10 @@ public class BookkeeperInternalCallbacks {
*/
void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry,
Object ctx);
}
-
+
+ /**
+ * This is a callback interface for fetching metadata about a bookie.
+ */
public interface GetBookieInfoCallback {
void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx);
}
@@ -231,18 +257,18 @@ public class BookkeeperInternalCallbacks {
}
/**
- * Processor to process a specific element
+ * Processor to process a specific element.
*/
- public static interface Processor<T> {
+ public interface Processor<T> {
/**
- * Process a specific element
+ * Process a specific element.
*
* @param data
* data to process
* @param cb
* Callback to invoke when process has been done.
*/
- public void process(T data, AsyncCallback.VoidCallback cb);
+ void process(T data, AsyncCallback.VoidCallback cb);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ClientConnectionPeer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ClientConnectionPeer.java
index ad556f5..1c45b67 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ClientConnectionPeer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ClientConnectionPeer.java
@@ -21,7 +21,7 @@
package org.apache.bookkeeper.proto;
/**
- * Represents the connection to a Bookie, from the client side
+ * Represents the connection to a Bookie, from the client side.
*/
public interface ClientConnectionPeer extends ConnectionPeer {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ConnectionPeer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ConnectionPeer.java
index b5a1837..7d27e72 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ConnectionPeer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ConnectionPeer.java
@@ -25,45 +25,45 @@ import java.util.Collection;
import org.apache.bookkeeper.auth.BookKeeperPrincipal;
/**
- * Represents the connection to a BookKeeper client, from the Bookie side
+ * Represents the connection to a BookKeeper client, from the Bookie side.
*/
public interface ConnectionPeer {
/**
- * Address from which originated the connection
+ * Address from which the connection originated.
* @return
*/
- public SocketAddress getRemoteAddr();
+ SocketAddress getRemoteAddr();
/**
- * Additional principals bound to the connection, like TLS certificates
+ * Additional principals bound to the connection, like TLS certificates.
* @return
*/
- public Collection<Object> getProtocolPrincipals();
+ Collection<Object> getProtocolPrincipals();
/**
- * Utility function to be used from AuthProviders to drop the connection
+ * Utility function to be used from AuthProviders to drop the connection.
*/
- public void disconnect();
+ void disconnect();
/**
- * Returns the user which is bound to the connection
+ * Returns the user which is bound to the connection.
* @return the principal or null if no auth takes place
* or the auth plugin did not call {@link
#setAuthorizedId(org.apache.bookkeeper.auth.BookKeeperPrincipal)}
* @see #setAuthorizedId(org.apache.bookkeeper.auth.BookKeeperPrincipal)
*/
- public BookKeeperPrincipal getAuthorizedId();
+ BookKeeperPrincipal getAuthorizedId();
/**
- * Assign a principal to the current connection
+ * Assign a principal to the current connection.
* @param principal the id of the user
* @see #getAuthorizedId()
*/
- public void setAuthorizedId(BookKeeperPrincipal principal);
+ void setAuthorizedId(BookKeeperPrincipal principal);
/**
- * This flag returns true if a 'secure' channel in use, like TLS
+ * This flag returns true if a 'secure' channel in use, like TLS.
* @return true if the channel is 'secure'
*/
- public boolean isSecure();
+ boolean isSecure();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index ba1b444..3ac5cb6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -20,6 +20,9 @@
*/
package org.apache.bookkeeper.proto;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -32,10 +35,6 @@ import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.base.Preconditions;
-
/**
* Provide a simple round-robin style channel pool. We could improve it later
to do more
* fantastic things.
@@ -59,7 +58,7 @@ class DefaultPerChannelBookieClientPool implements
PerChannelBookieClientPool,
DefaultPerChannelBookieClientPool(ClientConfiguration conf,
PerChannelBookieClientFactory factory,
BookieSocketAddress address,
int coreSize) throws SecurityException {
- Preconditions.checkArgument(coreSize > 0);
+ checkArgument(coreSize > 0);
this.factory = factory;
this.address = address;
this.conf = conf;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index 21a780c..fc55e4a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.proto;
+import io.netty.channel.Channel;
+
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@@ -32,10 +34,11 @@ import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.channel.Channel;
-
+/**
+ * A processor class for v3 bookie metadata packets.
+ */
public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements
Runnable {
- private final static Logger LOG =
LoggerFactory.getLogger(GetBookieInfoProcessorV3.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(GetBookieInfoProcessorV3.class);
public GetBookieInfoProcessorV3(Request request, Channel channel,
BookieRequestProcessor requestProcessor) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
index 738ee48..59b6ed0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
@@ -24,23 +24,23 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.net.BookieSocketAddress;
/**
- * Local registry for embedded Bookies
+ * Local registry for embedded Bookies.
*/
public class LocalBookiesRegistry {
-
- private final static ConcurrentHashMap<BookieSocketAddress,Boolean>
localBookiesRegistry
- = new ConcurrentHashMap<>();
-
- static void registerLocalBookieAddress(BookieSocketAddress address) {
- localBookiesRegistry.put(address,Boolean.TRUE);
+
+ private static final ConcurrentHashMap<BookieSocketAddress, Boolean>
localBookiesRegistry =
+ new ConcurrentHashMap<>();
+
+ static void registerLocalBookieAddress(BookieSocketAddress address) {
+ localBookiesRegistry.put(address, Boolean.TRUE);
}
static void unregisterLocalBookieAddress(BookieSocketAddress address) {
- if (address!= null) {
+ if (address != null) {
localBookiesRegistry.remove(address);
}
}
- public static boolean isLocalBookie(BookieSocketAddress address) {
+ public static boolean isLocalBookie(BookieSocketAddress address) {
return localBookiesRegistry.containsKey(address);
}
-
+
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index 342e788..1638174 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -19,10 +19,12 @@ package org.apache.bookkeeper.proto;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
+
import io.netty.channel.Channel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
+
import java.io.IOException;
import java.util.Observable;
import java.util.Observer;
@@ -30,10 +32,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +46,7 @@ import org.slf4j.LoggerFactory;
*/
class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements
Observer {
- private final static Logger logger =
LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class);
+ private static final Logger logger =
LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class);
private final Long previousLAC;
private Optional<Long> lastAddConfirmedUpdateTime = Optional.absent();
@@ -83,7 +86,7 @@ class LongPollReadEntryProcessorV3 extends
ReadEntryProcessorV3 implements Obser
Stopwatch startTimeSw)
throws IOException {
if (RequestUtils.shouldPiggybackEntry(readRequest)) {
- if(!readRequest.hasPreviousLAC() ||
(BookieProtocol.LAST_ADD_CONFIRMED != entryId)) {
+ if (!readRequest.hasPreviousLAC() ||
(BookieProtocol.LAST_ADD_CONFIRMED != entryId)) {
// This is not a valid request - client bug?
logger.error("Incorrect read request, entry piggyback
requested incorrectly for ledgerId {} entryId {}",
ledgerId, entryId);
@@ -104,7 +107,8 @@ class LongPollReadEntryProcessorV3 extends
ReadEntryProcessorV3 implements Obser
return super.readEntry(readResponseBuilder, entryId,
true, startTimeSw);
} catch (Bookie.NoEntryException e) {
requestProcessor.readLastEntryNoEntryErrorCounter.inc();
- logger.info("No entry found while piggyback reading
entry {} from ledger {} : previous lac = {}",
+ logger.info(
+ "No entry found while piggyback reading entry
{} from ledger {} : previous lac = {}",
new Object[] { entryId, ledgerId, previousLAC
});
// piggy back is best effort and this request can fail
genuinely because of striping
// entries across the ensemble
@@ -113,8 +117,8 @@ class LongPollReadEntryProcessorV3 extends
ReadEntryProcessorV3 implements Obser
} else {
if (knownLAC < previousLAC) {
if (logger.isDebugEnabled()) {
- logger.debug("Found smaller lac when piggy back
reading lac and entry from ledger {} :" +
- " previous lac = {}, known lac = {}",
+ logger.debug("Found smaller lac when piggy back
reading lac and entry from ledger {} :"
+ + " previous lac = {}, known lac = {}",
new Object[]{ ledgerId, previousLAC,
knownLAC });
}
}
@@ -189,10 +193,9 @@ class LongPollReadEntryProcessorV3 extends
ReadEntryProcessorV3 implements Obser
@Override
public void update(Observable observable, Object o) {
- LastAddConfirmedUpdateNotification newLACNotification =
(LastAddConfirmedUpdateNotification)o;
+ LastAddConfirmedUpdateNotification newLACNotification =
(LastAddConfirmedUpdateNotification) o;
if (newLACNotification.lastAddConfirmed > previousLAC) {
- if (newLACNotification.lastAddConfirmed != Long.MAX_VALUE &&
- !lastAddConfirmedUpdateTime.isPresent()) {
+ if (newLACNotification.lastAddConfirmed != Long.MAX_VALUE &&
!lastAddConfirmedUpdateTime.isPresent()) {
lastAddConfirmedUpdateTime =
Optional.of(newLACNotification.timestamp);
}
if (logger.isTraceEnabled()) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index 5c9d8d2..eda5b92 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -17,6 +17,8 @@
*/
package org.apache.bookkeeper.proto;
+import io.netty.channel.Channel;
+
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookieProtocol.Request;
@@ -26,10 +28,11 @@ import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.channel.Channel;
-
+/**
+ * A base class for bookeeper packet processors.
+ */
abstract class PacketProcessorBase extends SafeRunnable {
- private final static Logger logger =
LoggerFactory.getLogger(PacketProcessorBase.class);
+ private static final Logger logger =
LoggerFactory.getLogger(PacketProcessorBase.class);
Request request;
Channel channel;
BookieRequestProcessor requestProcessor;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index e398c4a..e84bbee 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -33,6 +33,9 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SafeRunnable;
+/**
+ * A base class for bookkeeper protocol v3 packet processors.
+ */
public abstract class PacketProcessorBaseV3 extends SafeRunnable {
final Request request;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index c392c79..d470fc0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -20,10 +20,11 @@ package org.apache.bookkeeper.proto;
import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
-import com.google.common.collect.Sets;
import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
+
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -33,6 +34,7 @@ import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
@@ -41,7 +43,6 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.local.LocalChannel;
@@ -61,17 +62,22 @@ import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.SocketAddress;
-import java.nio.channels.ClosedChannelException;
+import java.security.cert.Certificate;
import java.util.ArrayDeque;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.Objects;
+import java.util.Collections;
+import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.SSLPeerUnverifiedException;
+
import org.apache.bookkeeper.auth.BookKeeperPrincipal;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
@@ -80,13 +86,13 @@ import
org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
-import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.StartTLSCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -115,20 +121,6 @@ import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ExtensionRegistry;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.UnpooledByteBufAllocator;
-import java.net.SocketAddress;
-
-import java.net.SocketAddress;
-import java.security.cert.Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import org.apache.bookkeeper.auth.BookKeeperPrincipal;
-
/**
* This class manages all details of connection to a particular bookie. It also
* has reconnect logic if a connection to a bookie fails.
@@ -148,7 +140,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
BKException.Code.DuplicateEntryIdException,
BKException.Code.WriteOnReadOnlyBookieException));
- public static final AtomicLong txnIdGenerator = new AtomicLong(0);
+ private static final AtomicLong txnIdGenerator = new AtomicLong(0);
final BookieSocketAddress addr;
final EventLoopGroup eventLoopGroup;
@@ -160,7 +152,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
final int getBookieInfoTimeout;
final int startTLSTimeout;
- private final ConcurrentHashMap<CompletionKey, CompletionValue>
completionObjects = new ConcurrentHashMap<CompletionKey, CompletionValue>();
+ private final ConcurrentHashMap<CompletionKey, CompletionValue>
completionObjects =
+ new ConcurrentHashMap<CompletionKey, CompletionValue>();
private final StatsLogger statsLogger;
private final OpStatsLogger readEntryOpLogger;
@@ -180,7 +173,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
/**
* The following member variables do not need to be concurrent, or volatile
- * because they are always updated under a lock
+ * because they are always updated under a lock.
*/
private volatile Queue<GenericCallback<PerChannelBookieClient>> pendingOps
=
new ArrayDeque<GenericCallback<PerChannelBookieClient>>();
@@ -416,7 +409,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
pipeline.addLast("lengthprepender", new
LengthFieldPrepender(4));
pipeline.addLast("bookieProtoEncoder", new
BookieProtoEncoding.RequestEncoder(extRegistry));
pipeline.addLast("bookieProtoDecoder", new
BookieProtoEncoding.ResponseDecoder(extRegistry));
- pipeline.addLast("authHandler", new
AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
connectionPeer));
+ pipeline.addLast("authHandler", new
AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
+ connectionPeer));
pipeline.addLast("mainhandler", PerChannelBookieClient.this);
}
});
@@ -510,7 +504,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
/**
* This method should be called only after connection has been checked for
- * {@link #connectIfNeededAndDoOp(GenericCallback)}
+ * {@link #connectIfNeededAndDoOp(GenericCallback)}.
*
* @param ledgerId
* Ledger Id
@@ -653,7 +647,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
/**
- * Long Poll Reads
+ * Long Poll Reads.
*/
public void readEntryWaitForLACUpdate(final long ledgerId,
final long entryId,
@@ -838,9 +832,9 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
return;
}
- try{
+ try {
channel.writeAndFlush(request, channel.voidPromise());
- } catch(Throwable e) {
+ } catch (Throwable e) {
LOG.warn("Operation {} failed", requestToString(request), e);
errorOut(key);
}
@@ -848,8 +842,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
private static String requestToString(Object request) {
if (request instanceof BookkeeperProtocol.Request) {
- BookkeeperProtocol.BKPacketHeader header
- = ((BookkeeperProtocol.Request)request).getHeader();
+ BookkeeperProtocol.BKPacketHeader header =
((BookkeeperProtocol.Request) request).getHeader();
return String.format("Req(txnId=%d,op=%s,version=%s)",
header.getTxnId(), header.getOperation(),
header.getVersion());
@@ -905,7 +898,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
/**
- * If our channel has disconnected, we just error out the pending entries
+ * If our channel has disconnected, we just error out the pending entries.
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@@ -929,7 +922,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
/**
* Called by netty when an exception happens in one of the netty threads
- * (mostly due to what we do in the netty threads)
+ * (mostly due to what we do in the netty threads).
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
@@ -973,7 +966,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
/**
- * Called by netty when a message is received on a channel
+ * Called by netty when a message is received on a channel.
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
@@ -1163,7 +1156,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
/**
- * Boiler-plate wrapper classes follow
+ * Boiler-plate wrapper classes follow.
*
*/
@@ -1314,8 +1307,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
@Override
public void handleV3Response(BookkeeperProtocol.Response response) {
WriteLacResponse writeLacResponse = response.getWriteLacResponse();
- StatusCode status = response.getStatus() == StatusCode.EOK ?
- writeLacResponse.getStatus() : response.getStatus();
+ StatusCode status = response.getStatus() == StatusCode.EOK
+ ? writeLacResponse.getStatus() : response.getStatus();
long ledgerId = writeLacResponse.getLedgerId();
int rc = logAndConvertStatus(status,
@@ -1365,7 +1358,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
ReadLacResponse readLacResponse = response.getReadLacResponse();
ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
- StatusCode status = response.getStatus() == StatusCode.EOK ?
readLacResponse.getStatus() : response.getStatus();
+ StatusCode status = response.getStatus() == StatusCode.EOK
+ ? readLacResponse.getStatus() : response.getStatus();
if (readLacResponse.hasLacBody()) {
lacBuffer =
Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
@@ -1472,16 +1466,14 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
"entry", entryId,
"entryLength", readableBytes);
- if(buffer != null) {
+ if (buffer != null) {
buffer = buffer.slice();
}
- if (maxLAC > INVALID_ENTRY_ID
- && (ctx instanceof ReadEntryCallbackCtx)) {
- ((ReadEntryCallbackCtx)ctx).setLastAddConfirmed(maxLAC);
+ if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof
ReadEntryCallbackCtx)) {
+ ((ReadEntryCallbackCtx) ctx).setLastAddConfirmed(maxLAC);
}
- if (lacUpdateTimestamp > -1L
- && (ctx instanceof ReadLastConfirmedAndEntryContext)) {
-
((ReadLastConfirmedAndEntryContext)ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
+ if (lacUpdateTimestamp > -1L && (ctx instanceof
ReadLastConfirmedAndEntryContext)) {
+ ((ReadLastConfirmedAndEntryContext)
ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
}
cb.readEntryComplete(rc, ledgerId, entryId, buffer, ctx);
}
@@ -1570,8 +1562,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
@Override
public void handleV3Response(BookkeeperProtocol.Response response) {
- GetBookieInfoResponse getBookieInfoResponse
- = response.getGetBookieInfoResponse();
+ GetBookieInfoResponse getBookieInfoResponse =
response.getGetBookieInfoResponse();
StatusCode status = response.getStatus() == StatusCode.EOK
? getBookieInfoResponse.getStatus() : response.getStatus();
@@ -1588,7 +1579,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
}
- private final Recycler<AddCompletion> ADD_COMPLETION_RECYCLER = new
Recycler<AddCompletion>() {
+ private final Recycler<AddCompletion> addCompletionRecycler = new
Recycler<AddCompletion>() {
protected AddCompletion newObject(Recycler.Handle<AddCompletion>
handle) {
return new AddCompletion(handle);
}
@@ -1598,7 +1589,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
final WriteCallback originalCallback,
final Object originalCtx,
final long ledgerId, final long
entryId) {
- AddCompletion completion = ADD_COMPLETION_RECYCLER.get();
+ AddCompletion completion = addCompletionRecycler.get();
completion.reset(key, originalCallback, originalCtx, ledgerId,
entryId);
return completion;
}
@@ -1786,7 +1777,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
return txnIdGenerator.incrementAndGet();
}
- private final Recycler<V2CompletionKey> V2_KEY_RECYCLER = new
Recycler<V2CompletionKey>() {
+ private final Recycler<V2CompletionKey> v2KeyRecycler = new
Recycler<V2CompletionKey>() {
protected V2CompletionKey newObject(
Recycler.Handle<V2CompletionKey> handle) {
return new V2CompletionKey(handle);
@@ -1795,7 +1786,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
V2CompletionKey acquireV2Key(long ledgerId, long entryId,
OperationType operationType) {
- V2CompletionKey key = V2_KEY_RECYCLER.get();
+ V2CompletionKey key = v2KeyRecycler.get();
key.reset(ledgerId, entryId, operationType);
return key;
}
@@ -1843,6 +1834,9 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
}
+ /**
+ * Connection listener.
+ */
public class ConnectionFutureListener implements ChannelFutureListener {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -1908,7 +1902,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
private void initiateTLS() {
- LOG.info("Initializing TLS to {}",channel);
+ LOG.info("Initializing TLS to {}", channel);
assert state == ConnectionState.CONNECTING;
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId,
OperationType.START_TLS);
@@ -1928,7 +1922,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
private void failTLS(int rc) {
LOG.error("TLS failure on: {}, rc: {}", channel, rc);
Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
- synchronized(this) {
+ synchronized (this) {
disconnect();
oldPendingOps = pendingOps;
pendingOps = new ArrayDeque<>();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index b7618e3..bd07a4e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -41,7 +41,7 @@ interface PerChannelBookieClientPool {
void obtain(GenericCallback<PerChannelBookieClient> callback, long key);
/**
- * record any read/write error on {@link PerChannelBookieClientPool}
+ * record any read/write error on {@link PerChannelBookieClientPool}.
*/
void recordError();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index f651e13..007d87b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -20,8 +20,8 @@ package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.util.Recycler;
-import io.netty.util.ReferenceCountUtil;
import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
@@ -37,7 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ReadEntryProcessor extends PacketProcessorBase {
- private final static Logger LOG =
LoggerFactory.getLogger(ReadEntryProcessor.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadEntryProcessor.class);
public static ReadEntryProcessor create(Request request, Channel channel,
BookieRequestProcessor requestProcessor) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index e01a339..73175e5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -42,17 +42,17 @@ import org.slf4j.LoggerFactory;
class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
- private final static Logger LOG =
LoggerFactory.getLogger(ReadEntryProcessorV3.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadEntryProcessorV3.class);
protected Stopwatch lastPhaseStartTime;
private final ExecutorService fenceThreadPool;
private SettableFuture<Boolean> fenceResult = null;
-
+
protected final ReadRequest readRequest;
protected final long ledgerId;
protected final long entryId;
-
+
// Stats
protected final OpStatsLogger readStats;
protected final OpStatsLogger reqStats;
@@ -75,7 +75,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
this.readStats = requestProcessor.readEntryStats;
this.reqStats = requestProcessor.readRequestStats;
}
-
+
this.fenceThreadPool = fenceThreadPool;
lastPhaseStartTime = Stopwatch.createStarted();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index 01ebead..f3faf76 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -20,6 +20,12 @@
*/
package org.apache.bookkeeper.proto;
+import com.google.protobuf.ByteString;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.util.ReferenceCountUtil;
+
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@@ -33,14 +39,11 @@ import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.protobuf.ByteString;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.util.ReferenceCountUtil;
-
+/**
+ * A read processor for v3 last add confirmed messages.
+ */
class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
- private final static Logger logger =
LoggerFactory.getLogger(ReadLacProcessorV3.class);
+ private static final Logger logger =
LoggerFactory.getLogger(ReadLacProcessorV3.class);
public ReadLacProcessorV3(Request request, Channel channel,
BookieRequestProcessor requestProcessor) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ServerStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ServerStats.java
index 4920bb9..b8fc8b1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ServerStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ServerStats.java
@@ -16,6 +16,9 @@ package org.apache.bookkeeper.proto;
import org.apache.bookkeeper.util.MathUtils;
+/**
+ * A class to hold server statistics.
+ */
public class ServerStats {
private static ServerStats instance = new ServerStats();
private long packetsSent;
@@ -25,7 +28,7 @@ public class ServerStats {
private long totalLatency = 0;
private long count = 0;
- static public ServerStats getInstance() {
+ public static ServerStats getInstance() {
return instance;
}
@@ -33,26 +36,26 @@ public class ServerStats {
}
// getters
- synchronized public long getMinLatency() {
+ public synchronized long getMinLatency() {
return (minLatency == Long.MAX_VALUE) ? 0 : minLatency;
}
- synchronized public long getAvgLatency() {
- if (count != 0)
+ public synchronized long getAvgLatency() {
+ if (count != 0) {
return totalLatency / count;
+ }
return 0;
}
- synchronized public long getMaxLatency() {
+ public synchronized long getMaxLatency() {
return maxLatency;
}
-
- synchronized public long getPacketsReceived() {
+ public synchronized long getPacketsReceived() {
return packetsReceived;
}
- synchronized public long getPacketsSent() {
+ public synchronized long getPacketsSent() {
return packetsSent;
}
@@ -77,24 +80,24 @@ public class ServerStats {
}
}
- synchronized public void resetLatency() {
+ public synchronized void resetLatency() {
totalLatency = count = maxLatency = 0;
minLatency = Long.MAX_VALUE;
}
- synchronized public void resetMaxLatency() {
+ public synchronized void resetMaxLatency() {
maxLatency = getMinLatency();
}
- synchronized public void incrementPacketsReceived() {
+ public synchronized void incrementPacketsReceived() {
packetsReceived++;
}
- synchronized public void incrementPacketsSent() {
+ public synchronized void incrementPacketsSent() {
packetsSent++;
}
- synchronized public void resetRequestCounters() {
+ public synchronized void resetRequestCounters() {
packetsReceived = packetsSent = 0;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 416a478..2a3d78c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -34,11 +34,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Processes add entry requests
+ * Processes add entry requests.
*/
class WriteEntryProcessor extends PacketProcessorBase implements WriteCallback
{
- private final static Logger LOG =
LoggerFactory.getLogger(WriteEntryProcessor.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(WriteEntryProcessor.class);
long startTimeNanos;
@@ -123,7 +123,7 @@ class WriteEntryProcessor extends PacketProcessorBase
implements WriteCallback {
return String.format("WriteEntry(%d, %d)",
request.getLedgerId(), request.getEntryId());
}
-
+
private void recycle() {
reset();
recyclerHandle.recycle(this);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index b4e89f8..f757a25 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -39,7 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
- private final static Logger logger =
LoggerFactory.getLogger(WriteEntryProcessorV3.class);
+ private static final Logger logger =
LoggerFactory.getLogger(WriteEntryProcessorV3.class);
public WriteEntryProcessorV3(Request request, Channel channel,
BookieRequestProcessor requestProcessor) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index d710102..9b5d98c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -20,25 +20,25 @@
*/
package org.apache.bookkeeper.proto;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-
class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
- private final static Logger logger =
LoggerFactory.getLogger(WriteLacProcessorV3.class);
+ private static final Logger logger =
LoggerFactory.getLogger(WriteLacProcessorV3.class);
public WriteLacProcessorV3(Request request, Channel channel,
BookieRequestProcessor requestProcessor) {
@@ -90,9 +90,11 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3
implements Runnable {
// If everything is okay, we return null so that the calling function
// dosn't return a response back to the caller.
if (status.equals(StatusCode.EOK)) {
-
requestProcessor.writeLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
+
requestProcessor.writeLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+ TimeUnit.NANOSECONDS);
} else {
-
requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
+
requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+ TimeUnit.NANOSECONDS);
}
writeLacResponse.setStatus(status);
return writeLacResponse.build();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ClientConnectionPeer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/package-info.java
similarity index 85%
copy from
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ClientConnectionPeer.java
copy to
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/package-info.java
index ad556f5..acddc18 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ClientConnectionPeer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/package-info.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,13 +15,9 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.bookkeeper.proto;
/**
- * Represents the connection to a Bookie, from the client side
+ * Classes related to the Bookkeeper protocol.
*/
-public interface ClientConnectionPeer extends ConnectionPeer {
-
-}
+package org.apache.bookkeeper.proto;
diff --git a/buildtools/src/main/resources/bookkeeper/server-suppressions.xml
b/buildtools/src/main/resources/bookkeeper/server-suppressions.xml
index a8371d6..3e08a64 100644
--- a/buildtools/src/main/resources/bookkeeper/server-suppressions.xml
+++ b/buildtools/src/main/resources/bookkeeper/server-suppressions.xml
@@ -26,7 +26,6 @@
<suppress checks=".*" files=".*[\\/]http[\\/].*"/>
<suppress checks=".*" files=".*[\\/]metastore[\\/].*"/>
<suppress checks=".*" files=".*[\\/]net[\\/].*"/>
- <suppress checks=".*" files=".*[\\/]proto[\\/].*"/>
<suppress checks=".*" files=".*[\\/]replication[\\/].*"/>
<suppress checks=".*" files=".*[\\/]sasl[\\/].*"/>
<suppress checks=".*" files=".*[\\/]test[\\/].*"/>
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].