This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch revert-9802-issue-9783 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 80e03c07dd673338f06c3b17bedb662c87d106c7 Author: lipenghui <[email protected]> AuthorDate: Mon Mar 29 11:22:04 2021 +0800 Revert "[ISSUE 9783][pulsar-client] Allow pulsar client receive external timer (#9802)" This reverts commit af6eabae335ed009dc8a54408ffda6c77348db7f. --- .../pulsar/client/impl/PulsarClientImpl.java | 19 ++----------- .../pulsar/client/impl/PulsarClientImplTest.java | 32 ---------------------- .../pulsar/proxy/server/ProxyConnection.java | 4 +-- .../apache/pulsar/proxy/server/ProxyService.java | 11 -------- 4 files changed, 5 insertions(+), 61 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index e66fb24..2f8dd27 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -94,7 +94,6 @@ public class PulsarClientImpl implements PulsarClient { private LookupService lookup; private final ConnectionPool cnxPool; private final Timer timer; - private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; private final ExecutorProvider internalExecutorService; @@ -132,16 +131,11 @@ public class PulsarClientImpl implements PulsarClient { } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { - this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null); + this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup)); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, null); - } - - public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer) - throws PulsarClientException { if (conf == null || 
isBlank(conf.getServiceUrl()) || eventLoopGroup == null) { throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration"); } @@ -158,12 +152,7 @@ public class PulsarClientImpl implements PulsarClient { } else { lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor()); } - if (timer == null) { - this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); - needStopTimer = true; - } else { - this.timer = timer; - } + timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); producers = Collections.newSetFromMap(new ConcurrentHashMap<>()); consumers = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -682,9 +671,7 @@ public class PulsarClientImpl implements PulsarClient { try { lookup.close(); cnxPool.close(); - if (needStopTimer) { - timer.stop(); - } + timer.stop(); externalExecutorProvider.shutdownNow(); internalExecutorService.shutdownNow(); conf.getAuthentication().close(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index 2492fc8..5aeb981 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -22,7 +22,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotSame; import static org.testng.Assert.assertSame; @@ -34,10 +33,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; 
import io.netty.channel.EventLoopGroup; -import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.DefaultThreadFactory; -import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; @@ -56,7 +53,6 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.netty.EventLoopUtil; -import org.mockito.Mockito; import org.powermock.reflect.Whitebox; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -150,32 +146,4 @@ public class PulsarClientImplTest { assertSame(consumer.getState(), HandlerState.State.Closed)); } - @Test - public void testInitializeWithoutTimer() throws Exception { - ClientConfigurationData conf = new ClientConfigurationData(); - conf.setServiceUrl("pulsar://localhost:6650"); - PulsarClientImpl client = new PulsarClientImpl(conf); - - HashedWheelTimer timer = mock(HashedWheelTimer.class); - Field field = client.getClass().getDeclaredField("timer"); - field.setAccessible(true); - field.set(client, timer); - - client.shutdown(); - verify(timer).stop(); - } - - @Test - public void testInitializeWithTimer() throws PulsarClientException { - ClientConfigurationData conf = new ClientConfigurationData(); - EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test")); - ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop)); - conf.setServiceUrl("pulsar://localhost:6650"); - - HashedWheelTimer timer = new HashedWheelTimer(); - PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool, timer); - - client.shutdown(); - client.timer().stop(); - } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 
3fdb0d5e..5d4c576 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -299,7 +299,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi if (!service.getConfiguration().isAuthenticationEnabled()) { this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(), new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)), service.getTimer()); + () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion))); completeConnect(); return; @@ -436,7 +436,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi final String clientAuthMethod, final int protocolVersion) throws PulsarClientException { return new PulsarClientImpl(clientConf, service.getWorkerGroup(), new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf, - service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer()); + service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion))); } private static int getProtocolVersionToAdvertise(CommandConnect connect) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index e10d2d4..b3b6aea 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -27,8 +27,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timer; import io.prometheus.client.Counter; 
import io.prometheus.client.Gauge; import lombok.Getter; @@ -71,7 +69,6 @@ import com.google.common.collect.Sets; public class ProxyService implements Closeable { private final ProxyConfiguration proxyConfig; - private final Timer timer; private String serviceUrl; private String serviceUrlTls; private ConfigurationMetadataCacheService configurationCacheService; @@ -131,7 +128,6 @@ public class ProxyService implements Closeable { AuthenticationService authenticationService) throws IOException { checkNotNull(proxyConfig); this.proxyConfig = proxyConfig; - this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); this.clientCnxs = Sets.newConcurrentHashSet(); this.topicStats = Maps.newConcurrentMap(); @@ -265,9 +261,6 @@ public class ProxyService implements Closeable { } acceptorGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); - if (timer != null) { - timer.stop(); - } } public String getServiceUrl() { @@ -282,10 +275,6 @@ public class ProxyService implements Closeable { return proxyConfig; } - public Timer getTimer() { - return timer; - } - public AuthenticationService getAuthenticationService() { return authenticationService; }
