sijie closed pull request #1805: Fixed Auth with v2 protocol
URL: https://github.com/apache/bookkeeper/pull/1805
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once it has been merged, so the diff is reproduced below:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index 1b1f60fc59..a7ac452aea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -226,16 +226,19 @@ public void operationComplete(int rc, Void v) {
         final Queue<Object> waitingForAuth = new ConcurrentLinkedQueue<>();
         final ClientConnectionPeer connectionPeer;
 
+        private final boolean isUsingV2Protocol;
+
         public ClientAuthProvider getAuthProvider() {
             return authProvider;
         }
 
         ClientSideHandler(ClientAuthProvider.Factory authProviderFactory, 
AtomicLong transactionIdGenerator,
-                ClientConnectionPeer connectionPeer) {
+                ClientConnectionPeer connectionPeer, boolean 
isUsingV2Protocol) {
             this.authProviderFactory = authProviderFactory;
             this.transactionIdGenerator = transactionIdGenerator;
             this.connectionPeer = connectionPeer;
             authProvider = null;
+            this.isUsingV2Protocol = isUsingV2Protocol;
         }
 
         @Override
@@ -279,7 +282,7 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
                             if 
(AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())){
                                 SocketAddress remote = 
ctx.channel().remoteAddress();
                                 LOG.info("Authentication is not enabled."
-                                    + "Considering this client {0} 
authenticated", remote);
+                                    + "Considering this client {} 
authenticated", remote);
                                 AuthHandshakeCompleteCallback cb = new 
AuthHandshakeCompleteCallback(ctx);
                                 cb.operationComplete(BKException.Code.OK, 
null);
                                 return;
@@ -296,6 +299,33 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
                         break;
                     }
                 }
+            } else if (msg instanceof BookieProtocol.Response) {
+                BookieProtocol.Response resp = (BookieProtocol.Response) msg;
+                switch (resp.opCode) {
+                case BookieProtocol.AUTH:
+                    if (resp.errorCode != BookieProtocol.EOK) {
+                        authenticationError(ctx, resp.errorCode);
+                    } else {
+                        BookkeeperProtocol.AuthMessage am = 
((BookieProtocol.AuthResponse) resp).authMessage;
+                        if 
(AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())) {
+                            SocketAddress remote = 
ctx.channel().remoteAddress();
+                            LOG.info("Authentication is not enabled."
+                                    + "Considering this client {} 
authenticated", remote);
+                            AuthHandshakeCompleteCallback cb = new 
AuthHandshakeCompleteCallback(ctx);
+                            cb.operationComplete(BKException.Code.OK, null);
+                            return;
+                        }
+                        byte[] payload = am.getPayload().toByteArray();
+                        authProvider.process(AuthToken.wrap(payload), new 
AuthRequestCallback(ctx,
+                                authProviderFactory.getPluginName()));
+                    }
+                    break;
+                default:
+                    LOG.warn("dropping received message {} from bookie {}", 
msg, ctx.channel());
+                    // else just drop the message, we're not authenticated so 
nothing should be coming
+                    // through
+                    break;
+                }
             }
         }
 
@@ -319,7 +349,7 @@ public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
                 } else if (msg instanceof BookieProtocol.Request) {
                     // let auth messages through, queue the rest
                     BookieProtocol.Request req = (BookieProtocol.Request) msg;
-                    if (BookkeeperProtocol.OperationType.AUTH.getNumber() == 
req.getOpCode()) {
+                    if (BookieProtocol.AUTH == req.getOpCode()) {
                         super.write(ctx, msg, promise);
                         super.flush(ctx);
                     } else {
@@ -356,16 +386,24 @@ public void operationComplete(int rc, AuthToken newam) {
                     authenticationError(ctx, rc);
                     return;
                 }
+
                 AuthMessage message = 
AuthMessage.newBuilder().setAuthPluginName(pluginName)
                         
.setPayload(ByteString.copyFrom(newam.getData())).build();
 
-                BookkeeperProtocol.BKPacketHeader header = 
BookkeeperProtocol.BKPacketHeader.newBuilder()
-                        
.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
-                        
.setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build();
-                BookkeeperProtocol.Request.Builder builder = 
BookkeeperProtocol.Request.newBuilder().setHeader(header)
-                        .setAuthRequest(message);
-
-                channel.writeAndFlush(builder.build());
+                if (isUsingV2Protocol) {
+                    channel.writeAndFlush(
+                            new 
BookieProtocol.AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, message),
+                            channel.voidPromise());
+                } else {
+                    // V3 protocol
+                    BookkeeperProtocol.BKPacketHeader header = 
BookkeeperProtocol.BKPacketHeader.newBuilder()
+                            
.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
+                            
.setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build();
+                    BookkeeperProtocol.Request.Builder builder = 
BookkeeperProtocol.Request.newBuilder()
+                            .setHeader(header)
+                            .setAuthRequest(message);
+                    channel.writeAndFlush(builder.build());
+                }
             }
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 5a50bf6929..78b35ec6cd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -498,6 +498,17 @@ public void processRequest(Object msg, Channel c) {
                     checkArgument(r instanceof BookieProtocol.ReadRequest);
                     processReadRequest((BookieProtocol.ReadRequest) r, c);
                     break;
+                case BookieProtocol.AUTH:
+                    LOG.info("Ignoring auth operation from client {}", 
c.remoteAddress());
+                    BookkeeperProtocol.AuthMessage message = 
BookkeeperProtocol.AuthMessage
+                            .newBuilder()
+                            
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
+                            
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
+                            .build();
+
+                    c.writeAndFlush(new BookieProtocol.AuthResponse(
+                            BookieProtocol.CURRENT_PROTOCOL_VERSION, message));
+                    break;
                 default:
                     LOG.error("Unknown op type {}, sending error", 
r.getOpCode());
                     
c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 071b8395bf..917f6f3323 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -462,7 +462,7 @@ protected void initChannel(Channel ch) throws Exception {
                     "bookieProtoDecoder",
                     new BookieProtoEncoding.ResponseDecoder(extRegistry, 
useV2WireProtocol));
                 pipeline.addLast("authHandler", new 
AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
-                            connectionPeer));
+                            connectionPeer, useV2WireProtocol));
                 pipeline.addLast("mainhandler", PerChannelBookieClient.this);
             }
         });
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
index cca97e8d40..bf7d84b954 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
@@ -28,6 +28,7 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Enumeration;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -45,12 +46,16 @@
 import org.apache.bookkeeper.proto.ClientConnectionPeer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test authentication.
  */
+@RunWith(Parameterized.class)
 public class TestAuth extends BookKeeperClusterTestCase {
     static final Logger LOG = LoggerFactory.getLogger(TestAuth.class);
     public static final String TEST_AUTH_PROVIDER_PLUGIN_NAME = 
"TestAuthProviderPlugin";
@@ -61,8 +66,29 @@
     private static final byte[] FAILURE_RESPONSE = {2};
     private static final byte[] PAYLOAD_MESSAGE = {3};
 
-    public TestAuth() {
+    enum ProtocolVersion {
+        ProtocolV2, ProtocolV3
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] {
+                { ProtocolVersion.ProtocolV2 },
+                { ProtocolVersion.ProtocolV3 },
+        });
+    }
+
+    private final ProtocolVersion protocolVersion;
+
+    public TestAuth(ProtocolVersion protocolVersion) {
         super(0); // start them later when auth providers are configured
+        this.protocolVersion = protocolVersion;
+    }
+
+    protected ClientConfiguration newClientConfiguration() {
+        ClientConfiguration conf = super.newClientConfiguration();
+        conf.setUseV2WireProtocol(protocolVersion == 
ProtocolVersion.ProtocolV2);
+        return conf;
     }
 
     // we pass in ledgerId because the method may throw exceptions
@@ -136,6 +162,13 @@ public void testSingleMessageAuth() throws Exception {
 
     @Test
     public void testCloseMethodCalledOnAuthProvider() throws Exception {
+        LogCloseCallsBookieAuthProviderFactory.closeCountersOnFactory.set(0);
+        
LogCloseCallsBookieAuthProviderFactory.closeCountersOnConnections.set(0);
+        LogCloseCallsBookieAuthProviderFactory.initCountersOnFactory.set(0);
+        
LogCloseCallsBookieAuthProviderFactory.initCountersOnConnections.set(0);
+        LogCloseCallsClientAuthProviderFactory.initCountersOnFactory.set(0);
+        LogCloseCallsClientAuthProviderFactory.closeCountersOnFactory.set(0);
+
         ServerConfiguration bookieConf = newServerConfiguration();
         bookieConf.setBookieAuthProviderFactoryClass(
                 LogCloseCallsBookieAuthProviderFactory.class.getName());
@@ -272,6 +305,11 @@ public void testDifferentPluginFailure() throws Exception {
         } catch (BKException.BKUnauthorizedAccessException bke) {
             // bookie should have sent a negative response before
             // breaking the conneciton
+            assertEquals(ProtocolVersion.ProtocolV3, protocolVersion);
+        } catch (BKException.BKNotEnoughBookiesException nebe) {
+            // With V2 we don't get the authorization error, but rather just
+            // fail to write to bookies.
+            assertEquals(ProtocolVersion.ProtocolV2, protocolVersion);
         }
         assertFalse(ledgerId.get() == -1);
         assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), 
bookieConf, clientConf));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to