This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e3ece34  Initialize TLS context only when attempting connection (#4399)
e3ece34 is described below

commit e3ece349d060b7ce0fd40e37e3a6d197036274c9
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 29 16:20:49 2019 -0700

    Initialize TLS context only when attempting connection (#4399)
    
    * Initialize TLS context only when attempting connection
    
    * Fixed eviction time check
---
 .../client/impl/PulsarChannelInitializer.java      | 53 +++++++++++-------
 .../org/apache/pulsar/client/util/ObjectCache.java | 54 ++++++++++++++++++
 .../client/impl/ClientInitializationTest.java      | 45 +++++++++++++++
 .../apache/pulsar/client/util/ObjectCacheTest.java | 64 ++++++++++++++++++++++
 4 files changed, 195 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 5d0f872..af7d23c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import java.security.cert.X509Certificate;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import io.netty.channel.ChannelInitializer;
@@ -28,9 +29,9 @@ import io.netty.handler.ssl.SslContext;
 
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ObjectCache;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.util.SecurityUtility;
 
 public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel> {
@@ -38,44 +39,54 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
     public static final String TLS_HANDLER = "tls";
 
     private final Supplier<ClientCnx> clientCnxSupplier;
-    private final SslContext sslCtx;
-    private final ClientConfigurationData conf;
+    private final boolean tlsEnabled;
+
+    private final Supplier<SslContext> sslContextSupplier;
+
+    private static final long TLS_CERTIFICATE_CACHE_MILLIS = 
TimeUnit.MINUTES.toMillis(1);
 
     public PulsarChannelInitializer(ClientConfigurationData conf, 
Supplier<ClientCnx> clientCnxSupplier)
             throws Exception {
         super();
         this.clientCnxSupplier = clientCnxSupplier;
+        this.tlsEnabled = conf.isUseTls();
+
         if (conf.isUseTls()) {
-            // Set client certificate if available
-            AuthenticationDataProvider authData = 
conf.getAuthentication().getAuthData();
-            if (authData.hasDataForTls()) {
-                this.sslCtx = 
SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
-                        conf.getTlsTrustCertsFilePath(), (X509Certificate[]) 
authData.getTlsCertificates(),
-                        authData.getTlsPrivateKey());
-            } else {
-                this.sslCtx = 
SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
-                        conf.getTlsTrustCertsFilePath());
-            }
+            sslContextSupplier = new ObjectCache<SslContext>(() -> {
+                try {
+                    // Set client certificate if available
+                    AuthenticationDataProvider authData = 
conf.getAuthentication().getAuthData();
+                    if (authData.hasDataForTls()) {
+                        return 
SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+                                conf.getTlsTrustCertsFilePath(), 
(X509Certificate[]) authData.getTlsCertificates(),
+                                authData.getTlsPrivateKey());
+                    } else {
+                        return 
SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+                                conf.getTlsTrustCertsFilePath());
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to create TLS context", 
e);
+                }
+            }, TLS_CERTIFICATE_CACHE_MILLIS, TimeUnit.MILLISECONDS);
         } else {
-            this.sslCtx = null;
+            sslContextSupplier = null;
         }
-        this.conf = conf;
     }
 
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
-        if (sslCtx != null) {
-            ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+        if (tlsEnabled) {
+            ch.pipeline().addLast(TLS_HANDLER, 
sslContextSupplier.get().newHandler(ch.alloc()));
             ch.pipeline().addLast("ByteBufPairEncoder", 
ByteBufPair.COPYING_ENCODER);
         } else {
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         }
 
         ch.pipeline()
-          .addLast("frameDecoder",
-                   new LengthFieldBasedFrameDecoder(
-                       Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING,
-                       0, 4, 0, 4));
+                .addLast("frameDecoder",
+                        new LengthFieldBasedFrameDecoder(
+                                Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING,
+                                0, 4, 0, 4));
         ch.pipeline().addLast("handler", clientCnxSupplier.get());
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java
new file mode 100644
index 0000000..63c07f2
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.function.Supplier;
+import java.time.Clock;
+import java.util.concurrent.TimeUnit;
+
+public class ObjectCache<T> implements Supplier<T> {
+
+    private final Supplier<T> supplier;
+    private T cachedInstance;
+
+    private final long cacheDurationMillis;
+    private long lastRefreshTimestamp;
+    private final Clock clock;
+
+    public ObjectCache(Supplier<T> supplier, long cacheDuration, TimeUnit 
unit) {
+        this(supplier, cacheDuration, unit, Clock.systemUTC());
+    }
+
+    ObjectCache(Supplier<T> supplier, long cacheDuration, TimeUnit unit, Clock 
clock) {
+        this.supplier = supplier;
+        this.cachedInstance = null;
+        this.cacheDurationMillis = unit.toMillis(cacheDuration);
+        this.clock = clock;
+    }
+
+    public synchronized T get() {
+        long now = clock.millis();
+        if (cachedInstance == null || (now - lastRefreshTimestamp) >= 
cacheDurationMillis) {
+            cachedInstance = supplier.get();
+            lastRefreshTimestamp = now;
+        }
+
+        return cachedInstance;
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
new file mode 100644
index 0000000..0169296
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.*;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.testng.annotations.Test;
+
+public class ClientInitializationTest {
+
+    @Test
+    public void testInitializeAuthWithTls() throws PulsarClientException {
+        Authentication auth = mock(Authentication.class);
+
+        PulsarClient.builder()
+                .serviceUrl("pulsar+ssl://my-host:6650")
+                .authentication(auth)
+                .build();
+
+        // Auth should only be started, though we shouldn't have tried to get 
credentials yet (until we first attempt to
+        // connect).
+        verify(auth).start();
+        verifyNoMoreInteractions(auth);
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
new file mode 100644
index 0000000..771e387
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+import java.time.Clock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import org.testng.annotations.Test;
+
+public class ObjectCacheTest {
+    @Test
+    public void testCache() {
+
+        AtomicLong currentTime = new AtomicLong(0);
+
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).then(invocation -> currentTime);
+
+        AtomicInteger currentValue = new AtomicInteger(0);
+
+        Supplier<Integer> cache = new ObjectCache<>(() -> 
currentValue.getAndIncrement(),
+                10, TimeUnit.MILLISECONDS, clock);
+
+        assertEquals(cache.get().intValue(), 0);
+        assertEquals(cache.get().intValue(), 0);
+
+        currentTime.set(1);
+        // Still the value has not expired
+        assertEquals(cache.get().intValue(), 0);
+
+        currentTime.set(10);
+        assertEquals(cache.get().intValue(), 1);
+
+
+        currentTime.set(15);
+        assertEquals(cache.get().intValue(), 1);
+
+        currentTime.set(22);
+        assertEquals(cache.get().intValue(), 2);
+    }
+}

Reply via email to