This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch revert-9802-issue-9783
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 80e03c07dd673338f06c3b17bedb662c87d106c7
Author: lipenghui <[email protected]>
AuthorDate: Mon Mar 29 11:22:04 2021 +0800

    Revert "[ISSUE 9783][pulsar-client] Allow pulsar client receive external timer (#9802)"
    
    This reverts commit af6eabae335ed009dc8a54408ffda6c77348db7f.
---
 .../pulsar/client/impl/PulsarClientImpl.java       | 19 ++-----------
 .../pulsar/client/impl/PulsarClientImplTest.java   | 32 ----------------------
 .../pulsar/proxy/server/ProxyConnection.java       |  4 +--
 .../apache/pulsar/proxy/server/ProxyService.java   | 11 --------
 4 files changed, 5 insertions(+), 61 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e66fb24..2f8dd27 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -94,7 +94,6 @@ public class PulsarClientImpl implements PulsarClient {
     private LookupService lookup;
     private final ConnectionPool cnxPool;
     private final Timer timer;
-    private boolean needStopTimer;
     private final ExecutorProvider externalExecutorProvider;
     private final ExecutorProvider internalExecutorService;
 
@@ -132,16 +131,11 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), 
null);
+        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null);
-    }
-
-    public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool, Timer timer)
-            throws PulsarClientException {
         if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == 
null) {
             throw new 
PulsarClientException.InvalidConfigurationException("Invalid client 
configuration");
         }
@@ -158,12 +152,7 @@ public class PulsarClientImpl implements PulsarClient {
         } else {
             lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), 
conf.getListenerName(), conf.isUseTls(), 
externalExecutorProvider.getExecutor());
         }
-        if (timer == null) {
-            this.timer = new 
HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
-            needStopTimer = true;
-        } else {
-            this.timer = timer;
-        }
+        timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, 
TimeUnit.MILLISECONDS);
         producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
         consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
@@ -682,9 +671,7 @@ public class PulsarClientImpl implements PulsarClient {
         try {
             lookup.close();
             cnxPool.close();
-            if (needStopTimer) {
-                timer.stop();
-            }
+            timer.stop();
             externalExecutorProvider.shutdownNow();
             internalExecutorService.shutdownNow();
             conf.getAuthentication().close();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 2492fc8..5aeb981 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -22,7 +22,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotSame;
 import static org.testng.Assert.assertSame;
@@ -34,10 +33,8 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
-import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
@@ -56,7 +53,6 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
-import org.mockito.Mockito;
 import org.powermock.reflect.Whitebox;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -150,32 +146,4 @@ public class PulsarClientImplTest {
                 assertSame(consumer.getState(), HandlerState.State.Closed));
     }
 
-    @Test
-    public void testInitializeWithoutTimer() throws Exception {
-        ClientConfigurationData conf = new ClientConfigurationData();
-        conf.setServiceUrl("pulsar://localhost:6650");
-        PulsarClientImpl client = new PulsarClientImpl(conf);
-
-        HashedWheelTimer timer = mock(HashedWheelTimer.class);
-        Field field = client.getClass().getDeclaredField("timer");
-        field.setAccessible(true);
-        field.set(client, timer);
-
-        client.shutdown();
-        verify(timer).stop();
-    }
-
-    @Test
-    public void testInitializeWithTimer() throws PulsarClientException {
-        ClientConfigurationData conf = new ClientConfigurationData();
-        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new 
DefaultThreadFactory("test"));
-        ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
-        conf.setServiceUrl("pulsar://localhost:6650");
-
-        HashedWheelTimer timer = new HashedWheelTimer();
-        PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool, 
timer);
-
-        client.shutdown();
-        client.timer().stop();
-    }
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 3fdb0d5e..5d4c576 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -299,7 +299,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             if (!service.getConfiguration().isAuthenticationEnabled()) {
                 this.client = new PulsarClientImpl(clientConf, 
service.getWorkerGroup(),
                     new ProxyConnectionPool(clientConf, 
service.getWorkerGroup(),
-                        () -> new ClientCnx(clientConf, 
service.getWorkerGroup(), protocolVersion)), service.getTimer());
+                        () -> new ClientCnx(clientConf, 
service.getWorkerGroup(), protocolVersion)));
 
                 completeConnect();
                 return;
@@ -436,7 +436,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             final String clientAuthMethod, final int protocolVersion) throws 
PulsarClientException {
         return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
                 new ProxyConnectionPool(clientConf, service.getWorkerGroup(), 
() -> new ProxyClientCnx(clientConf,
-                        service.getWorkerGroup(), clientAuthRole, 
clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer());
+                        service.getWorkerGroup(), clientAuthRole, 
clientAuthData, clientAuthMethod, protocolVersion)));
     }
 
     private static int getProtocolVersionToAdvertise(CommandConnect connect) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index e10d2d4..b3b6aea 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -27,8 +27,6 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timer;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import lombok.Getter;
@@ -71,7 +69,6 @@ import com.google.common.collect.Sets;
 public class ProxyService implements Closeable {
 
     private final ProxyConfiguration proxyConfig;
-    private final Timer timer;
     private String serviceUrl;
     private String serviceUrlTls;
     private ConfigurationMetadataCacheService configurationCacheService;
@@ -131,7 +128,6 @@ public class ProxyService implements Closeable {
                         AuthenticationService authenticationService) throws 
IOException {
         checkNotNull(proxyConfig);
         this.proxyConfig = proxyConfig;
-        this.timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, 
TimeUnit.MILLISECONDS);
         this.clientCnxs = Sets.newConcurrentHashSet();
         this.topicStats = Maps.newConcurrentMap();
 
@@ -265,9 +261,6 @@ public class ProxyService implements Closeable {
         }
         acceptorGroup.shutdownGracefully();
         workerGroup.shutdownGracefully();
-        if (timer != null) {
-            timer.stop();
-        }
     }
 
     public String getServiceUrl() {
@@ -282,10 +275,6 @@ public class ProxyService implements Closeable {
         return proxyConfig;
     }
 
-    public Timer getTimer() {
-        return timer;
-    }
-
     public AuthenticationService getAuthenticationService() {
         return authenticationService;
     }

Reply via email to