This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bca1e82b49 add unit test for QueryServer (#12599)
bca1e82b49 is described below

commit bca1e82b49138f6c04a7c4870fdadb66d269bbe7
Author: sullis <[email protected]>
AuthorDate: Fri Mar 8 10:22:24 2024 -0800

    add unit test for QueryServer (#12599)
---
 .../apache/pinot/core/transport/QueryServer.java   | 28 ++++----
 .../pinot/core/transport/QueryServerTest.java      | 79 ++++++++++++++++++++++
 2 files changed, 92 insertions(+), 15 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index 915b2eb1f2..0e1c7d1aa0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pinot.core.transport;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocatorMetric;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -59,7 +59,7 @@ public class QueryServer {
   private final EventLoopGroup _workerGroup;
   private final Class<? extends ServerSocketChannel> _channelClass;
   private final ChannelHandler _instanceRequestHandler;
-  private Channel _channel;
+  private ServerSocketChannel _channel;
 
   /**
    * Create an unsecured server instance
@@ -85,16 +85,12 @@ public class QueryServer {
 
     boolean enableNativeTransports = nettyConfig != null && 
nettyConfig.isNativeTransportsEnabled();
     OsCheck.OSType operatingSystemType = OsCheck.getOperatingSystemType();
-    if (enableNativeTransports
-        && operatingSystemType == OsCheck.OSType.Linux
-        && Epoll.isAvailable()) {
+    if (enableNativeTransports && operatingSystemType == OsCheck.OSType.Linux 
&& Epoll.isAvailable()) {
       _bossGroup = new EpollEventLoopGroup();
       _workerGroup = new EpollEventLoopGroup();
       _channelClass = EpollServerSocketChannel.class;
       LOGGER.info("Using Epoll event loop");
-    } else if (enableNativeTransports
-        && operatingSystemType == OsCheck.OSType.MacOS
-        && KQueue.isAvailable()) {
+    } else if (enableNativeTransports && operatingSystemType == 
OsCheck.OSType.MacOS && KQueue.isAvailable()) {
       _bossGroup = new KQueueEventLoopGroup();
       _workerGroup = new KQueueEventLoopGroup();
       _channelClass = KQueueServerSocketChannel.class;
@@ -104,11 +100,9 @@ public class QueryServer {
       _workerGroup = new NioEventLoopGroup();
       _channelClass = NioServerSocketChannel.class;
       StringBuilder log = new StringBuilder("Using NIO event loop");
-      if (operatingSystemType == OsCheck.OSType.Linux
-          && enableNativeTransports) {
+      if (operatingSystemType == OsCheck.OSType.Linux && 
enableNativeTransports) {
         log.append(", as Epoll is not available: 
").append(Epoll.unavailabilityCause());
-      } else if (operatingSystemType == OsCheck.OSType.MacOS
-          && enableNativeTransports) {
+      } else if (operatingSystemType == OsCheck.OSType.MacOS && 
enableNativeTransports) {
         log.append(", as KQueue is not available: 
").append(KQueue.unavailabilityCause());
       }
       LOGGER.info(log.toString());
@@ -130,10 +124,9 @@ public class QueryServer {
       
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL, 
metric::normalCacheSize);
       
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_THREADLOCALCACHE, 
metric::numThreadLocalCaches);
       metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CHUNK_SIZE, 
metric::chunkSize);
-      _channel = serverBootstrap.group(_bossGroup, 
_workerGroup).channel(_channelClass)
+      _channel = (ServerSocketChannel) serverBootstrap.group(_bossGroup, 
_workerGroup).channel(_channelClass)
           .option(ChannelOption.SO_BACKLOG, 
128).childOption(ChannelOption.SO_KEEPALIVE, true)
-          .option(ChannelOption.ALLOCATOR, bufAllocator)
-          .childHandler(new ChannelInitializer<SocketChannel>() {
+          .option(ChannelOption.ALLOCATOR, bufAllocator).childHandler(new 
ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
               if (_tlsConfig != null) {
@@ -165,4 +158,9 @@ public class QueryServer {
       _bossGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
     }
   }
+
+  @VisibleForTesting
+  ServerSocketChannel getChannel() {
+    return _channel;
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
new file mode 100644
index 0000000000..bef2609067
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.transport;
+
+import io.netty.channel.ChannelHandler;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.common.config.NettyConfig;
+import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class QueryServerTest {
+  @DataProvider
+  public Object[][] parameters() {
+    return new Object[][]{
+        new Object[]{true}, new Object[]{false},
+    };
+  }
+
+  @Test(dataProvider = "parameters")
+  public void startAndStop(final boolean nativeTransportEnabled) {
+    PinotMetricUtils.init(new PinotConfiguration());
+    PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
+    ServerMetrics.register(new ServerMetrics(registry));
+    NettyConfig nettyConfig = new NettyConfig();
+    nettyConfig.setNativeTransportsEnabled(nativeTransportEnabled);
+    TlsConfig tlsConfig = new TlsConfig();
+    ChannelHandler channelHandler = mock(ChannelHandler.class);
+
+    QueryServer server = new QueryServer(0, nettyConfig, tlsConfig, 
channelHandler);
+    server.start();
+
+    final InetSocketAddress serverAddress = server.getChannel().localAddress();
+
+    assertTrue(connectionOk(serverAddress));
+
+    server.shutDown();
+    assertFalse(connectionOk(serverAddress));
+  }
+
+  private static boolean connectionOk(InetSocketAddress address) {
+    Socket s = null;
+    try {
+      s = new Socket(address.getHostName(), address.getPort());
+      return s.isConnected();
+    } catch (Exception e) {
+      return false;
+    } finally {
+      IOUtils.closeQuietly(s);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to