BOOKKEEPER-901: Authentication framework Author: Ivan Kelly <[email protected]>
Reviewers: Sijie Guo<[email protected]> Closes #23 from merlimat/authentication-framework and squashes the following commits: aa01548 [Ivan Kelly] BOOKKEEPER-901: Add an authentication framework f930fbd [Ivan Kelly] BOOKKEEPER-794 BookkeeperProtocol.Response.status is completely ignored Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/b1c12c0f Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/b1c12c0f Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/b1c12c0f Branch: refs/heads/master Commit: b1c12c0f41b7c27b2452fef311f12077d771f431 Parents: e32c388 Author: Ivan Kelly <[email protected]> Authored: Mon Apr 4 23:45:51 2016 -0700 Committer: Sijie Guo <[email protected]> Committed: Mon Apr 4 23:45:51 2016 -0700 ---------------------------------------------------------------------- bookkeeper-server/pom.xml | 9 + .../auth/AuthProviderFactoryFactory.java | 111 +++ .../bookkeeper/auth/BookieAuthProvider.java | 83 ++ .../bookkeeper/auth/ClientAuthProvider.java | 89 +++ .../apache/bookkeeper/client/PendingAddOp.java | 4 +- .../bookkeeper/conf/ClientConfiguration.java | 53 +- .../bookkeeper/conf/ServerConfiguration.java | 24 + .../apache/bookkeeper/proto/AuthHandler.java | 356 +++++++++ .../apache/bookkeeper/proto/BookieClient.java | 32 +- .../bookkeeper/proto/BookieNettyServer.java | 21 +- .../bookkeeper/proto/BookieProtoEncoding.java | 125 ++- .../apache/bookkeeper/proto/BookieProtocol.java | 36 + .../proto/BookieRequestProcessor.java | 1 + .../bookkeeper/proto/BookkeeperProtocol.java | 787 ++++++++++++++++++- .../proto/PerChannelBookieClient.java | 70 +- .../src/main/proto/BookkeeperProtocol.proto | 14 +- .../org/apache/bookkeeper/auth/TestAuth.java | 654 +++++++++++++++ .../proto/TestBackwardCompatCMS42.java | 239 ++++++ .../bookkeeper/proto/TestDataFormats.java | 126 +++ .../proto/TestPerChannelBookieClient.java | 25 +- .../test/BookKeeperClusterTestCase.java | 10 + 
.../src/test/proto/TestDataFormats.proto | 34 + 22 files changed, 2803 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/pom.xml ---------------------------------------------------------------------- diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml index a1a74e0..eb67fd8 100644 --- a/bookkeeper-server/pom.xml +++ b/bookkeeper-server/pom.xml @@ -262,6 +262,7 @@ <!-- exclude generated file //--> <exclude>**/DataFormats.java</exclude> <exclude>**/BookkeeperProtocol.java</exclude> + <exclude>**/TestDataFormats.java</exclude> </excludes> </configuration> </plugin> @@ -324,6 +325,14 @@ <arg value="--java_out=src/main/java" /> <arg value="src/main/proto/DataFormats.proto" /> </exec> + <exec executable="protoc" failonerror="true"> + <arg value="--java_out=src/main/java" /> + <arg value="src/main/proto/BookkeeperProtocol.proto" /> + </exec> + <exec executable="protoc" failonerror="true"> + <arg value="--java_out=src/test/java" /> + <arg value="src/test/proto/TestDataFormats.proto" /> + </exec> </target> </configuration> <goals> http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java new file mode 100644 index 0000000..d05c475 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java @@ -0,0 +1,111 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.auth; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.ExtensionRegistry; + + + +public class AuthProviderFactoryFactory { + static Logger LOG = LoggerFactory.getLogger(AuthProviderFactoryFactory.class); + + public static BookieAuthProvider.Factory newBookieAuthProviderFactory(ServerConfiguration conf, + ExtensionRegistry registry) throws IOException { + String factoryClassName = conf.getBookieAuthProviderFactoryClass(); + + if (factoryClassName == null || factoryClassName.length() == 0) { + return new NullBookieAuthProviderFactory(); + } + + BookieAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName, + BookieAuthProvider.Factory.class); + factory.init(conf, registry); + return factory; + } + + public static ClientAuthProvider.Factory 
newClientAuthProviderFactory(ClientConfiguration conf, + ExtensionRegistry registry) throws IOException { + String factoryClassName = conf.getClientAuthProviderFactoryClass(); + + if (factoryClassName == null || factoryClassName.length() == 0) { + return new NullClientAuthProviderFactory(); + } + + ClientAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName, + ClientAuthProvider.Factory.class); + factory.init(conf, registry); + return factory; + } + + private final static String nullPluginName = "NULLPlugin"; + + private static class NullBookieAuthProviderFactory implements BookieAuthProvider.Factory { + @Override + public String getPluginName() { + return nullPluginName; + } + + @Override + public void init(ServerConfiguration conf, ExtensionRegistry registry) {} + + @Override + public BookieAuthProvider newProvider(InetSocketAddress addr, + GenericCallback<Void> completeCb) { + completeCb.operationComplete(BKException.Code.OK, null); + return new BookieAuthProvider() { + public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {} + }; + } + } + + private static class NullClientAuthProviderFactory implements ClientAuthProvider.Factory { + @Override + public String getPluginName() { + return nullPluginName; + } + + @Override + public void init(ClientConfiguration conf, ExtensionRegistry registry) {} + + @Override + public ClientAuthProvider newProvider(InetSocketAddress addr, + GenericCallback<Void> completeCb) { + completeCb.operationComplete(BKException.Code.OK, null); + return new ClientAuthProvider() { + public void init(GenericCallback<AuthMessage> cb) {} + public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {} + }; + } + } + +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java ---------------------------------------------------------------------- diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java new file mode 100644 index 0000000..4fb7d07 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java @@ -0,0 +1,83 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.auth; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; + +import com.google.protobuf.ExtensionRegistry; + +/** + * Bookie authentication provider interface. + * This must be implemented by any party wishing to implement + * an authentication mechanism for bookkeeper connections. + */ +public interface BookieAuthProvider { + interface Factory { + /** + * Initialize the factory with the server configuration + * and protobuf message registry. 
Implementors must + * add any extension messages which contain the auth + * payload, so that the server can decode auth messages + * it receives from the client. + */ + void init(ServerConfiguration conf, + ExtensionRegistry registry) throws IOException; + + /** + * Create a new instance of a bookie auth provider. + * Each connection should get its own instance, as they + * can hold connection specific state. + * The completeCb is used to notify the server that + * the authentication handshake is complete. + * CompleteCb should be called only once. + * If the authentication was successful, BKException.Code.OK + * should be passed as the return code. Otherwise, another + * error code should be passed. + * If authentication fails, the server will close the + * connection. + * @param addr the address of the client being authenticated + * @param completeCb callback to be notified when authentication + * is complete. + */ + BookieAuthProvider newProvider(InetSocketAddress addr, + GenericCallback<Void> completeCb); + + /** + * Get Auth provider plugin name. + * Used as a sanity check to ensure that the bookie and the client + * are using the same auth provider. + */ + String getPluginName(); + } + + /** + * Process a request from the client. cb will receive the next + * message to be sent to the client. If there are no more messages + * to send to the client, cb should not be called, and completeCb + * must be called instead. 
+ */ + void process(AuthMessage m, GenericCallback<AuthMessage> cb); +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java new file mode 100644 index 0000000..fba2264 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java @@ -0,0 +1,89 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.auth; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; + +import com.google.protobuf.ExtensionRegistry; + +/** + * Client authentication provider interface. + * This must be implemented by any party wishing to implement + * an authentication mechanism for bookkeeper connections. 
+ */ +public interface ClientAuthProvider { + interface Factory { + /** + * Initialize the factory with the client configuration + * and protobuf message registry. Implementors must + * add any extension messages which contain the auth + * payload, so that the client can decode auth messages + * it receives from the server. + */ + void init(ClientConfiguration conf, + ExtensionRegistry registry) throws IOException; + + /** + * Create a new instance of a client auth provider. + * Each connection should get its own instance, as they + * can hold connection specific state. + * The completeCb is used to notify the client that + * the authentication handshake is complete. + * CompleteCb should be called only once. + * If the authentication was successful, BKException.Code.OK + * should be passed as the return code. Otherwise, another + * error code should be passed. + * @param addr the address of the socket being authenticated + * @param completeCb callback to be notified when authentication + * is complete. + */ + ClientAuthProvider newProvider(InetSocketAddress addr, + GenericCallback<Void> completeCb); + + /** + * Get Auth provider plugin name. + * Used as a sanity check to ensure that the bookie and the client + * are using the same auth provider. + */ + String getPluginName(); + } + + /** + * Initiate the authentication. cb will receive the initial + * authentication message which should be sent to the server. + * cb may not be called if authentication is not required. In + * this case, completeCb should be called. + */ + void init(GenericCallback<AuthMessage> cb); + + /** + * Process a response from the server. cb will receive the next + * message to be sent to the server. If there are no more messages + * to send to the server, cb should not be called, and completeCb + * must be called instead. 
+ */ + void process(AuthMessage m, GenericCallback<AuthMessage> cb); +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index bc487f6..1946069 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -208,8 +208,8 @@ class PendingAddOp implements WriteCallback, TimerTask { lh.handleUnrecoverableErrorDuringAdd(rc); return; default: - LOG.warn("Write did not succeed: L{} E{} on {}", - new Object[] { ledgerId, entryId, addr }); + LOG.warn("Write did not succeed: L{} E{} on {}, rc = {}", + new Object[] { ledgerId, entryId, addr, rc }); lh.handleBookieFailure(addr, bookieIndex); return; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index d0750d3..b8d738b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -81,6 +81,9 @@ public class ClientConfiguration extends AbstractConfiguration { protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats"; protected final static String TASK_EXECUTION_WARN_TIME_MICROS = "taskExecutionWarnTimeMicros"; + // Client auth provider factory class name + protected final 
static String CLIENT_AUTH_PROVIDER_FACTORY_CLASS = "clientAuthProviderFactoryClass"; + /** * Construct a default client-side configuration */ @@ -700,7 +703,7 @@ public class ClientConfiguration extends AbstractConfiguration { /** * Check if bookie health check is enabled. - * + * * @return */ public boolean isBookieHealthCheckEnabled() { @@ -709,15 +712,15 @@ public class ClientConfiguration extends AbstractConfiguration { /** * Enables the bookie health check. - * + * * <p> * If the number of read/write errors for a bookie exceeds {@link #getBookieErrorThresholdPerInterval()} per * interval, that bookie is quarantined for {@link #getBookieQuarantineTimeSeconds()} seconds. During this * quarantined period, the client will try not to use this bookie when creating new ensembles. * </p> - * + * * By default, the bookie health check is <b>disabled</b>. - * + * * @return client configuration */ public ClientConfiguration enableBookieHealthCheck() { @@ -727,7 +730,7 @@ public class ClientConfiguration extends AbstractConfiguration { /** * Get the bookie health check interval in seconds. - * + * * @return */ public int getBookieHealthCheckIntervalSeconds() { @@ -736,11 +739,11 @@ public class ClientConfiguration extends AbstractConfiguration { /** * Set the bookie health check interval. Default is 60 seconds. - * + * * <p> * Note: Please {@link #enableBookieHealthCheck()} to use this configuration. * </p> - * + * * @param interval * @param unit * @return client configuration @@ -752,7 +755,7 @@ public class ClientConfiguration extends AbstractConfiguration { /** * Get the error threshold for a bookie to be quarantined. - * + * * @return */ public long getBookieErrorThresholdPerInterval() { @@ -762,11 +765,11 @@ public class ClientConfiguration extends AbstractConfiguration { /** * Set the error threshold per interval ({@link #getBookieHealthCheckIntervalSeconds()}) for a bookie before it is * quarantined. Default is 100 errors per minute. 
- * + * * <p> * Note: Please {@link #enableBookieHealthCheck()} to use this configuration. * </p> - * + * * @param threshold * @param unit * @return client configuration @@ -778,7 +781,7 @@ public class ClientConfiguration extends AbstractConfiguration { /** * Get the time for which a bookie will be quarantined. - * + * * @return */ public int getBookieQuarantineTimeSeconds() { @@ -787,11 +790,11 @@ public class ClientConfiguration extends AbstractConfiguration { /** * Set the time for which a bookie will be quarantined. Default is 30 minutes. - * + * * <p> * Note: Please {@link #enableBookieHealthCheck()} to use this configuration. * </p> - * + * * @param quarantineTime * @param unit * @return client configuration @@ -800,4 +803,28 @@ public class ClientConfiguration extends AbstractConfiguration { setProperty(BOOKIE_QUARANTINE_TIME_SECONDS, unit.toSeconds(quarantineTime)); return this; } + + /** + * Set the client authentication provider factory class name. + * If this is not set, no authentication will be used + * + * @param factoryClass + * the client authentication provider factory class name + * @return client configuration + */ + public ClientConfiguration setClientAuthProviderFactoryClass( + String factoryClass) { + setProperty(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, factoryClass); + return this; + } + + /** + * Get the client authentication provider factory class name. If this returns null, no authentication will take + * place. + * + * @return the client authentication provider factory class name or null. 
+ */ + public String getClientAuthProviderFactoryClass() { + return getString(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, null); + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 76e5037..d770650 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -122,6 +122,9 @@ public class ServerConfiguration extends AbstractConfiguration { protected final static String LEDGER_STORAGE_CLASS = "ledgerStorageClass"; + // Bookie auth provider factory class name + protected final static String BOOKIE_AUTH_PROVIDER_FACTORY_CLASS = "bookieAuthProviderFactoryClass"; + /** * Construct a default configuration object */ @@ -1566,4 +1569,25 @@ public class ServerConfiguration extends AbstractConfiguration { } } + /* + * Set the bookie authentication provider factory class name. + * If this is not set, no authentication will be used + * + * @param factoryClass + * the bookie authentication provider factory class name + * @return void + */ + public void setBookieAuthProviderFactoryClass(String factoryClass) { + setProperty(BOOKIE_AUTH_PROVIDER_FACTORY_CLASS, factoryClass); + } + + /** + * Get the bookie authentication provider factory class name. + * If this returns null, no authentication will take place. + * + * @return the bookie authentication provider factory class name or null. 
+ */ + public String getBookieAuthProviderFactoryClass() { + return getString(BOOKIE_AUTH_PROVIDER_FACTORY_CLASS, null); + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java new file mode 100644 index 0000000..522bc0b --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -0,0 +1,356 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.proto; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.jboss.netty.channel.SimpleChannelHandler; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.DefaultExceptionEvent; +import org.jboss.netty.channel.ExceptionEvent; + +import org.apache.bookkeeper.auth.BookieAuthProvider; +import org.apache.bookkeeper.auth.ClientAuthProvider; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; +import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse; +import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType; +import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AuthHandler { + static final Logger LOG = LoggerFactory.getLogger(AuthHandler.class); + + static class ServerSideHandler extends SimpleChannelHandler { + volatile boolean authenticated = false; + final BookieAuthProvider.Factory authProviderFactory; + BookieAuthProvider authProvider; + + ServerSideHandler(BookieAuthProvider.Factory authProviderFactory) { + this.authProviderFactory = authProviderFactory; + authProvider = null; + } + + @Override + public void channelOpen(ChannelHandlerContext ctx, + ChannelStateEvent e) throws Exception { + LOG.info("Channel open {}", ctx.getChannel()); + SocketAddress remote = ctx.getChannel().getRemoteAddress(); + if (remote instanceof InetSocketAddress) { + authProvider = 
authProviderFactory.newProvider((InetSocketAddress)remote, + new AuthHandshakeCompleteCallback()); + } else { + LOG.error("Unknown socket type {} for {}", remote.getClass(), remote); + } + super.channelOpen(ctx, e); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, + MessageEvent e) + throws Exception { + if (authProvider == null) { + // close the channel, authProvider should only be + // null if the other end of line is an InetSocketAddress + // anything else is strange, and we don't want to deal + // with it + ctx.getChannel().close(); + return; + } + + Object event = e.getMessage(); + if (authenticated) { + super.messageReceived(ctx, e); + } else if (event instanceof BookieProtocol.AuthRequest) { // pre-PB-client + BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest)event; + assert (req.getOpCode() == BookieProtocol.AUTH); + if (checkAuthPlugin(req.getAuthMessage(), ctx.getChannel())) { + authProvider.process(req.getAuthMessage(), + new AuthResponseCallbackLegacy(req, ctx.getChannel())); + } else { + ctx.getChannel().close(); + } + } else if (event instanceof BookieProtocol.Request) { + BookieProtocol.Request req = (BookieProtocol.Request)event; + if (req.getOpCode() == BookieProtocol.ADDENTRY) { + ctx.getChannel().write( + new BookieProtocol.AddResponse( + req.getProtocolVersion(), BookieProtocol.EUA, + req.getLedgerId(), req.getEntryId())); + } else if (req.getOpCode() == BookieProtocol.READENTRY) { + ctx.getChannel().write( + new BookieProtocol.ReadResponse( + req.getProtocolVersion(), BookieProtocol.EUA, + req.getLedgerId(), req.getEntryId())); + } else { + ctx.getChannel().close(); + } + } else if (event instanceof BookkeeperProtocol.Request) { // post-PB-client + BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)event; + if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH + && req.hasAuthRequest() + && checkAuthPlugin(req.getAuthRequest(), ctx.getChannel())) { + 
authProvider.process(req.getAuthRequest(), + new AuthResponseCallback(req, ctx.getChannel())); + } else { + BookkeeperProtocol.Response.Builder builder + = BookkeeperProtocol.Response.newBuilder() + .setHeader(req.getHeader()) + .setStatus(BookkeeperProtocol.StatusCode.EUA); + + ctx.getChannel().write(builder.build()); + } + } else { + // close the channel, junk coming over it + ctx.getChannel().close(); + } + } + + private boolean checkAuthPlugin(AuthMessage am, final Channel src) { + if (!am.hasAuthPluginName() + || !am.getAuthPluginName().equals(authProviderFactory.getPluginName())) { + LOG.error("Received message from incompatible auth plugin. Local = {}," + + " Remote = {}, Channel = {}", + authProviderFactory.getPluginName(), am.getAuthPluginName()); + return false; + } + return true; + } + + static class AuthResponseCallbackLegacy implements GenericCallback<AuthMessage> { + final BookieProtocol.AuthRequest req; + final Channel channel; + + AuthResponseCallbackLegacy(BookieProtocol.AuthRequest req, Channel channel) { + this.req = req; + this.channel = channel; + } + + public void operationComplete(int rc, AuthMessage newam) { + if (rc != BKException.Code.OK) { + LOG.error("Error processing auth message, closing connection"); + channel.close(); + return; + } + channel.write(new BookieProtocol.AuthResponse(req.getProtocolVersion(), + newam)); + } + } + + static class AuthResponseCallback implements GenericCallback<AuthMessage> { + final BookkeeperProtocol.Request req; + final Channel channel; + + AuthResponseCallback(BookkeeperProtocol.Request req, Channel channel) { + this.req = req; + this.channel = channel; + } + + public void operationComplete(int rc, AuthMessage newam) { + BookkeeperProtocol.Response.Builder builder + = BookkeeperProtocol.Response.newBuilder() + .setHeader(req.getHeader()); + + if (rc != BKException.Code.OK) { + LOG.error("Error processing auth message, closing connection"); + + builder.setStatus(BookkeeperProtocol.StatusCode.EUA); + 
channel.write(builder.build()); + channel.close(); + return; + } else { + builder.setStatus(BookkeeperProtocol.StatusCode.EOK) + .setAuthResponse(newam); + channel.write(builder.build()); + } + } + } + + class AuthHandshakeCompleteCallback implements GenericCallback<Void> { + @Override + public void operationComplete(int rc, Void v) { + if (rc == BKException.Code.OK) { + authenticated = true; + } else { + LOG.debug("Authentication failed on server side"); + } + } + } + } + + static class ClientSideHandler extends SimpleChannelHandler { + volatile boolean authenticated = false; + final ClientAuthProvider.Factory authProviderFactory; + ClientAuthProvider authProvider; + AtomicLong transactionIdGenerator; + Queue<MessageEvent> waitingForAuth = new ConcurrentLinkedQueue<MessageEvent>(); + + ClientSideHandler(ClientAuthProvider.Factory authProviderFactory, + AtomicLong transactionIdGenerator) { + this.authProviderFactory = authProviderFactory; + this.transactionIdGenerator = transactionIdGenerator; + authProvider = null; + } + + @Override + public void channelConnected(ChannelHandlerContext ctx, + ChannelStateEvent e) + throws Exception { + SocketAddress remote = ctx.getChannel().getRemoteAddress(); + if (remote instanceof InetSocketAddress) { + authProvider = authProviderFactory.newProvider((InetSocketAddress)remote, + new AuthHandshakeCompleteCallback(ctx)); + authProvider.init(new AuthRequestCallback(ctx)); + } else { + LOG.error("Unknown socket type {} for {}", remote.getClass(), remote); + } + super.channelConnected(ctx, e); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, + MessageEvent e) + throws Exception { + assert (authProvider != null); + + Object event = e.getMessage(); + + if (authenticated) { + super.messageReceived(ctx, e); + } else if (event instanceof BookkeeperProtocol.Response) { + BookkeeperProtocol.Response resp = (BookkeeperProtocol.Response)event; + if (resp.getHeader().getOperation() == 
BookkeeperProtocol.OperationType.AUTH) { + if (resp.getStatus() != BookkeeperProtocol.StatusCode.EOK) { + authenticationError(ctx, resp.getStatus().getNumber()); + } else { + assert (resp.hasAuthResponse()); + BookkeeperProtocol.AuthMessage am = resp.getAuthResponse(); + authProvider.process(am, new AuthRequestCallback(ctx)); + } + } else { + // else just drop the message, + // we're not authenticated so nothing should be coming through + } + } + } + + @Override + public void writeRequested(ChannelHandlerContext ctx, + MessageEvent e) + throws Exception { + synchronized (waitingForAuth) { + if (authenticated) { + super.writeRequested(ctx, e); + } else if (e.getMessage() instanceof BookkeeperProtocol.Request) { + // let auth messages through, queue the rest + BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)e.getMessage(); + if (req.getHeader().getOperation() + == BookkeeperProtocol.OperationType.AUTH) { + super.writeRequested(ctx, e); + } else { + waitingForAuth.add(e); + } + } // else just drop + } + } + + long newTxnId() { + return transactionIdGenerator.incrementAndGet(); + } + + void authenticationError(ChannelHandlerContext ctx, int errorCode) { + LOG.error("Error processing auth message, erroring connection {}", errorCode); + ctx.sendUpstream(new DefaultExceptionEvent(ctx.getChannel(), + new AuthenticationException( + "Auth failed with error " + errorCode))); + } + + class AuthRequestCallback implements GenericCallback<AuthMessage> { + Channel channel; + ChannelHandlerContext ctx; + + AuthRequestCallback(ChannelHandlerContext ctx) { + this.channel = ctx.getChannel(); + this.ctx = ctx; + } + + public void operationComplete(int rc, AuthMessage newam) { + if (rc != BKException.Code.OK) { + authenticationError(ctx, rc); + return; + } + + BookkeeperProtocol.BKPacketHeader header + = BookkeeperProtocol.BKPacketHeader.newBuilder() + .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE) + .setOperation(BookkeeperProtocol.OperationType.AUTH) + 
.setTxnId(newTxnId()).build(); + BookkeeperProtocol.Request.Builder builder + = BookkeeperProtocol.Request.newBuilder() + .setHeader(header) + .setAuthRequest(newam); + + channel.write(builder.build()); + } + } + + class AuthHandshakeCompleteCallback implements GenericCallback<Void> { + ChannelHandlerContext ctx; + AuthHandshakeCompleteCallback(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + @Override + public void operationComplete(int rc, Void v) { + if (rc == BKException.Code.OK) { + synchronized (waitingForAuth) { + authenticated = true; + MessageEvent e = waitingForAuth.poll(); + while (e != null) { + ctx.sendDownstream(e); + e = waitingForAuth.poll(); + } + } + } else { + authenticationError(ctx, rc); + LOG.debug("Authentication failed on server side"); + } + } + } + } + + static class AuthenticationException extends IOException { + AuthenticationException(String reason) { + super(reason); + } + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index 8a79547..d0052d6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -28,8 +28,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; +import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.client.BKException; import 
org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -52,6 +55,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.ExtensionRegistry; /** * Implements the client-side part of the BookKeeper protocol. @@ -60,11 +64,18 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; public class BookieClient implements PerChannelBookieClientFactory { static final Logger LOG = LoggerFactory.getLogger(BookieClient.class); - final OrderedSafeExecutor executor; - final ClientSocketChannelFactory channelFactory; + // This is global state that should be across all BookieClients + AtomicLong totalBytesOutstanding = new AtomicLong(); + + OrderedSafeExecutor executor; + ClientSocketChannelFactory channelFactory; final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels = new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>(); final HashedWheelTimer requestTimer; + + final private ClientAuthProvider.Factory authProviderFactory; + final private ExtensionRegistry registry; + private final ClientConfiguration conf; private volatile boolean closed; private final ReentrantReadWriteLock closeLock; @@ -73,17 +84,22 @@ public class BookieClient implements PerChannelBookieClientFactory { private final long bookieErrorThresholdPerInterval; - public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) { + public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, + OrderedSafeExecutor executor) throws IOException { this(conf, channelFactory, executor, NullStatsLogger.INSTANCE); } - public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor, - StatsLogger statsLogger) { + public BookieClient(ClientConfiguration conf, 
ClientSocketChannelFactory channelFactory, + OrderedSafeExecutor executor, StatsLogger statsLogger) throws IOException { this.conf = conf; this.channelFactory = channelFactory; this.executor = executor; this.closed = false; this.closeLock = new ReentrantReadWriteLock(); + + this.registry = ExtensionRegistry.newInstance(); + this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf, registry); + this.statsLogger = statsLogger; this.numConnectionsPerBookie = conf.getNumChannelsPerBookie(); this.requestTimer = new HashedWheelTimer( @@ -120,8 +136,8 @@ public class BookieClient implements PerChannelBookieClientFactory { @Override public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool) { - return new PerChannelBookieClient(conf, executor, channelFactory, address, - requestTimer, statsLogger, pcbcPool); + return new PerChannelBookieClient(conf, executor, channelFactory, address, requestTimer, statsLogger, + authProviderFactory, registry, pcbcPool); } private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr, Object key) { @@ -133,7 +149,7 @@ public class BookieClient implements PerChannelBookieClientFactory { return null; } PerChannelBookieClientPool newClientPool = - new DefaultPerChannelBookieClientPool(this, addr, numConnectionsPerBookie); + new DefaultPerChannelBookieClientPool(this, addr, numConnectionsPerBookie); PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool); if (null == oldClientPool) { clientPool = newClientPool; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java index b623998..bb1b207 100644 
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java @@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.auth.BookieAuthProvider; +import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.zookeeper.KeeperException; import org.jboss.netty.bootstrap.ServerBootstrap; @@ -48,6 +50,7 @@ import org.jboss.netty.handler.codec.frame.LengthFieldPrepender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.ExtensionRegistry; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -66,11 +69,21 @@ class BookieNettyServer { Object suspensionLock = new Object(); boolean suspended = false; + final BookieAuthProvider.Factory authProviderFactory; + final BookieProtoEncoding.ResponseEncoder responseEncoder; + final BookieProtoEncoding.RequestDecoder requestDecoder; + BookieNettyServer(ServerConfiguration conf, RequestProcessor processor) throws IOException, KeeperException, InterruptedException, BookieException { this.conf = conf; this.requestProcessor = processor; + ExtensionRegistry registry = ExtensionRegistry.newInstance(); + authProviderFactory = AuthProviderFactoryFactory.newBookieAuthProviderFactory(conf, registry); + + responseEncoder = new BookieProtoEncoding.ResponseEncoder(registry); + requestDecoder = new BookieProtoEncoding.RequestDecoder(registry); + ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); String base = "bookie-" + conf.getBookiePort() + "-netty"; serverChannelFactory = new NioServerSocketChannelFactory( @@ -140,11 +153,15 @@ class BookieNettyServer { new 
LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4)); pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); - pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder()); - pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder()); + pipeline.addLast("bookieProtoDecoder", requestDecoder); + pipeline.addLast("bookieProtoEncoder", responseEncoder); + pipeline.addLast("bookieAuthHandler", + new AuthHandler.ServerSideHandler(authProviderFactory)); + SimpleChannelHandler requestHandler = isRunning.get() ? new BookieRequestHandler(conf, requestProcessor, allChannels) : new RejectRequestHandler(); + pipeline.addLast("bookieRequestHandler", requestHandler); return pipeline; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 6ece56e..683a6fb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -20,29 +20,26 @@ */ package org.apache.bookkeeper.proto; +import com.google.protobuf.ByteString; +import com.google.protobuf.ExtensionRegistry; import com.google.protobuf.InvalidProtocolBufferException; + import org.jboss.netty.buffer.ChannelBufferFactory; import org.jboss.netty.buffer.ChannelBufferInputStream; +import org.jboss.netty.buffer.ChannelBufferOutputStream; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; - import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader; import 
org.jboss.netty.handler.codec.oneone.OneToOneEncoder; import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class BookieProtoEncoding { private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class); - static final EnDecoder REQ_PREV3 = new RequestEnDeCoderPreV3(); - static final EnDecoder REP_PREV3 = new ResponseEnDeCoderPreV3(); - static final EnDecoder REQ_V3 = new RequestEnDecoderV3(); - static final EnDecoder REP_V3 = new ResponseEnDecoderV3(); - static interface EnDecoder { /** @@ -68,6 +65,12 @@ public class BookieProtoEncoding { } static class RequestEnDeCoderPreV3 implements EnDecoder { + final ExtensionRegistry extensionRegistry; + + RequestEnDeCoderPreV3(ExtensionRegistry extensionRegistry) { + this.extensionRegistry = extensionRegistry; + } + @Override public Object encode(Object msg, ChannelBufferFactory bufferFactory) throws Exception { @@ -83,8 +86,7 @@ public class BookieProtoEncoding { buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt()); buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH); return ChannelBuffers.wrappedBuffer(buf, ar.getData()); - } else { - assert(r instanceof BookieProtocol.ReadRequest); + } else if (r instanceof BookieProtocol.ReadRequest) { int totalHeaderSize = 4 // for request type + 8 // for ledgerId + 8; // for entryId @@ -101,6 +103,19 @@ public class BookieProtoEncoding { } return buf; + } else if (r instanceof BookieProtocol.AuthRequest) { + BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest)r).getAuthMessage(); + int totalHeaderSize = 4; // for request type + int totalSize = totalHeaderSize + am.getSerializedSize(); + ChannelBuffer buf = bufferFactory.getBuffer(totalSize); + buf.writeInt(new PacketHeader(r.getProtocolVersion(), + r.getOpCode(), + r.getFlags()).toInt()); + ChannelBufferOutputStream bufStream = new ChannelBufferOutputStream(buf); + 
am.writeTo(bufStream); + return buf; + } else { + return msg; } } @@ -141,12 +156,23 @@ public class BookieProtoEncoding { } else { return new BookieProtocol.ReadRequest(h.getVersion(), ledgerId, entryId, flags); } + case BookieProtocol.AUTH: + BookkeeperProtocol.AuthMessage.Builder builder + = BookkeeperProtocol.AuthMessage.newBuilder(); + builder.mergeFrom(new ChannelBufferInputStream(packet), extensionRegistry); + return new BookieProtocol.AuthRequest(h.getVersion(), builder.build()); } return packet; } } static class ResponseEnDeCoderPreV3 implements EnDecoder { + final ExtensionRegistry extensionRegistry; + + ResponseEnDeCoderPreV3(ExtensionRegistry extensionRegistry) { + this.extensionRegistry = extensionRegistry; + } + @Override public Object encode(Object msg, ChannelBufferFactory bufferFactory) throws Exception { @@ -157,12 +183,13 @@ public class BookieProtoEncoding { ChannelBuffer buf = bufferFactory.getBuffer(24); buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), (short)0).toInt()); - buf.writeInt(r.getErrorCode()); - buf.writeLong(r.getLedgerId()); - buf.writeLong(r.getEntryId()); ServerStats.getInstance().incrementPacketsSent(); if (msg instanceof BookieProtocol.ReadResponse) { + buf.writeInt(r.getErrorCode()); + buf.writeLong(r.getLedgerId()); + buf.writeLong(r.getEntryId()); + BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r; if (rr.hasData()) { return ChannelBuffers.wrappedBuffer(buf, @@ -171,7 +198,15 @@ public class BookieProtoEncoding { return buf; } } else if (msg instanceof BookieProtocol.AddResponse) { + buf.writeInt(r.getErrorCode()); + buf.writeLong(r.getLedgerId()); + buf.writeLong(r.getEntryId()); + return buf; + } else if (msg instanceof BookieProtocol.AuthResponse) { + BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse)r).getAuthMessage(); + return ChannelBuffers.wrappedBuffer(buf, + ChannelBuffers.wrappedBuffer(am.toByteArray())); } else { LOG.error("Cannot encode unknown response type 
{}", msg.getClass().getName()); return msg; @@ -180,19 +215,23 @@ public class BookieProtoEncoding { @Override public Object decode(ChannelBuffer buffer) throws Exception { - final int rc; - final long ledgerId, entryId; + int rc; + long ledgerId, entryId; final PacketHeader header; header = PacketHeader.fromInt(buffer.readInt()); - rc = buffer.readInt(); - ledgerId = buffer.readLong(); - entryId = buffer.readLong(); switch (header.getOpCode()) { case BookieProtocol.ADDENTRY: + rc = buffer.readInt(); + ledgerId = buffer.readLong(); + entryId = buffer.readLong(); return new BookieProtocol.AddResponse(header.getVersion(), rc, ledgerId, entryId); case BookieProtocol.READENTRY: + rc = buffer.readInt(); + ledgerId = buffer.readLong(); + entryId = buffer.readLong(); + if (rc == BookieProtocol.EOK) { return new BookieProtocol.ReadResponse(header.getVersion(), rc, ledgerId, entryId, buffer.slice()); @@ -200,6 +239,13 @@ public class BookieProtoEncoding { return new BookieProtocol.ReadResponse(header.getVersion(), rc, ledgerId, entryId); } + case BookieProtocol.AUTH: + ChannelBufferInputStream bufStream = new ChannelBufferInputStream(buffer); + BookkeeperProtocol.AuthMessage.Builder builder + = BookkeeperProtocol.AuthMessage.newBuilder(); + builder.mergeFrom(bufStream, extensionRegistry); + BookkeeperProtocol.AuthMessage am = builder.build(); + return new BookieProtocol.AuthResponse(header.getVersion(), am); default: return buffer; } @@ -207,10 +253,16 @@ public class BookieProtoEncoding { } static class RequestEnDecoderV3 implements EnDecoder { + final ExtensionRegistry extensionRegistry; + + RequestEnDecoderV3(ExtensionRegistry extensionRegistry) { + this.extensionRegistry = extensionRegistry; + } @Override public Object decode(ChannelBuffer packet) throws Exception { - return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet)); + return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet), + extensionRegistry); } @Override @@ 
-222,10 +274,16 @@ public class BookieProtoEncoding { } static class ResponseEnDecoderV3 implements EnDecoder { + final ExtensionRegistry extensionRegistry; + + ResponseEnDecoderV3(ExtensionRegistry extensionRegistry) { + this.extensionRegistry = extensionRegistry; + } @Override public Object decode(ChannelBuffer packet) throws Exception { - return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet)); + return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet), + extensionRegistry); } @Override @@ -238,6 +296,14 @@ public class BookieProtoEncoding { public static class RequestEncoder extends OneToOneEncoder { + final EnDecoder REQ_PREV3; + final EnDecoder REQ_V3; + + RequestEncoder(ExtensionRegistry extensionRegistry) { + REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry); + REQ_V3 = new RequestEnDecoderV3(extensionRegistry); + } + @Override protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { @@ -256,6 +322,13 @@ public class BookieProtoEncoding { } public static class RequestDecoder extends OneToOneDecoder { + final EnDecoder REQ_PREV3; + final EnDecoder REQ_V3; + + RequestDecoder(ExtensionRegistry extensionRegistry) { + REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry); + REQ_V3 = new RequestEnDecoderV3(extensionRegistry); + } @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) @@ -283,6 +356,13 @@ public class BookieProtoEncoding { } public static class ResponseEncoder extends OneToOneEncoder { + final EnDecoder REP_PREV3; + final EnDecoder REP_V3; + + ResponseEncoder(ExtensionRegistry extensionRegistry) { + REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry); + REP_V3 = new ResponseEnDecoderV3(extensionRegistry); + } @Override protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) @@ -302,6 +382,13 @@ public class BookieProtoEncoding { } public static class ResponseDecoder extends 
OneToOneDecoder { + final EnDecoder REP_PREV3; + final EnDecoder REP_V3; + + ResponseDecoder(ExtensionRegistry extensionRegistry) { + REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry); + REP_V3 = new ResponseEnDecoderV3(extensionRegistry); + } @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 4dd26d6..2ce5ed8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -24,6 +24,8 @@ package org.apache.bookkeeper.proto; import org.jboss.netty.buffer.ChannelBuffer; import java.nio.ByteBuffer; +import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; + /** * The packets of the Bookie protocol all have a 4-byte integer indicating the * type of request or response at the very beginning of the packet followed by a @@ -133,6 +135,13 @@ public interface BookieProtocol { public static final byte READENTRY = 2; /** + * Auth message. This code is for passing auth messages between the auth + * providers on the client and bookie. The message payload is determined + * by the auth providers themselves. 
+ */ + public static final byte AUTH = 3; + + /** * The error code that indicates success */ public static final int EOK = 0; @@ -273,6 +282,19 @@ public interface BookieProtocol { } } + static class AuthRequest extends Request { + final AuthMessage authMessage; + + AuthRequest(byte protocolVersion, AuthMessage authMessage) { + super(protocolVersion, AUTH, -1, -1, FLAG_NONE, null); + this.authMessage = authMessage; + } + + AuthMessage getAuthMessage() { + return authMessage; + } + } + static class Response { final byte protocolVersion; final byte opCode; @@ -343,4 +365,18 @@ public interface BookieProtocol { super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId); } } + + static class AuthResponse extends Response { + final AuthMessage authMessage; + + AuthResponse(byte protocolVersion, AuthMessage authMessage) { + super(protocolVersion, AUTH, EOK, -1, -1); + this.authMessage = authMessage; + } + + AuthMessage getAuthMessage() { + return authMessage; + } + } + } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 9fec15f..1608328 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -124,6 +124,7 @@ public class BookieRequestProcessor implements RequestProcessor { processReadRequestV3(r, c); break; default: + LOG.info("Unknown operation type {}", header.getOperation()); BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader()) .setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
