This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e3ece34 Initialize TLS context only when attempting connection (#4399)
e3ece34 is described below
commit e3ece349d060b7ce0fd40e37e3a6d197036274c9
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 29 16:20:49 2019 -0700
Initialize TLS context only when attempting connection (#4399)
* Initialize TLS context only when attempting connection
* Fixed eviction time check
---
.../client/impl/PulsarChannelInitializer.java | 53 +++++++++++-------
.../org/apache/pulsar/client/util/ObjectCache.java | 54 ++++++++++++++++++
.../client/impl/ClientInitializationTest.java | 45 +++++++++++++++
.../apache/pulsar/client/util/ObjectCacheTest.java | 64 ++++++++++++++++++++++
4 files changed, 195 insertions(+), 21 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 5d0f872..af7d23c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import java.security.cert.X509Certificate;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import io.netty.channel.ChannelInitializer;
@@ -28,9 +29,9 @@ import io.netty.handler.ssl.SslContext;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ObjectCache;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.util.SecurityUtility;
public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel> {
@@ -38,44 +39,54 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
public static final String TLS_HANDLER = "tls";
private final Supplier<ClientCnx> clientCnxSupplier;
- private final SslContext sslCtx;
- private final ClientConfigurationData conf;
+ private final boolean tlsEnabled;
+
+ private final Supplier<SslContext> sslContextSupplier;
+
+ private static final long TLS_CERTIFICATE_CACHE_MILLIS =
TimeUnit.MINUTES.toMillis(1);
public PulsarChannelInitializer(ClientConfigurationData conf,
Supplier<ClientCnx> clientCnxSupplier)
throws Exception {
super();
this.clientCnxSupplier = clientCnxSupplier;
+ this.tlsEnabled = conf.isUseTls();
+
if (conf.isUseTls()) {
- // Set client certificate if available
- AuthenticationDataProvider authData =
conf.getAuthentication().getAuthData();
- if (authData.hasDataForTls()) {
- this.sslCtx =
SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
- conf.getTlsTrustCertsFilePath(), (X509Certificate[])
authData.getTlsCertificates(),
- authData.getTlsPrivateKey());
- } else {
- this.sslCtx =
SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
- conf.getTlsTrustCertsFilePath());
- }
+ sslContextSupplier = new ObjectCache<SslContext>(() -> {
+ try {
+ // Set client certificate if available
+ AuthenticationDataProvider authData =
conf.getAuthentication().getAuthData();
+ if (authData.hasDataForTls()) {
+ return
SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+ conf.getTlsTrustCertsFilePath(),
(X509Certificate[]) authData.getTlsCertificates(),
+ authData.getTlsPrivateKey());
+ } else {
+ return
SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+ conf.getTlsTrustCertsFilePath());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create TLS context",
e);
+ }
+ }, TLS_CERTIFICATE_CACHE_MILLIS, TimeUnit.MILLISECONDS);
} else {
- this.sslCtx = null;
+ sslContextSupplier = null;
}
- this.conf = conf;
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
- if (sslCtx != null) {
- ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+ if (tlsEnabled) {
+ ch.pipeline().addLast(TLS_HANDLER,
sslContextSupplier.get().newHandler(ch.alloc()));
ch.pipeline().addLast("ByteBufPairEncoder",
ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
ch.pipeline()
- .addLast("frameDecoder",
- new LengthFieldBasedFrameDecoder(
- Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING,
- 0, 4, 0, 4));
+ .addLast("frameDecoder",
+ new LengthFieldBasedFrameDecoder(
+ Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING,
+ 0, 4, 0, 4));
ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java
new file mode 100644
index 0000000..63c07f2
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ObjectCache.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.function.Supplier;
+import java.time.Clock;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link Supplier} that caches the object produced by another supplier and re-creates it
+ * once the configured cache duration has elapsed.
+ *
+ * <p>All reads go through the synchronized {@link #get()} method, so the wrapped supplier is
+ * never invoked concurrently by this cache. A {@code null} result is not treated as a cached
+ * value: the wrapped supplier is asked again on the next call.
+ *
+ * @param <T> type of the cached object
+ */
+public class ObjectCache<T> implements Supplier<T> {
+
+    private final Supplier<T> supplier;
+    private T cachedInstance;
+
+    private final long cacheDurationMillis;
+    private long lastRefreshTimestamp;
+    private final Clock clock;
+
+    /**
+     * Creates a cache that measures time with the system UTC clock.
+     *
+     * @param supplier      produces a fresh instance whenever the cached one is missing or expired
+     * @param cacheDuration how long a produced instance stays valid, expressed in {@code unit}
+     * @param unit          time unit of {@code cacheDuration}
+     */
+    public ObjectCache(Supplier<T> supplier, long cacheDuration, TimeUnit unit) {
+        this(supplier, cacheDuration, unit, Clock.systemUTC());
+    }
+
+    /**
+     * Package-private constructor that accepts the clock used to read the current time, so that
+     * tests can control it.
+     */
+    ObjectCache(Supplier<T> supplier, long cacheDuration, TimeUnit unit, Clock clock) {
+        this.supplier = supplier;
+        this.cachedInstance = null;
+        this.cacheDurationMillis = unit.toMillis(cacheDuration);
+        this.clock = clock;
+    }
+
+    /**
+     * Returns the cached instance, first refreshing it from the wrapped supplier when nothing is
+     * cached yet or the cache duration has elapsed since the last refresh.
+     *
+     * @return the cached (possibly just refreshed) instance
+     */
+    @Override
+    public synchronized T get() {
+        long now = clock.millis();
+        long elapsedMillis = now - lastRefreshTimestamp;
+
+        // The clock is a wall clock and may be stepped backwards. A negative elapsed time means
+        // the stored timestamp is no longer comparable, so refresh instead of pinning a stale
+        // instance until the clock catches up again.
+        boolean expired = elapsedMillis < 0 || elapsedMillis >= cacheDurationMillis;
+        if (cachedInstance == null || expired) {
+            cachedInstance = supplier.get();
+            lastRefreshTimestamp = now;
+        }
+
+        return cachedInstance;
+    }
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
new file mode 100644
index 0000000..0169296
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.*;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.testng.annotations.Test;
+
+/**
+ * Checks which calls the client makes on the configured {@link Authentication} while it is
+ * being built.
+ */
+public class ClientInitializationTest {
+
+    private static final String TLS_SERVICE_URL = "pulsar+ssl://my-host:6650";
+
+    @Test
+    public void testInitializeAuthWithTls() throws PulsarClientException {
+        Authentication mockedAuth = mock(Authentication.class);
+
+        PulsarClient.builder()
+                .serviceUrl(TLS_SERVICE_URL)
+                .authentication(mockedAuth)
+                .build();
+
+        // Building the client may only start the authentication plugin. Credentials must not
+        // be requested before the first connection attempt.
+        verify(mockedAuth).start();
+        verifyNoMoreInteractions(mockedAuth);
+    }
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
new file mode 100644
index 0000000..771e387
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/util/ObjectCacheTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+import java.time.Clock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link ObjectCache}, driven by a mocked {@link Clock} whose time is advanced
+ * manually.
+ */
+public class ObjectCacheTest {
+    @Test
+    public void testCache() {
+        // Simulated current time in milliseconds; the test moves it forward explicitly.
+        AtomicLong currentTime = new AtomicLong(0);
+
+        Clock clock = mock(Clock.class);
+        // Answer with the primitive value so the result matches the long return type of
+        // Clock.millis(), instead of handing the AtomicLong holder itself to the mock.
+        when(clock.millis()).then(invocation -> currentTime.get());
+
+        // Each invocation of the wrapped supplier yields the next integer, so the returned
+        // value tells how many refreshes have happened so far.
+        AtomicInteger currentValue = new AtomicInteger(0);
+
+        Supplier<Integer> cache = new ObjectCache<>(() -> currentValue.getAndIncrement(),
+                10, TimeUnit.MILLISECONDS, clock);
+
+        // The first call populates the cache; the second call is served from it.
+        assertEquals(cache.get().intValue(), 0);
+        assertEquals(cache.get().intValue(), 0);
+
+        currentTime.set(1);
+        // Still the value has not expired
+        assertEquals(cache.get().intValue(), 0);
+
+        // Exactly one cache duration after the first refresh: the value is refreshed.
+        currentTime.set(10);
+        assertEquals(cache.get().intValue(), 1);
+
+        // 5 ms after the refresh at t=10: still cached.
+        currentTime.set(15);
+        assertEquals(cache.get().intValue(), 1);
+
+        // 12 ms after the refresh at t=10: expired again.
+        currentTime.set(22);
+        assertEquals(cache.get().intValue(), 2);
+    }
+}