This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a187eb72b5dbc429dac1cb313a64b736622ed14f Author: linlinnn <[email protected]> AuthorDate: Mon Mar 29 11:19:42 2021 +0800 [ISSUE 9783][pulsar-client] Allow pulsar client receive external timer (#9802) Fixed #9783 Allow pulsar client to receive external timer instance Add new constructor to provide an external timer, and share timer in pulsar proxy - [x] Make sure that the change passes the CI checks. (cherry picked from commit af6eabae335ed009dc8a54408ffda6c77348db7f) --- .../pulsar/client/impl/PulsarClientImpl.java | 19 +++++++++++-- .../pulsar/client/impl/PulsarClientImplTest.java | 32 ++++++++++++++++++++++ .../pulsar/proxy/server/ProxyConnection.java | 4 +-- .../apache/pulsar/proxy/server/ProxyService.java | 11 ++++++++ .../pulsar/sql/presto/PulsarConnectorCache.java | 2 +- 5 files changed, 62 insertions(+), 6 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 43b8acb..7aa554b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -94,6 +94,7 @@ public class PulsarClientImpl implements PulsarClient { private LookupService lookup; private final ConnectionPool cnxPool; private final Timer timer; + private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; private final ExecutorProvider internalExecutorService; @@ -130,11 +131,16 @@ public class PulsarClientImpl implements PulsarClient { } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { - this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup)); + this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException { + this(conf, eventLoopGroup, cnxPool, null); + } + + public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer) + throws PulsarClientException { if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) { throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration"); } @@ -151,7 +157,12 @@ public class PulsarClientImpl implements PulsarClient { } else { lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor()); } - timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); + if (timer == null) { + this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); + needStopTimer = true; + } else { + this.timer = timer; + } producers = Collections.newSetFromMap(new ConcurrentHashMap<>()); consumers = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -601,7 +612,9 @@ public class PulsarClientImpl implements PulsarClient { try { lookup.close(); cnxPool.close(); - timer.stop(); + if (needStopTimer) { + timer.stop(); + } externalExecutorProvider.shutdownNow(); internalExecutorService.shutdownNow(); conf.getAuthentication().close(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index f7df415..d712c26 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotSame; import static org.testng.Assert.assertSame; @@ -33,8 +34,10 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; +import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.DefaultThreadFactory; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; @@ -54,6 +57,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.mockito.internal.util.reflection.FieldSetter; +import org.mockito.Mockito; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -146,4 +150,32 @@ public class PulsarClientImplTest { assertSame(consumer.getState(), HandlerState.State.Closed)); } + @Test + public void testInitializeWithoutTimer() throws Exception { + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("pulsar://localhost:6650"); + PulsarClientImpl client = new PulsarClientImpl(conf); + + HashedWheelTimer timer = mock(HashedWheelTimer.class); + Field field = client.getClass().getDeclaredField("timer"); + field.setAccessible(true); + field.set(client, timer); + + client.shutdown(); + verify(timer).stop(); + } + + @Test + public void testInitializeWithTimer() throws PulsarClientException { + ClientConfigurationData conf = new ClientConfigurationData(); + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test")); + ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop)); + conf.setServiceUrl("pulsar://localhost:6650"); + + HashedWheelTimer timer = new HashedWheelTimer(); + PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool, timer); + + client.shutdown(); + client.timer().stop(); + } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 63e652c..a483ee3 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -299,7 +299,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi if (!service.getConfiguration().isAuthenticationEnabled()) { this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(), new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion))); + () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)), service.getTimer()); completeConnect(); return; @@ -436,7 +436,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi final String clientAuthMethod, final int protocolVersion) throws PulsarClientException { return new PulsarClientImpl(clientConf, service.getWorkerGroup(), new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf, - service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion))); + service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer()); } private static int getProtocolVersionToAdvertise(CommandConnect connect) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 58ba3cf..c2e157c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -27,6 +27,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import lombok.Getter; @@ -67,6 +69,7 @@ import com.google.common.collect.Sets; public class ProxyService implements Closeable { private final ProxyConfiguration proxyConfig; + private final Timer timer; private String serviceUrl; private String serviceUrlTls; private ConfigurationCacheService configurationCacheService; @@ -124,6 +127,7 @@ public class ProxyService implements Closeable { AuthenticationService authenticationService) throws IOException { checkNotNull(proxyConfig); this.proxyConfig = proxyConfig; + this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); this.clientCnxs = Sets.newConcurrentHashSet(); this.topicStats = Maps.newConcurrentMap(); @@ -243,6 +247,9 @@ public class ProxyService implements Closeable { acceptorGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); + if (timer != null) { + timer.stop(); + } } public String getServiceUrl() { @@ -257,6 +264,10 @@ public class ProxyService implements Closeable { return proxyConfig; } + public Timer getTimer() { + return timer; + } + public AuthenticationService getAuthenticationService() { return authenticationService; } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index c10312a..b16cc46 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -102,7 +102,7 @@ public class PulsarConnectorCache { .setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval()) .setStickyReadsEnabled(false) .setReadEntryTimeout(60) - .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue()) + .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())PulsarClientImplTest.java .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads()) .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());
