sijie closed pull request #1106: Issue 1103: Add channel TLS counters
URL: https://github.com/apache/bookkeeper/pull/1106
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its diff
after the merge, so the full diff is reproduced below:

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 60058e363..81b7c2f34 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -66,9 +66,14 @@
     String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
     String CHANNEL_START_TLS_OP = "START_TLS";
     String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
+
     String NETTY_EXCEPTION_CNT = "NETTY_EXCEPTION_CNT";
     String CLIENT_CONNECT_TIMER = "CLIENT_CONNECT_TIMER";
     String ADD_OP_OUTSTANDING = "ADD_OP_OUTSTANDING";
     String READ_OP_OUTSTANDING = "READ_OP_OUTSTANDING";
     String NETTY_OPS = "NETTY_OPS";
+    String ACTIVE_NON_TLS_CHANNEL_COUNTER = "ACTIVE_NON_TLS_CHANNEL_COUNTER";
+    String ACTIVE_TLS_CHANNEL_COUNTER = "ACTIVE_TLS_CHANNEL_COUNTER";
+    String FAILED_CONNECTION_COUNTER = "FAILED_CONNECTION_COUNTER";
+    String FAILED_TLS_HANDSHAKE_COUNTER = "FAILED_TLS_HANDSHAKE_COUNTER";
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 75b319308..d9206c900 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -50,6 +50,7 @@
 import io.netty.channel.local.LocalChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.CorruptedFrameException;
+import io.netty.handler.codec.DecoderException;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.codec.LengthFieldPrepender;
 import io.netty.handler.codec.TooLongFrameException;
@@ -76,6 +77,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiPredicate;
 
+import javax.net.ssl.SSLHandshakeException;
 import javax.net.ssl.SSLPeerUnverifiedException;
 
 import org.apache.bookkeeper.auth.BookKeeperPrincipal;
@@ -180,6 +182,10 @@
     private final Counter readEntryOutstanding;
     /* collect stats on all Ops that flows through netty pipeline */
     private final OpStatsLogger nettyOpLogger;
+    private final Counter activeNonTlsChannelCounter;
+    private final Counter activeTlsChannelCounter;
+    private final Counter failedConnectionCounter;
+    private final Counter failedTlsHandshakeCounter;
 
     private final boolean useV2WireProtocol;
 
@@ -282,6 +288,10 @@ public PerChannelBookieClient(ClientConfiguration conf, 
OrderedSafeExecutor exec
         addEntryOutstanding = 
statsLogger.getCounter(BookKeeperClientStats.ADD_OP_OUTSTANDING);
         readEntryOutstanding = 
statsLogger.getCounter(BookKeeperClientStats.READ_OP_OUTSTANDING);
         nettyOpLogger = 
statsLogger.getOpStatsLogger(BookKeeperClientStats.NETTY_OPS);
+        activeNonTlsChannelCounter = 
statsLogger.getCounter(BookKeeperClientStats.ACTIVE_NON_TLS_CHANNEL_COUNTER);
+        activeTlsChannelCounter = 
statsLogger.getCounter(BookKeeperClientStats.ACTIVE_TLS_CHANNEL_COUNTER);
+        failedConnectionCounter = 
statsLogger.getCounter(BookKeeperClientStats.FAILED_CONNECTION_COUNTER);
+        failedTlsHandshakeCounter = 
statsLogger.getCounter(BookKeeperClientStats.FAILED_TLS_HANDSHAKE_COUNTER);
 
         this.pcbcPool = pcbcPool;
 
@@ -836,6 +846,13 @@ public void close(boolean wait) {
         } finally {
             closeLock.writeLock().unlock();
         }
+
+        if (channel != null && channel.pipeline().get(SslHandler.class) != 
null) {
+            activeTlsChannelCounter.dec();
+        } else {
+            activeNonTlsChannelCounter.dec();
+        }
+
         closeInternal(true, wait);
     }
 
@@ -856,13 +873,13 @@ private void closeInternal(boolean permanent, boolean 
wait) {
                 cf.awaitUninterruptibly();
             }
         }
-
     }
 
     private ChannelFuture closeChannel(Channel c) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Closing channel {}", c);
         }
+
         return c.close();
     }
 
@@ -940,7 +957,7 @@ void errorOut(final CompletionKey key, final int rc) {
     /**
      * Errors out pending ops from per channel bookie client. As the channel
      * is being closed, all the operations waiting on the connection
-     * will be sent to completion with error
+     * will be sent to completion with error.
      */
     void errorOutPendingOps(int rc) {
         Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
@@ -992,6 +1009,11 @@ public void channelInactive(ChannelHandlerContext ctx) 
throws Exception {
         LOG.info("Disconnected from bookie channel {}", ctx.channel());
         if (ctx.channel() != null) {
             closeChannel(ctx.channel());
+            if (ctx.channel().pipeline().get(SslHandler.class) != null) {
+                activeTlsChannelCounter.dec();
+            } else {
+                activeNonTlsChannelCounter.dec();
+            }
         }
 
         
errorOutOutstandingEntries(BKException.Code.BookieHandleNotAvailableException);
@@ -1031,6 +1053,15 @@ public void exceptionCaught(ChannelHandlerContext ctx, 
Throwable cause) throws E
             return;
         }
 
+        if (cause instanceof DecoderException && cause.getCause() instanceof 
SSLHandshakeException) {
+            LOG.error("TLS handshake failed", cause);
+            errorOutPendingOps(BKException.Code.SecurityException);
+            Channel c = ctx.channel();
+            if (c != null) {
+                closeChannel(c);
+            }
+        }
+
         if (cause instanceof IOException) {
             LOG.warn("Exception caught on:{} cause:", ctx.channel(), cause);
             ctx.close();
@@ -1254,6 +1285,7 @@ public void operationComplete(Future<Channel> future) 
throws Exception {
                             AuthHandler.ClientSideHandler authHandler = 
future.get().pipeline()
                                     .get(AuthHandler.ClientSideHandler.class);
                             authHandler.authProvider.onProtocolUpgrade();
+                            activeTlsChannelCounter.inc();
                         } else if (future.isSuccess()
                                 && (state == ConnectionState.CLOSED || state 
== ConnectionState.DISCONNECTED)) {
                             LOG.warn("Closed before TLS handshake completed, 
clean up: {}, current state {}",
@@ -1275,6 +1307,7 @@ public void operationComplete(Future<Channel> future) 
throws Exception {
                             if (state != ConnectionState.CLOSED) {
                                 state = ConnectionState.DISCONNECTED;
                             }
+                            failedTlsHandshakeCounter.inc();
                         }
 
                         // trick to not do operations under the lock, take the 
list
@@ -2010,6 +2043,7 @@ public void operationComplete(ChannelFuture future) 
throws Exception {
                     } else {
                         LOG.info("Successfully connected to bookie: " + addr);
                         state = ConnectionState.CONNECTED;
+                        activeNonTlsChannelCounter.inc();
                     }
                 } else if (future.isSuccess() && state == 
ConnectionState.START_TLS) {
                     rc = BKException.Code.OK;
@@ -2019,6 +2053,7 @@ public void operationComplete(ChannelFuture future) 
throws Exception {
                     AuthHandler.ClientSideHandler authHandler = 
future.channel().pipeline()
                             .get(AuthHandler.ClientSideHandler.class);
                     authHandler.authProvider.onProtocolUpgrade();
+                    activeTlsChannelCounter.inc();
                 } else if (future.isSuccess() && (state == 
ConnectionState.CLOSED
                     || state == ConnectionState.DISCONNECTED)) {
                     LOG.warn("Closed before connection completed, clean up: 
{}, current state {}",
@@ -2040,6 +2075,7 @@ public void operationComplete(ChannelFuture future) 
throws Exception {
                     if (state != ConnectionState.CLOSED) {
                         state = ConnectionState.DISCONNECTED;
                     }
+                    failedConnectionCounter.inc();
                 }
 
                 // trick to not do operations under the lock, take the list
@@ -2085,5 +2121,6 @@ private void failTLS(int rc) {
         for (GenericCallback<PerChannelBookieClient> pendingOp : 
oldPendingOps) {
             pendingOp.operationComplete(rc, null);
         }
+        failedTlsHandshakeCounter.inc();
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
index 2d3f51b2a..17aea85fa 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
@@ -58,7 +58,20 @@
      * Supported Key File Types.
      */
     public enum KeyStoreType {
-        PKCS12, JKS, PEM;
+        PKCS12("PKCS12"),
+        JKS("JKS"),
+        PEM("PEM");
+
+        private String str;
+
+        KeyStoreType(String str) {
+            this.str = str;
+        }
+
+        @Override
+        public String toString() {
+            return this.str;
+        }
     }
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TLSContextFactory.class);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
index 31ea7bdf7..d4a480f50 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.tls;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -24,6 +25,7 @@
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
@@ -42,6 +44,8 @@
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.BookKeeperClientStats;
+import org.apache.bookkeeper.client.BookKeeperTestClient;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
@@ -49,9 +53,11 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieConnectionPeer;
+import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.proto.ClientConnectionPeer;
 import org.apache.bookkeeper.proto.TestPerChannelBookieClient;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.tls.TLSContextFactory.KeyStoreType;
 import org.junit.After;
 import org.junit.Assume;
@@ -281,27 +287,31 @@ public void testStartTLSServerBadPassword() throws 
Exception {
         }
     }
 
-    private LedgerMetadata testClient(ClientConfiguration conf, int 
clusterSize) throws Exception {
-        try (BookKeeper client = new BookKeeper(conf);) {
-            byte[] passwd = "testPassword".getBytes();
-            int numEntries = 100;
-            long lid;
-            byte[] testEntry = "testEntry".getBytes();
-            try (LedgerHandle lh = client.createLedger(clusterSize, 
clusterSize, DigestType.CRC32, passwd);) {
-                for (int i = 0; i <= numEntries; i++) {
-                    lh.addEntry(testEntry);
-                }
-                lid = lh.getId();
+    private LedgerMetadata testClient(BookKeeper client, int clusterSize) 
throws Exception {
+        byte[] passwd = "testPassword".getBytes();
+        int numEntries = 100;
+        long lid;
+        byte[] testEntry = "testEntry".getBytes();
+        try (LedgerHandle lh = client.createLedger(clusterSize, clusterSize, 
DigestType.CRC32, passwd);) {
+            for (int i = 0; i <= numEntries; i++) {
+                lh.addEntry(testEntry);
             }
-            try (LedgerHandle lh = client.openLedger(lid, DigestType.CRC32, 
passwd);) {
-                Enumeration<LedgerEntry> entries = lh.readEntries(0, 
numEntries);
-                while (entries.hasMoreElements()) {
-                    LedgerEntry e = entries.nextElement();
-                    assertTrue("Entry contents incorrect", 
Arrays.equals(e.getEntry(), testEntry));
-                }
-                BookKeeperAdmin admin = new BookKeeperAdmin(client);
-                return admin.getLedgerMetadata(lh);
+            lid = lh.getId();
+        }
+        try (LedgerHandle lh = client.openLedger(lid, DigestType.CRC32, 
passwd);) {
+            Enumeration<LedgerEntry> entries = lh.readEntries(0, numEntries);
+            while (entries.hasMoreElements()) {
+                LedgerEntry e = entries.nextElement();
+                assertTrue("Entry contents incorrect", 
Arrays.equals(e.getEntry(), testEntry));
             }
+            BookKeeperAdmin admin = new BookKeeperAdmin(client);
+            return admin.getLedgerMetadata(lh);
+        }
+    }
+
+    private LedgerMetadata testClient(ClientConfiguration conf, int 
clusterSize) throws Exception {
+        try (BookKeeper client = new BookKeeper(conf);) {
+            return testClient(client, clusterSize);
         }
     }
 
@@ -567,7 +577,7 @@ public void 
testBookieAuthPluginDenyAccessToClientWithoutTLS() throws Exception
         secureBookieSideChannel = false;
         secureBookieSideChannelPrincipals = null;
         ClientConfiguration clientConf = new 
ClientConfiguration(baseClientConf);
-        clientConf.setTLSProviderFactoryClass("");
+        clientConf.setTLSProviderFactoryClass(null);
 
         try {
             testClient(clientConf, numBookies);
@@ -671,18 +681,6 @@ public void testMixedCluster() throws Exception {
         int origNumBookies = numBookies;
 
         ServerConfiguration bookieConf = newServerConfiguration();
-        /*
-        bookieConf.setTLSProviderFactoryClass(null);
-        bs.add(startBookie(bookieConf));
-        try {
-            testClient(clientConf, origNumBookies + 1);
-            fail("Shouldn't be able to connect");
-        } catch (BKException.BKNotEnoughBookiesException nnbe) {
-            // correct response
-        }
-
-        bookieConf = newServerConfiguration();
-        */
         
bookieConf.setTLSProviderFactoryClass(TLSContextFactory.class.getName());
         bs.add(startBookie(bookieConf));
         testClient(clientConf, origNumBookies + 1);
@@ -705,4 +703,145 @@ public void testHungServer() throws Exception {
         LOG.info("latch countdown");
         latch.countDown();
     }
+
+    /**
+     * Verify TLS and non-TLS channel counters.
+     */
+    @Test
+    public void testTLSChannelCounters() throws Exception {
+        ClientConfiguration tlsClientconf = new 
ClientConfiguration(baseClientConf)
+                .setNumChannelsPerBookie(1);
+        ClientConfiguration nonTlsClientconf = new 
ClientConfiguration(baseClientConf)
+                .setNumChannelsPerBookie(1)
+                .setTLSProviderFactoryClass(null);
+
+        TestStatsProvider tlsStatsProvider = new TestStatsProvider();
+        TestStatsProvider nonTlsStatsProvider = new TestStatsProvider();
+        BookKeeperTestClient tlsClient = new 
BookKeeperTestClient(tlsClientconf, tlsStatsProvider);
+        BookKeeperTestClient nonTlsClient = new 
BookKeeperTestClient(nonTlsClientconf, nonTlsStatsProvider);
+
+        // IO load from clients
+        testClient(tlsClient, numBookies);
+        testClient(nonTlsClient, numBookies);
+
+        // verify stats
+        for (int i = 0; i < numBookies; i++) {
+            BookieServer bookie = bs.get(i);
+            InetSocketAddress addr = 
bookie.getLocalAddress().getSocketAddress();
+            StringBuilder nameBuilder = new 
StringBuilder(BookKeeperClientStats.CHANNEL_SCOPE)
+                    .append(".")
+                    .append(addr.getAddress().getHostAddress()
+                    .replace('.', '_')
+                    .replace('-', '_'))
+                    .append("_")
+                    .append(addr.getPort())
+                    .append(".");
+
+            // check stats on TLS enabled client
+            assertEquals("Mismatch TLS channel count", 1,
+                    
tlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
+                    + 
BookKeeperClientStats.ACTIVE_TLS_CHANNEL_COUNTER).get().longValue());
+            assertEquals("TLS handshake failure unexpected", 0,
+                    
tlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
+                    + 
BookKeeperClientStats.FAILED_TLS_HANDSHAKE_COUNTER).get().longValue());
+            assertEquals("Mismatch non-TLS channel count", 0,
+                    
tlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
+                    + 
BookKeeperClientStats.ACTIVE_NON_TLS_CHANNEL_COUNTER).get().longValue());
+            assertEquals("Connection failures unexpected", 0,
+                    
tlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
+                    + 
BookKeeperClientStats.FAILED_CONNECTION_COUNTER).get().longValue());
+
+            // check stats on non-TLS enabled client
+            assertEquals("Mismatch TLS channel count", 0,
+                    
nonTlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
+                    + 
BookKeeperClientStats.ACTIVE_TLS_CHANNEL_COUNTER).get().longValue());
+            assertEquals("TLS handshake failure unexpected", 0,
+                    
nonTlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
+                    + 
BookKeeperClientStats.FAILED_TLS_HANDSHAKE_COUNTER).get().longValue());
+            assertEquals("Mismatch non-TLS channel count", 1,
+                    
nonTlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
+                    + 
BookKeeperClientStats.ACTIVE_NON_TLS_CHANNEL_COUNTER).get().longValue());
+            assertEquals("Connection failures unexpected", 0,
+                    
nonTlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
+                    + 
BookKeeperClientStats.FAILED_CONNECTION_COUNTER).get().longValue());
+
+            bookie.shutdown();
+            assertEquals("Mismatch TLS channel count", 0,
+                    
tlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
+                    + 
BookKeeperClientStats.ACTIVE_TLS_CHANNEL_COUNTER).get().longValue());
+            assertEquals("Mismatch non-TLS channel count", 0,
+                    
nonTlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
+                    + 
BookKeeperClientStats.ACTIVE_NON_TLS_CHANNEL_COUNTER).get().longValue());
+
+        }
+    }
+
+    /**
+     * Verify handshake failure due to missing entry in trust store.
+     */
+    @Test
+    public void testHandshakeFailure() throws Exception {
+        ClientConfiguration clientConf = new 
ClientConfiguration(baseClientConf)
+                .setNumChannelsPerBookie(1);
+
+        // restart a bookie with wrong trust store
+        int restartBookieIdx = 0;
+        ServerConfiguration badBookieConf = bsConfs.get(restartBookieIdx);
+
+        switch (serverTrustStoreFormat) {
+            case PEM:
+                
badBookieConf.setTLSTrustStore(getResourcePath("server-cert.pem"));
+                break;
+            case JKS:
+                
badBookieConf.setTLSTrustStore(getResourcePath("server-key.jks"))
+                        
.setTLSTrustStorePasswordPath(getResourcePath("keyStoreServerPassword.txt"));
+                break;
+            case PKCS12:
+                
badBookieConf.setTLSTrustStore(getResourcePath("server-key.p12"))
+                        
.setTLSTrustStorePasswordPath(getResourcePath("keyStoreServerPassword.txt"));
+                break;
+            default:
+                throw new Exception("Unrecognized trust store format: " + 
serverTrustStoreFormat);
+        }
+
+
+        killBookie(restartBookieIdx);
+        LOG.info("Sleeping for 1s before restarting bookie with bad cert");
+        Thread.sleep(1000);
+        BookieServer bookie = startBookie(badBookieConf);
+        bs.add(bookie);
+        bsConfs.add(badBookieConf);
+
+        // Create ledger and write entries
+        TestStatsProvider testStatsProvider = new TestStatsProvider();
+        BookKeeperTestClient client = new BookKeeperTestClient(clientConf, 
testStatsProvider);
+        byte[] passwd = "testPassword".getBytes();
+        int numEntries = 2;
+        byte[] testEntry = "testEntry".getBytes();
+
+        // should fail to write entries when WQ == AQ == 3
+        try (LedgerHandle lh = client.createLedger(numBookies, numBookies, 
numBookies, DigestType.CRC32, passwd)) {
+            for (int i = 0; i <= numEntries; i++) {
+                lh.addEntry(testEntry);
+            }
+            fail("Should have failed with not enough bookies to write");
+        } catch (BKException.BKNotEnoughBookiesException bke) {
+            // expected
+        }
+
+        // check failed handshake counter
+        InetSocketAddress addr = bookie.getLocalAddress().getSocketAddress();
+        StringBuilder nameBuilder = new 
StringBuilder(BookKeeperClientStats.CHANNEL_SCOPE)
+                .append(".")
+                .append(addr.getAddress().getHostAddress()
+                        .replace('.', '_')
+                        .replace('-', '_'))
+                .append("_")
+                .append(addr.getPort())
+                .append(".");
+
+        assertEquals("TLS handshake failure expected", 1,
+                client.getTestStatsProvider().getCounter(nameBuilder.toString()
+                + 
BookKeeperClientStats.FAILED_TLS_HANDSHAKE_COUNTER).get().longValue());
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to