This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-4.8 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.8 by this push: new f51aec6 Fixed Auth with v2 protocol f51aec6 is described below commit f51aec66233c3f355f3c6a1f2c88693f46417d7a Author: Matteo Merli <mme...@apache.org> AuthorDate: Sun Nov 11 10:42:32 2018 -0800 Fixed Auth with v2 protocol ### Motivation BK auth framework is currently broken when using v2 protocol. ### Changes * Fixed auth when using V2 protocol * Made sure a client with authentication enabled can talk to a bookie without authentication. This is required in any case when enabling/disabling authentication on a live cluster. * Run all auth tests against both v2 and v3 protocol. This should be included in 4.7.2 to give a path to upgrade. cc/ rdhabalia Reviewers: Enrico Olivelli <eolive...@gmail.com>, Sijie Guo <si...@apache.org> This closes #1805 from merlimat/fix-v2-auth (cherry picked from commit dc2aaaa070d9a8d393409b69c80ad668b70f6d2b) Signed-off-by: Sijie Guo <si...@apache.org> --- .../org/apache/bookkeeper/proto/AuthHandler.java | 58 ++++++++++++++++++---- .../bookkeeper/proto/BookieRequestProcessor.java | 11 ++++ .../bookkeeper/proto/PerChannelBookieClient.java | 2 +- .../java/org/apache/bookkeeper/auth/TestAuth.java | 40 ++++++++++++++- 4 files changed, 99 insertions(+), 12 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index 1b1f60f..a7ac452 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -226,16 +226,19 @@ class AuthHandler { final Queue<Object> waitingForAuth = new ConcurrentLinkedQueue<>(); final ClientConnectionPeer connectionPeer; + private final boolean isUsingV2Protocol; + public ClientAuthProvider getAuthProvider() { return authProvider; } ClientSideHandler(ClientAuthProvider.Factory authProviderFactory, AtomicLong transactionIdGenerator, - ClientConnectionPeer connectionPeer) { + ClientConnectionPeer connectionPeer, boolean isUsingV2Protocol) { this.authProviderFactory = authProviderFactory; this.transactionIdGenerator = transactionIdGenerator; this.connectionPeer = connectionPeer; authProvider = null; + this.isUsingV2Protocol = isUsingV2Protocol; } @Override @@ -279,7 +282,7 @@ class AuthHandler { if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())){ SocketAddress remote = ctx.channel().remoteAddress(); LOG.info("Authentication is not enabled." - + "Considering this client {0} authenticated", remote); + + "Considering this client {} authenticated", remote); AuthHandshakeCompleteCallback cb = new AuthHandshakeCompleteCallback(ctx); cb.operationComplete(BKException.Code.OK, null); return; @@ -296,6 +299,33 @@ class AuthHandler { break; } } + } else if (msg instanceof BookieProtocol.Response) { + BookieProtocol.Response resp = (BookieProtocol.Response) msg; + switch (resp.opCode) { + case BookieProtocol.AUTH: + if (resp.errorCode != BookieProtocol.EOK) { + authenticationError(ctx, resp.errorCode); + } else { + BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) resp).authMessage; + if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())) { + SocketAddress remote = ctx.channel().remoteAddress(); + LOG.info("Authentication is not enabled." + + "Considering this client {} authenticated", remote); + AuthHandshakeCompleteCallback cb = new AuthHandshakeCompleteCallback(ctx); + cb.operationComplete(BKException.Code.OK, null); + return; + } + byte[] payload = am.getPayload().toByteArray(); + authProvider.process(AuthToken.wrap(payload), new AuthRequestCallback(ctx, + authProviderFactory.getPluginName())); + } + break; + default: + LOG.warn("dropping received message {} from bookie {}", msg, ctx.channel()); + // else just drop the message, we're not authenticated so nothing should be coming + // through + break; + } } } @@ -319,7 +349,7 @@ class AuthHandler { } else if (msg instanceof BookieProtocol.Request) { // let auth messages through, queue the rest BookieProtocol.Request req = (BookieProtocol.Request) msg; - if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) { + if (BookieProtocol.AUTH == req.getOpCode()) { super.write(ctx, msg, promise); super.flush(ctx); } else { @@ -356,16 +386,24 @@ class AuthHandler { authenticationError(ctx, rc); return; } + AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(pluginName) .setPayload(ByteString.copyFrom(newam.getData())).build(); - BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder() - .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE) - .setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build(); - BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder().setHeader(header) - .setAuthRequest(message); - - channel.writeAndFlush(builder.build()); + if (isUsingV2Protocol) { + channel.writeAndFlush( + new BookieProtocol.AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, message), + channel.voidPromise()); + } else { + // V3 protocol + BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder() + .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE) + .setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build(); + BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder() + .setHeader(header) + .setAuthRequest(message); + channel.writeAndFlush(builder.build()); + } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 8ee363f..a7f2dd6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -489,6 +489,17 @@ public class BookieRequestProcessor implements RequestProcessor { checkArgument(r instanceof BookieProtocol.ReadRequest); processReadRequest((BookieProtocol.ReadRequest) r, c); break; + case BookieProtocol.AUTH: + LOG.info("Ignoring auth operation from client {}", c.remoteAddress()); + BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage + .newBuilder() + .setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME) + .setPayload(ByteString.copyFrom(AuthToken.NULL.getData())) + .build(); + + c.writeAndFlush(new BookieProtocol.AuthResponse( + BookieProtocol.CURRENT_PROTOCOL_VERSION, message)); + break; default: LOG.error("Unknown op type {}, sending error", r.getOpCode()); c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r)); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index dd006f4..4f1f35c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -457,7 +457,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { "bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder(extRegistry, useV2WireProtocol)); pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator, - connectionPeer)); + connectionPeer, useV2WireProtocol)); pipeline.addLast("mainhandler", PerChannelBookieClient.this); } }); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java index cca97e8..bf7d84b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java @@ -28,6 +28,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.Enumeration; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -45,12 +46,16 @@ import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.proto.ClientConnectionPeer; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Test authentication. */ +@RunWith(Parameterized.class) public class TestAuth extends BookKeeperClusterTestCase { static final Logger LOG = LoggerFactory.getLogger(TestAuth.class); public static final String TEST_AUTH_PROVIDER_PLUGIN_NAME = "TestAuthProviderPlugin"; @@ -61,8 +66,29 @@ public class TestAuth extends BookKeeperClusterTestCase { private static final byte[] FAILURE_RESPONSE = {2}; private static final byte[] PAYLOAD_MESSAGE = {3}; - public TestAuth() { + enum ProtocolVersion { + ProtocolV2, ProtocolV3 + } + + @Parameters + public static Collection<Object[]> configs() { + return Arrays.asList(new Object[][] { + { ProtocolVersion.ProtocolV2 }, + { ProtocolVersion.ProtocolV3 }, + }); + } + + private final ProtocolVersion protocolVersion; + + public TestAuth(ProtocolVersion protocolVersion) { super(0); // start them later when auth providers are configured + this.protocolVersion = protocolVersion; + } + + protected ClientConfiguration newClientConfiguration() { + ClientConfiguration conf = super.newClientConfiguration(); + conf.setUseV2WireProtocol(protocolVersion == ProtocolVersion.ProtocolV2); + return conf; } // we pass in ledgerId because the method may throw exceptions @@ -136,6 +162,13 @@ public class TestAuth extends BookKeeperClusterTestCase { @Test public void testCloseMethodCalledOnAuthProvider() throws Exception { + LogCloseCallsBookieAuthProviderFactory.closeCountersOnFactory.set(0); + LogCloseCallsBookieAuthProviderFactory.closeCountersOnConnections.set(0); + LogCloseCallsBookieAuthProviderFactory.initCountersOnFactory.set(0); + LogCloseCallsBookieAuthProviderFactory.initCountersOnConnections.set(0); + LogCloseCallsClientAuthProviderFactory.initCountersOnFactory.set(0); + LogCloseCallsClientAuthProviderFactory.closeCountersOnFactory.set(0); + ServerConfiguration bookieConf = newServerConfiguration(); bookieConf.setBookieAuthProviderFactoryClass( LogCloseCallsBookieAuthProviderFactory.class.getName()); @@ -272,6 +305,11 @@ public class TestAuth extends BookKeeperClusterTestCase { } catch (BKException.BKUnauthorizedAccessException bke) { // bookie should have sent a negative response before // breaking the conneciton + assertEquals(ProtocolVersion.ProtocolV3, protocolVersion); + } catch (BKException.BKNotEnoughBookiesException nebe) { + // With V2 we don't get the authorization error, but rather just + // fail to write to bookies. + assertEquals(ProtocolVersion.ProtocolV2, protocolVersion); } assertFalse(ledgerId.get() == -1); assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));