This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new af6eaba [ISSUE 9783][pulsar-client] Allow pulsar client receive
external timer (#9802)
af6eaba is described below
commit af6eabae335ed009dc8a54408ffda6c77348db7f
Author: linlinnn <[email protected]>
AuthorDate: Mon Mar 29 11:19:42 2021 +0800
[ISSUE 9783][pulsar-client] Allow pulsar client receive external timer
(#9802)
Fixed #9783
### Motivation
Allow the Pulsar client to receive an external timer instance.
### Modifications
Add a new constructor that accepts an external timer, and share a single timer
across the client instances created by the Pulsar proxy.
### Verifying this change
- [x] Make sure that the change passes the CI checks.
---
.../pulsar/client/impl/PulsarClientImpl.java | 19 +++++++++++--
.../pulsar/client/impl/PulsarClientImplTest.java | 32 ++++++++++++++++++++++
.../pulsar/proxy/server/ProxyConnection.java | 4 +--
.../apache/pulsar/proxy/server/ProxyService.java | 11 ++++++++
4 files changed, 61 insertions(+), 5 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 2f8dd27..e66fb24 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -94,6 +94,7 @@ public class PulsarClientImpl implements PulsarClient {
private LookupService lookup;
private final ConnectionPool cnxPool;
private final Timer timer;
+ private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorService;
@@ -131,11 +132,16 @@ public class PulsarClientImpl implements PulsarClient {
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup) throws PulsarClientException {
- this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
+ this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup),
null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
+ this(conf, eventLoopGroup, cnxPool, null);
+ }
+
+ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool, Timer timer)
+ throws PulsarClientException {
if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup ==
null) {
throw new
PulsarClientException.InvalidConfigurationException("Invalid client
configuration");
}
@@ -152,7 +158,12 @@ public class PulsarClientImpl implements PulsarClient {
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(),
conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
}
- timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1,
TimeUnit.MILLISECONDS);
+ if (timer == null) {
+ this.timer = new
HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
+ needStopTimer = true;
+ } else {
+ this.timer = timer;
+ }
producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -671,7 +682,9 @@ public class PulsarClientImpl implements PulsarClient {
try {
lookup.close();
cnxPool.close();
- timer.stop();
+ if (needStopTimer) {
+ timer.stop();
+ }
externalExecutorProvider.shutdownNow();
internalExecutorService.shutdownNow();
conf.getAuthentication().close();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 5aeb981..2492fc8 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertSame;
@@ -33,8 +34,10 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
@@ -53,6 +56,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -146,4 +150,32 @@ public class PulsarClientImplTest {
assertSame(consumer.getState(), HandlerState.State.Closed));
}
+ @Test
+ public void testInitializeWithoutTimer() throws Exception {
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl("pulsar://localhost:6650");
+ PulsarClientImpl client = new PulsarClientImpl(conf);
+
+ HashedWheelTimer timer = mock(HashedWheelTimer.class);
+ Field field = client.getClass().getDeclaredField("timer");
+ field.setAccessible(true);
+ field.set(client, timer);
+
+ client.shutdown();
+ verify(timer).stop();
+ }
+
+ @Test
+ public void testInitializeWithTimer() throws PulsarClientException {
+ ClientConfigurationData conf = new ClientConfigurationData();
+ EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new
DefaultThreadFactory("test"));
+ ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
+ conf.setServiceUrl("pulsar://localhost:6650");
+
+ HashedWheelTimer timer = new HashedWheelTimer();
+ PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool,
timer);
+
+ client.shutdown();
+ client.timer().stop();
+ }
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 5d4c576..3fdb0d5e 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -299,7 +299,7 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
if (!service.getConfiguration().isAuthenticationEnabled()) {
this.client = new PulsarClientImpl(clientConf,
service.getWorkerGroup(),
new ProxyConnectionPool(clientConf,
service.getWorkerGroup(),
- () -> new ClientCnx(clientConf,
service.getWorkerGroup(), protocolVersion)));
+ () -> new ClientCnx(clientConf,
service.getWorkerGroup(), protocolVersion)), service.getTimer());
completeConnect();
return;
@@ -436,7 +436,7 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
final String clientAuthMethod, final int protocolVersion) throws
PulsarClientException {
return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ProxyClientCnx(clientConf,
- service.getWorkerGroup(), clientAuthRole,
clientAuthData, clientAuthMethod, protocolVersion)));
+ service.getWorkerGroup(), clientAuthRole,
clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer());
}
private static int getProtocolVersionToAdvertise(CommandConnect connect) {
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index b3b6aea..e10d2d4 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -27,6 +27,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import lombok.Getter;
@@ -69,6 +71,7 @@ import com.google.common.collect.Sets;
public class ProxyService implements Closeable {
private final ProxyConfiguration proxyConfig;
+ private final Timer timer;
private String serviceUrl;
private String serviceUrlTls;
private ConfigurationMetadataCacheService configurationCacheService;
@@ -128,6 +131,7 @@ public class ProxyService implements Closeable {
AuthenticationService authenticationService) throws
IOException {
checkNotNull(proxyConfig);
this.proxyConfig = proxyConfig;
+ this.timer = new HashedWheelTimer(new
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1,
TimeUnit.MILLISECONDS);
this.clientCnxs = Sets.newConcurrentHashSet();
this.topicStats = Maps.newConcurrentMap();
@@ -261,6 +265,9 @@ public class ProxyService implements Closeable {
}
acceptorGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
+ if (timer != null) {
+ timer.stop();
+ }
}
public String getServiceUrl() {
@@ -275,6 +282,10 @@ public class ProxyService implements Closeable {
return proxyConfig;
}
+ public Timer getTimer() {
+ return timer;
+ }
+
public AuthenticationService getAuthenticationService() {
return authenticationService;
}