This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new af6eaba  [ISSUE 9783][pulsar-client] Allow pulsar client receive 
external timer (#9802)
af6eaba is described below

commit af6eabae335ed009dc8a54408ffda6c77348db7f
Author: linlinnn <[email protected]>
AuthorDate: Mon Mar 29 11:19:42 2021 +0800

    [ISSUE 9783][pulsar-client] Allow pulsar client receive external timer 
(#9802)
    
    Fixed #9783
    
    ### Motivation
    
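    Allow the Pulsar client to receive an externally provided timer instance, so
    that a component creating many clients (such as the Pulsar proxy) can share
    one timer instead of each client creating its own.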
    
    ### Modifications
    
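    Add a new constructor that accepts an external timer, and share a single
    timer across the clients created by the Pulsar proxy. A client stops the
    timer on shutdown only if it created the timer itself; the proxy service
    stops the shared timer when it closes.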
    
    ### Verifying this change
    - [x] Make sure that the change passes the CI checks.
---
 .../pulsar/client/impl/PulsarClientImpl.java       | 19 +++++++++++--
 .../pulsar/client/impl/PulsarClientImplTest.java   | 32 ++++++++++++++++++++++
 .../pulsar/proxy/server/ProxyConnection.java       |  4 +--
 .../apache/pulsar/proxy/server/ProxyService.java   | 11 ++++++++
 4 files changed, 61 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 2f8dd27..e66fb24 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -94,6 +94,7 @@ public class PulsarClientImpl implements PulsarClient {
     private LookupService lookup;
     private final ConnectionPool cnxPool;
     private final Timer timer;
+    private boolean needStopTimer;
     private final ExecutorProvider externalExecutorProvider;
     private final ExecutorProvider internalExecutorService;
 
@@ -131,11 +132,16 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
+        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), 
null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
+        this(conf, eventLoopGroup, cnxPool, null);
+    }
+
+    public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool, Timer timer)
+            throws PulsarClientException {
         if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == 
null) {
             throw new 
PulsarClientException.InvalidConfigurationException("Invalid client 
configuration");
         }
@@ -152,7 +158,12 @@ public class PulsarClientImpl implements PulsarClient {
         } else {
             lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), 
conf.getListenerName(), conf.isUseTls(), 
externalExecutorProvider.getExecutor());
         }
-        timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, 
TimeUnit.MILLISECONDS);
+        if (timer == null) {
+            this.timer = new 
HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
+            needStopTimer = true;
+        } else {
+            this.timer = timer;
+        }
         producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
         consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
@@ -671,7 +682,9 @@ public class PulsarClientImpl implements PulsarClient {
         try {
             lookup.close();
             cnxPool.close();
-            timer.stop();
+            if (needStopTimer) {
+                timer.stop();
+            }
             externalExecutorProvider.shutdownNow();
             internalExecutorService.shutdownNow();
             conf.getAuthentication().close();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 5aeb981..2492fc8 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotSame;
 import static org.testng.Assert.assertSame;
@@ -33,8 +34,10 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
@@ -53,6 +56,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.mockito.Mockito;
 import org.powermock.reflect.Whitebox;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -146,4 +150,32 @@ public class PulsarClientImplTest {
                 assertSame(consumer.getState(), HandlerState.State.Closed));
     }
 
+    @Test
+    public void testInitializeWithoutTimer() throws Exception {
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("pulsar://localhost:6650");
+        PulsarClientImpl client = new PulsarClientImpl(conf);
+
+        HashedWheelTimer timer = mock(HashedWheelTimer.class);
+        Field field = client.getClass().getDeclaredField("timer");
+        field.setAccessible(true);
+        field.set(client, timer);
+
+        client.shutdown();
+        verify(timer).stop();
+    }
+
+    @Test
+    public void testInitializeWithTimer() throws PulsarClientException {
+        ClientConfigurationData conf = new ClientConfigurationData();
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new 
DefaultThreadFactory("test"));
+        ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
+        conf.setServiceUrl("pulsar://localhost:6650");
+
+        HashedWheelTimer timer = new HashedWheelTimer();
+        PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool, 
timer);
+
+        client.shutdown();
+        client.timer().stop();
+    }
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 5d4c576..3fdb0d5e 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -299,7 +299,7 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
             if (!service.getConfiguration().isAuthenticationEnabled()) {
                 this.client = new PulsarClientImpl(clientConf, 
service.getWorkerGroup(),
                     new ProxyConnectionPool(clientConf, 
service.getWorkerGroup(),
-                        () -> new ClientCnx(clientConf, 
service.getWorkerGroup(), protocolVersion)));
+                        () -> new ClientCnx(clientConf, 
service.getWorkerGroup(), protocolVersion)), service.getTimer());
 
                 completeConnect();
                 return;
@@ -436,7 +436,7 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
             final String clientAuthMethod, final int protocolVersion) throws 
PulsarClientException {
         return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
                 new ProxyConnectionPool(clientConf, service.getWorkerGroup(), 
() -> new ProxyClientCnx(clientConf,
-                        service.getWorkerGroup(), clientAuthRole, 
clientAuthData, clientAuthMethod, protocolVersion)));
+                        service.getWorkerGroup(), clientAuthRole, 
clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer());
     }
 
     private static int getProtocolVersionToAdvertise(CommandConnect connect) {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index b3b6aea..e10d2d4 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -27,6 +27,8 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import lombok.Getter;
@@ -69,6 +71,7 @@ import com.google.common.collect.Sets;
 public class ProxyService implements Closeable {
 
     private final ProxyConfiguration proxyConfig;
+    private final Timer timer;
     private String serviceUrl;
     private String serviceUrlTls;
     private ConfigurationMetadataCacheService configurationCacheService;
@@ -128,6 +131,7 @@ public class ProxyService implements Closeable {
                         AuthenticationService authenticationService) throws 
IOException {
         checkNotNull(proxyConfig);
         this.proxyConfig = proxyConfig;
+        this.timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, 
TimeUnit.MILLISECONDS);
         this.clientCnxs = Sets.newConcurrentHashSet();
         this.topicStats = Maps.newConcurrentMap();
 
@@ -261,6 +265,9 @@ public class ProxyService implements Closeable {
         }
         acceptorGroup.shutdownGracefully();
         workerGroup.shutdownGracefully();
+        if (timer != null) {
+            timer.stop();
+        }
     }
 
     public String getServiceUrl() {
@@ -275,6 +282,10 @@ public class ProxyService implements Closeable {
         return proxyConfig;
     }
 
+    public Timer getTimer() {
+        return timer;
+    }
+
     public AuthenticationService getAuthenticationService() {
         return authenticationService;
     }

Reply via email to